Program flow should be as follows:
in main loop raspberry always should listen NRF channel , when there is data, some work is done in the data and sent to Azure.
as background tasks,
a) TSL2561 sensor makes measurements in every five minutes and send them to
b) GPIO.add_event_detect() should count pulses from YF S201 and when it comes to certain limit(as a test purpose it is 400 and it means 1 liter), literCount is incremented by 1
c) A background task again check this variable in every 3 minutes and if literCount variable is not 0 sends it to Azure.
I tested each functionality seperetaly and every one of them works. But when I use apscheduler (as BackgroundScheduler ), program does not work as expected. Here is my code:
Code: Select all
#!/usr/bin/python
import RPi.GPIO as GPIO
import time,spidev,json,datetime,enum,smbus
from lib_nrf24 import NRF24
from iothub_client import IoTHubClient, IoTHubTransportProvider, IoTHubMessage,IoTHubError
from apscheduler.schedulers.background import BackgroundScheduler
PROTOCOL = IoTHubTransportProvider.MQTT_WS
CONNECTION_STATUS_CALLBACKS = 0
MESSAGE_TIMEOUT = 10000 # miliseconds
TIMEOUT = 241000
MINIMUM_POLLING_TIME = 9
nodeIdListFile = "nodeIdList.json"
idListFile = "idList.json"
configsFile = "config.json"
errorFile = "errorFile.txt"
global globalNodeIdList
global globalIdList
pulsePerLiter=410
waterPin = 27 #BCM: 27 Board: 13
GPIO.setmode(GPIO.BCM)
GPIO.setup(waterPin,GPIO.IN)
global literCount , pulseCount, canSendBool
#canSendBool = False
pulseCount = 0
literCount = 0
pipes = [[0xE8,0xE8,0xF0,0xF0,0xE1],[0xF1,0xF1,0xF1,0xF1,0xE1],[0xF1,0xF1,0xF1,0xF1,0xE2],[0xF1,0xF1,0xF1,0xF1,0xE4],[0xF1,0xF1,0xF1,0xF1,0xE5],[0xF1,0xF1,0xF1,0xF1,0xE6]]
radio = NRF24(GPIO, spidev.SpiDev())
radio.begin(0,17) # (CSN,CE)
radio.setPayloadSize(100) #3RF24_1MBPS
radio.setChannel(0x60)#
radio.setDataRate(NRF24.BR_1MBPS)#
radio.setPALevel(NRF24.PA_MIN)
radio.setAutoAck(True)#
radio.enableDynamicPayloads()#
#radio.setAutoAckPayload()#
radio.openReadingPipe(1,pipes[1])
radio.openReadingPipe(2,pipes[2])
radio.openReadingPipe(3,pipes[3])
#radio.printDetails()
radio.startListening()
####################################################################################################
#Water ISR and Interrupt
def pulseCounter(self):
try:
global literCount,pulseCount
pulseCount += 1
if(pulseCount>pulsePerLiter):
pulseCount = 0
literCount += 1
except KeyboardInterrupt:
GPIO.cleanup()
GPIO.add_event_detect(waterPin,GPIO.FALLING, callback=pulseCounter)
def sendWaterData(waterId):
global literCount
if(literCount != 0):
data = {}
data["NodeId"] = globalNodeIdList[waterId]
data[globalIdList[waterId]] = literCount
data["ReadedAt"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
bufferJson = json.dumps(data)
sendDataToAzure(bufferJson)
literCount = 0
else:
None
#print("{} no water flow detected".format(literCount))
#Water ISR and Interrupt
#############################################################################
#############################################################################
#Utilities
def jsonFileReader(jsonFile):
with open(jsonFile, 'r') as f:
tempJson = json.load(f)
return tempJson
def updateConfigFiles():
global globalNodeIdList
global globalIdList
global globalConfigs
globalIdList = jsonFileReader(idListFile)
globalNodeIdList = jsonFileReader(nodeIdListFile)
globalConfigs = jsonFileReader(configsFile)
def eventLogger(fileName,errorText):
currentTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(fileName,'a+') as f:
f.write(currentTime +"\t" + errorText + "\n")
#Utilities
#####################################################################################################
#####################################################################################################
#Azure
def send_confirmation_callback(message, result, user_context):
if not "OK" in str(result):
eventLogger(errorFile,"Error: Message can not sent to Azure. Result: %s " % str(result) )
return result
def sendDataToAzure(iotHubString):
print("iotHubString for {0}".format(iotHubString))
print("--------------------------------------------------------------------")
try:
if __name__ == '__main__':
dataText = IoTHubMessage(iotHubString)
client.send_event_async(dataText, send_confirmation_callback, None)
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
eventLogger(errorFile, "Unexpected error %s from IoTHub" % iothub_error)
except KeyboardInterrupt:
print ( "IoTHubClient sample stopped" )
def prepareStringForAzure(inputString):
#Convert input string to JSON format
try:
dataJson = json.loads(inputString)
except ValueError:
eventLogger(errorFile, "Data is received in non-proper format")
print(" Data is not received in proper format" )
#Construct appropriate JSON string for Azure Iot Hub
if((dataJson["id"] in globalIdList) and dataJson["d"] != -1):
data = {}
data["NodeId"] = globalNodeIdList[dataJson["id"]]
data[globalIdList[dataJson["id"]]] = dataJson["d"]
data["ReadedAt"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
bufferJson = json.dumps(data)
return bufferJson
#Azure
#####################################################################################################
#####################################################################################################
#Nrf
def getDataFromNrf():
receivedMessage = []
dataString = ""
startTime = time.clock()
while not radio.available(0):
time.sleep(1/100)
#TODO: do a time counter in here
else:
pass
if(radio.read(receivedMessage,radio.getDynamicPayloadSize())):
for n in receivedMessage:
if(n>=32 and n<=126): #byte to char
dataString += chr(n)
print("DATA: {0}".format(dataString))
return dataString
else:
print("Error: Can not read from RF")
return -1
#Nrf
#####################################################################################################
#####################################################################################################
#Light
def getLuxValue(lightType):
TSLaddr = 0x39 #Default I2C address, alternate 0x29, 0x49
TSLcmd = 0x80 #Command
chan0 = 0x0C #Read Channel0 sensor date
chan1 = 0x0E #Read channel1 sensor data
TSLon = 0x03 #Switch sensors on
TSLoff = 0x00 #Switch sensors off
#Exposure settings
LowShort = 0x00 #x1 Gain 13.7 miliseconds
LowMed = 0x01 #x1 Gain 101 miliseconds
LowLong = 0x02 #x1 Gain 402 miliseconds
LowManual = 0x03 #x1 Gain Manual
HighShort = 0x10 #LowLight x16 Gain 13.7 miliseconds
HighMed = 0x11 #LowLight x16 Gain 100 miliseconds
HighLong = 0x12 #LowLight x16 Gain 402 miliseconds
HighManual = 0x13 #LowLight x16 Gain Manual
#Manual Settings
ManDelay = 2 #Manual Exposure in Seconds
StartMan = 0x1F #Start Manual Exposure
EndMan = 0x1E #End Manual Exposure
bus = smbus.SMBus(1)
bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLon) # Power On
bus.write_byte_data(TSLaddr, chan1 | 0x80, LowLong) # Low long
time.sleep(0.5)
data = bus.read_i2c_block_data(TSLaddr, chan0 | TSLcmd, 2)
data1 = bus.read_i2c_block_data(TSLaddr, chan1 | TSLcmd, 2)
if(lightType == "full"):
bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
return data[1] * 256 + data[0] #Return full spectrum
elif(lightType == "infrared"):
bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
return data1[1] * 256 + data1[0] #Return infrared light spectrum
elif(lightType == "visible"):
bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
return ( (data[1] * 256 + data[0]) -(data1[1] * 256 + data1[0]) ) #Return visible light spectrum
else:
bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
return "wrong choice"
def constructAndSendLuxJson(luxId):
value = getLuxValue("visible")
data = {}
data["NodeId"] = globalNodeIdList[luxId]
data[globalIdList[luxId]] = value
data["ReadedAt"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
bufferJson = json.dumps(data)
sendDataToAzure(bufferJson)
#Light
#############################################################################
updateConfigFiles()
#Schedued jobs:
# Send lux data based on interval given in config.json
# update config file in every five minutes
# check literCount in every 5 min
scheduler = BackgroundScheduler()
scheduler.add_job(updateConfigFiles, 'interval', seconds=300)
scheduler.add_job(constructAndSendLuxJson,'interval', ["lux"], seconds = globalConfigs["luxInterval"])
#When I comment out line in below, program works as expected.
scheduler.add_job(sendWaterData, 'interval',["h2o"], seconds = globalConfigs["h2ointerval"])
scheduler.start()
client = IoTHubClient(globalConfigs["connectionString"], PROTOCOL)
while True:
dataFromNrf = getDataFromNrf()
if(dataFromNrf != -1):
preparedAzureString = prepareStringForAzure(dataFromNrf)
sendDataToAzure(preparedAzureString)
else:
eventLogger(errorFile, "Error: cat not get data from RF")Can anyone help me with apscheduler or should I go with threads ? (I have little information about thread safe information). I suspect I used interrupt or ISR pulseCounter() not in correct way.
I appreciate any help