erondem
Posts: 23
Joined: Wed Jul 11, 2018 12:19 pm

How to manage with background scheduled tasks

Tue Feb 26, 2019 11:41 am

I have a python program running on RP3 B+. Raspberry has a NRF24L01, TSL2561 and YF S201 water flow sensor.

Program flow should be as follows:

in main loop raspberry always should listen NRF channel , when there is data, some work is done in the data and sent to Azure.
as background tasks,
a) TSL2561 sensor makes measurements in every five minutes and send them to
b) GPIO.add_event_detect() should count pulses from YF S201 and when it comes to certain limit(as a test purpose it is 400 and it means 1 liter), literCount is incremented by 1
c) A background task again check this variable in every 3 minutes and if literCount variable is not 0 sends it to Azure.
I tested each functionality seperetaly and every one of them works. But when I use apscheduler (as BackgroundScheduler ), program does not work as expected. Here is my code:

Code: Select all

 #!/usr/bin/python 
import RPi.GPIO as GPIO
import time,spidev,json,datetime,enum,smbus
from lib_nrf24 import NRF24
from iothub_client import IoTHubClient, IoTHubTransportProvider, IoTHubMessage,IoTHubError
from apscheduler.schedulers.background import BackgroundScheduler


PROTOCOL = IoTHubTransportProvider.MQTT_WS
CONNECTION_STATUS_CALLBACKS = 0
MESSAGE_TIMEOUT = 10000 # miliseconds
TIMEOUT = 241000
MINIMUM_POLLING_TIME = 9
nodeIdListFile = "nodeIdList.json"
idListFile = "idList.json"
configsFile = "config.json"
errorFile = "errorFile.txt"
global globalNodeIdList
global globalIdList
pulsePerLiter=410
waterPin = 27 #BCM: 27 Board: 13
GPIO.setmode(GPIO.BCM)
GPIO.setup(waterPin,GPIO.IN)
global literCount , pulseCount, canSendBool
#canSendBool = False
pulseCount = 0
literCount = 0

pipes = [[0xE8,0xE8,0xF0,0xF0,0xE1],[0xF1,0xF1,0xF1,0xF1,0xE1],[0xF1,0xF1,0xF1,0xF1,0xE2],[0xF1,0xF1,0xF1,0xF1,0xE4],[0xF1,0xF1,0xF1,0xF1,0xE5],[0xF1,0xF1,0xF1,0xF1,0xE6]]

radio = NRF24(GPIO, spidev.SpiDev())
radio.begin(0,17) # (CSN,CE)
radio.setPayloadSize(100) #3RF24_1MBPS
radio.setChannel(0x60)#
radio.setDataRate(NRF24.BR_1MBPS)#
radio.setPALevel(NRF24.PA_MIN) 
radio.setAutoAck(True)#
radio.enableDynamicPayloads()#
#radio.setAutoAckPayload()#
radio.openReadingPipe(1,pipes[1])
radio.openReadingPipe(2,pipes[2])
radio.openReadingPipe(3,pipes[3])
#radio.printDetails()
radio.startListening()


####################################################################################################
#Water ISR and Interrupt

def pulseCounter(self):
    try:
        global literCount,pulseCount
        pulseCount += 1
        if(pulseCount>pulsePerLiter):
            pulseCount = 0
            literCount += 1
    except KeyboardInterrupt:
        GPIO.cleanup()


GPIO.add_event_detect(waterPin,GPIO.FALLING, callback=pulseCounter)

def sendWaterData(waterId):
    global literCount
    if(literCount != 0):
        data = {}
        data["NodeId"]                              = globalNodeIdList[waterId]                         
        data[globalIdList[waterId]]                 = literCount   
        data["ReadedAt"]                            = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")                      
        bufferJson = json.dumps(data)
        sendDataToAzure(bufferJson) 
        literCount = 0
    else:
        None 
        #print("{} no water flow detected".format(literCount))


#Water ISR and Interrupt
#############################################################################


#############################################################################
#Utilities

def jsonFileReader(jsonFile):
    with open(jsonFile, 'r') as f:
        tempJson = json.load(f)
    return tempJson


def updateConfigFiles():
    global globalNodeIdList
    global globalIdList
    global globalConfigs

    globalIdList = jsonFileReader(idListFile)
    globalNodeIdList = jsonFileReader(nodeIdListFile)
    globalConfigs =  jsonFileReader(configsFile)

def eventLogger(fileName,errorText):
    currentTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(fileName,'a+') as f:
        f.write(currentTime +"\t" + errorText + "\n")

#Utilities       
#####################################################################################################


#####################################################################################################
#Azure

def send_confirmation_callback(message, result, user_context):
    if not "OK" in str(result): 
        eventLogger(errorFile,"Error: Message can not sent to Azure. Result: %s " % str(result) )

    return result


def sendDataToAzure(iotHubString):
    print("iotHubString for {0}".format(iotHubString))
    print("--------------------------------------------------------------------")

    try:        
        if __name__ == '__main__':
            dataText = IoTHubMessage(iotHubString)
            client.send_event_async(dataText, send_confirmation_callback, None)

    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        eventLogger(errorFile, "Unexpected error %s from IoTHub" % iothub_error)

    except KeyboardInterrupt:
        print ( "IoTHubClient sample stopped" )


