"""Utilities Module. With several functions that are repeatedly used"""
import os
import socket
import spidev
try:
import RPi.GPIO as GPIO
except RuntimeError:
print("Error importing RPi.GPIO! Use sudo / run sudo usermod -aG gpio <myusername> to get permission")
[docs]def discover_i2c():
"""
Scans address space and ports to discover connected I2C or SPI devices
uses os i2cdetect for the 1 I2C port and also scans two SPI ports
"""
print("Scan for connected I2C devices' addresses:")
print(os.popen("i2cdetect -y 1").read())
print("Checking if SPI Module is loaded:")
print(os.popen("lsmod | grep spi").read())
[docs]def read_analogue(channel, spi_device=0, baud=1350000):
"""
Reads an analogue signal from the connected SPI ADC device and returns channel reading.
:param channel: ADC channel where sensor is connected.
:type channel: int
:param spi_device: Either 0 or 1 as there are only 2 spi ports
:type spi_device: int
:param baud: the bit rate, measured in bit/s clock rate used for device
:type baud: int
:return: Raw 1024 bit ADC output data.
"""
spi = spidev.SpiDev()
spi.open(0, spi_device)
spi.max_speed_hz = baud # spi clock speed
_, byte1, byte2 = spi.xfer2([1, (8+channel) << 4, 0])
raw_data = ((byte1 & 3) << 8) + byte2
spi.close()
return raw_data
[docs]def switch_actuator(gpio_pin, state):
"""
Function to switch actuator ON or OFF
:param gpio_pin: The pin the fan relay (motor in demo) is connected to.
:type gpio_pin: int
:param state: Boolean indicating whether fan is on or off.
:type state: boolean
"""
GPIO.setup(gpio_pin, GPIO.OUT) # repetitive, will need to be done once.
GPIO.output(gpio_pin, state)
[docs]def scan_network():
"""
Ger the IP address other than the loopback IP that the device has been allocated by DHCP
Scan subnet /24 of IP address to check for LAN brokers' availability.
:return: list of online devices responding to ICMP echo request using ping.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
except:
ip = '127.0.0.1' # localhost loopback IP if not connected to wifi
finally:
s.close() # close socket
net = ip.split(".")[:-1]
online_dev = list()
print(f"Scanning Hosts {ip}'s Network:")
for host in range(1, 0x64): # 0xff only scanning /24 subnet
dev = ".".join(net) + "." + str(host)
response = os.popen(f"ping -w 1 {dev}") # -c 1
try:
if response.readlines()[5]:
print(f"{dev} is online")
online_dev.append(dev)
except IndexError:
continue
return online_dev
[docs]def find_broker():
"""
Scan for online MQTT brokers then scan within network by checking online hosts then scanning for
open MQTT ports
:return: No return
"""
online_dev = scan_network()
online_dev = ["mqtt.eclipse.org", "test.mosquitto.org"]+online_dev # prepend online test brokers
for ip in online_dev:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# unencrypted port 1883 or encrypted port 8883 open.
conn = min(s.connect_ex((ip, 1883)), s.connect_ex((ip, 8883)))
if conn == 0:
print(f'MQTT Port 1883 or 8883 OPEN for {ip}')
else:
print(f'MQTT Port 1883 or 8883 CLOSED for {ip}')
s.close()
print("Scan Completed")
[docs]def gpio_init():
"""
Function to initialize the GPIO pins, numbering system used and communication protocols.
GPIO.BCM IS THE DEFAULT
"""
GPIO.setmode(GPIO.BCM) # Physical Board Pin Numbers
GPIO.setwarnings(False)
[docs]def cleanup():
"""
GPIO.cleanup() and exit(0) for a graceful exit.
"""
spidev.SpiDev().close() # close SPI connection
GPIO.cleanup()
exit(0)
def create_sensor_type(read, write, config): # Add support for new sensor type
pass
def sensor_attach_i2c(SensorType1, addr, sample_rate, broker): # Add sensor, assign broker and topic
pass