Racer_Rob
Posts: 8
Joined: Mon Apr 03, 2017 8:07 am

How should I structure my Python script?

Tue Apr 25, 2017 5:13 pm

I have two scripts:

> Script A controls an array of LEDs and flashes them in particular patterns within an infinite while loop.
> Script B controls a PC monitor and some buttons. The buttons are on callback interrupts, when they're pressed different images are displayed on the PC monitor.

In addition to the buttons' current function, I also want one of them to start the light pattern, one to stop it and one to change it's colour. What's the best way to structure this in the program?

Option 1: Can I trigger script A from within script B and leave A running independently in the background? Maybe I can pass variables to it to tell A whether to stop/start/change color?

Option 2: Or is it better to integrate A within B and try to integrate all of my code into the infinite while loop that is controlling the lighting?

I don't know how to implement Option 1 or if this is even the best approach. Any advice would be appreciated.

ghp
Posts: 1517
Joined: Wed Jun 12, 2013 12:41 pm
Location: Stuttgart Germany
Contact: Website

Re: How should I structure my Python script?

Tue Apr 25, 2017 9:00 pm

Hello,
a possible approach is to use threads and queues with messages sent from button thread to light thread.

Code: Select all

#!/bin/python3

import queue
import threading
import random
import time

messageQueue = queue.Queue()
runIt = True

def run_display():
    """LED display program (simulation)"""
    while runIt:
        try:
            s = messageQueue.get(block=True, timeout=0.1)
        except queue.Empty:
            continue
        if s == 'on_red':
            print ('switch red led on')
        if s == 'on_blue':
            print ('switch blue led on')
        if s == 'off_red':
            print ('switch red led off')
        if s == 'off_blue':
            print ('switch blue led off')

def run_button():
    """button detect program (simulation)"""
    while runIt:
        # Check your buttons here
        # instead of this, some time delay        
        time.sleep( float( random.randint(1,10)) * 0.5)
        messageQueue.put("on_red")
        time.sleep( float( random.randint(1,10)) * 0.5)
        messageQueue.put("on_blue")
        time.sleep( float( random.randint(1,10)) * 0.5)
        messageQueue.put("off_red")
        time.sleep( float( random.randint(1,10)) * 0.5)
        messageQueue.put("off_blue")

t_display = threading.Thread(target=run_display)
t_display.start()
t_button = threading.Thread(target=run_button)
t_button.start()

try:
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    pass
runIt = False    
With this structure, it is possible to loosely couple two program fragments.
Hope this helps,
Gerhard

Racer_Rob
Posts: 8
Joined: Mon Apr 03, 2017 8:07 am

Re: How should I structure my Python script?

Sat May 06, 2017 9:21 am

Hi Gerhard,

Sorry it has taken me so long to reply. Thank you for your code, it really helped me understand how queues can be used between threads. :)

In my case the LEDs are turning on/off constantly inside several for loops so I need to receive the message within these loops. For example, I want to be able to pause the LEDs flashing and restart them later.

My solution is:

Code: Select all

#Python 2.7
 
import threading
import Queue
import time
q = Queue.Queue()
def LED_Flashing(LED_Rows, LED_Columns):
        while True:
                for LED in LED_Rows:
                        for LED in LED_Columns:
                               # Toggle the LED on/off
                                       if (q.empty() != True):
                                               if q.get(block=False) == "stop":
                                                       while (q.empty() == True):
                                                               time.sleep(0.1) # Wait here until any message received
                                                       clear_queue = q.get(block=False) # Read "Go" to clear the queue
                                      time.sleep(0.1)

def Stop_LED()
	q.put("Stop")
	
def Start_LED()
	q.put("Go")

t1 = threading.Thread(target=Rings_Spin, args=(100, 100))
t1.setDaemon(True)
t1.start()

try:
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    pass
This works when I test it but I don't think it's particularly neat. Can you suggest any improvements please?

ghp
Posts: 1517
Joined: Wed Jun 12, 2013 12:41 pm
Location: Stuttgart Germany
Contact: Website

Re: How should I structure my Python script?

Sat May 06, 2017 10:25 am

Hello,
depending on the problem, some structure is needed. Keep the queue -loop as a thread, but use another thread to run the rows. columns which can be set to certain operation state 'run', 'stop' etc.
As all this stuff gets complicated, keep things together in a class.
There is also a peripheral-Class, which does the real work in addressing LED. This class has the basic operations as to flash a single led, clear all or whatever you need. The approach is: create this peripheral class, test it and then build the other things based on this.
The two nested loops on rows, columns have disappeared and are replaced by a 'increment position' thing or whatever you want to call it.

Code: Select all

import time
import threading
import Queue

class MyPeripheralClassForLED:
    def flashLed(self, row, column):   
        print("flash ", row, column)
        time.sleep(0.1) 
        # reset this led

    def clear(self):   
        print("clear all led")

class MyUltraFancyLedFlashingClassForColumnsAndRows:
    STOP = 'stop'
    RUN = 'run'
    RESET = 'reset'
    
    def __init__(self, queue, peripheral):
        self.peripheral = peripheral
        self.queue = queue
        self.N_ROW  = 10
        self.N_COLUMN = 10
        
        self.nRow = 0
        self.nColumn = 0
    
        self.state = self.STOP
        self.workThread = threading.Thread(target=self.ledRowColumnRunner)
        self.runInternal = True
        self.workThread.start()
        
        self.runIt = True
        self.queueThread = threading.Thread(target=self.run)
        self.queueThread.start()
        
         
    def ledRowColumnRunner(self):
        
        while self.runInternal:
            if self.state == self.STOP:
                time.sleep(0.1)
                continue
            
            if self.state == self.RESET:
                
                self.nRow = 0
                self.nColumn = 0
                self.peripheral.clear()
                time.sleep(0.1)
                time.sleep(0.1)
                continue

            if self.state == self.RUN:  
                # increment position 
                # walk in row, columns  
                self.nRow += 1
                if self.nRow == self.N_ROW:
                    self.nRow = 0
                    self.nColumn += 1
                    if self.nColumn == self.N_COLUMN:
                        self.nColumn = 0
                
                self.peripheral.flashLed( self.nRow, self.nColumn)
        
    
    def run(self):
        while self.runIt: 
            try:
                msg = self.queue.get(block=True, timeout=0.1)       
            except:
                continue
            # you could directly assign state, but some more control is good        
            if msg in (self.RUN, self.STOP, self.RESET):
                self.state = msg
            else:
                print("dont understand ", msg)
                

queue = Queue.Queue()

peripheral = MyPeripheralClassForLED()
ledRunner = MyUltraFancyLedFlashingClassForColumnsAndRows(queue, peripheral)

# test the stuff
for _ in range(3):
    queue.put( MyUltraFancyLedFlashingClassForColumnsAndRows.RUN)
    time.sleep(1.2)
    queue.put( MyUltraFancyLedFlashingClassForColumnsAndRows.STOP)
    time.sleep(5)
Did not implement the 'graceful shutdown' thing for all the threads for clarity (always a good excuse).
Hope this helps
Gerhard

Return to “Python”