def prepareStringForAzure(inputString):

    #Convert input string to JSON format
    try:
        dataJson = json.loads(inputString)
    except ValueError:
        eventLogger(errorFile, "Data is received in non-proper format")
        print(" Data is not received in proper format" )

    #Construct appropriate JSON string for Azure Iot Hub
    if((dataJson["id"] in globalIdList) and dataJson["d"] != -1):

        data = {}
        data["NodeId"]                          = globalNodeIdList[dataJson["id"]]                         
        data[globalIdList[dataJson["id"]]]      = dataJson["d"]   
        data["ReadedAt"]                        = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")                      
        bufferJson = json.dumps(data)        
        return bufferJson


#Azure       
#####################################################################################################


#####################################################################################################
#Nrf

def getDataFromNrf():
    receivedMessage = []
    dataString = ""
    startTime = time.clock()
    while not radio.available(0):
        time.sleep(1/100)

        #TODO: do a time counter in here 
    else:
        pass


    if(radio.read(receivedMessage,radio.getDynamicPayloadSize())):
        for n in receivedMessage:
            if(n>=32 and n<=126): #byte to char
                dataString += chr(n)
        print("DATA: {0}".format(dataString))
        return dataString


    else:

        print("Error: Can not read from RF")
        return -1

#Nrf       
#####################################################################################################


#####################################################################################################
#Light 

def getLuxValue(lightType):

    TSLaddr = 0x39 #Default I2C address, alternate 0x29, 0x49 
    TSLcmd = 0x80 #Command
    chan0 = 0x0C #Read Channel0 sensor date
    chan1 = 0x0E #Read channel1 sensor data
    TSLon = 0x03 #Switch sensors on
    TSLoff = 0x00 #Switch sensors off
    #Exposure settings
    LowShort = 0x00 #x1 Gain 13.7 miliseconds
    LowMed = 0x01 #x1 Gain 101 miliseconds
    LowLong = 0x02 #x1 Gain 402 miliseconds
    LowManual = 0x03 #x1 Gain Manual
    HighShort = 0x10 #LowLight x16 Gain 13.7 miliseconds
    HighMed = 0x11  #LowLight x16 Gain 100 miliseconds
    HighLong = 0x12 #LowLight x16 Gain 402 miliseconds
    HighManual = 0x13 #LowLight x16 Gain Manual
    #Manual Settings
    ManDelay = 2 #Manual Exposure in Seconds
    StartMan = 0x1F #Start Manual Exposure
    EndMan = 0x1E #End Manual Exposure

    bus = smbus.SMBus(1)
    bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLon) # Power On
    bus.write_byte_data(TSLaddr, chan1 | 0x80, LowLong) # Low long
    time.sleep(0.5)

    data = bus.read_i2c_block_data(TSLaddr, chan0 | TSLcmd, 2)
    data1 = bus.read_i2c_block_data(TSLaddr, chan1 | TSLcmd, 2)


    if(lightType == "full"):
        bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
        return data[1] * 256 + data[0] #Return full spectrum 

    elif(lightType == "infrared"):
        bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
        return data1[1] * 256 + data1[0] #Return infrared light spectrum

    elif(lightType == "visible"):
        bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
        return ( (data[1] * 256 + data[0]) -(data1[1] * 256 + data1[0]) ) #Return visible light spectrum

    else:
        bus.write_byte_data(TSLaddr, 0x00 | TSLcmd, TSLoff)
        return "wrong choice"


def constructAndSendLuxJson(luxId):
        value = getLuxValue("visible")
        data = {}
        data["NodeId"]                          = globalNodeIdList[luxId]                         
        data[globalIdList[luxId]]               = value   
        data["ReadedAt"]                        = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")                      
        bufferJson = json.dumps(data)
        sendDataToAzure(bufferJson) 

#Light       
#############################################################################


updateConfigFiles()
#Schedued jobs: 
#   Send lux data based on interval given in config.json
#   update config file in every five minutes
#   check literCount in every 5 min
scheduler = BackgroundScheduler()  
scheduler.add_job(updateConfigFiles, 'interval', seconds=300) 
scheduler.add_job(constructAndSendLuxJson,'interval', ["lux"], seconds = globalConfigs["luxInterval"])
#When I comment out line in below, program works as expected.
scheduler.add_job(sendWaterData, 'interval',["h2o"], seconds = globalConfigs["h2ointerval"])
scheduler.start() 

client = IoTHubClient(globalConfigs["connectionString"], PROTOCOL)


while True:    
    dataFromNrf = getDataFromNrf()   

    if(dataFromNrf != -1):
        preparedAzureString = prepareStringForAzure(dataFromNrf)
        sendDataToAzure(preparedAzureString)        

    else:
        eventLogger(errorFile, "Error: cat not get data from RF")
OS: Raspbian, YF S201 has voltage divider with 10k and 4.7k and powered up from 5V

Can anyone help me with apscheduler or should I go with threads ? (I have little information about thread safe information). I suspect I used interrupt or ISR pulseCounter() not in correct way.

I appreciate any help

Return to “Beginners”