Hello, some source code.
The idea was to have a background process, executing commands coming in by a socket connection.
I used this in a setup, where a LED and a RGB-LED should be controlled by a web server.
What you might find: the code is not very consistent in logging and the ioHandler idea was to encapsulate the IO-Access from the business logic. But the rgb-Thread directly controls IO, which violates this principle.
Server side:
Code: Select all
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Controlling one LED ON, OFF, BLINK.
# Controlling a RGB-LED with software PWM.
#
# Preparation in system:
# For Test only
# install telnet client and check localhost 9999
# apt-get install telnet
#
# Start process as root, the led blinks some time in order to indicate
# that process is running.
#
# Socket commands are
# 'ON' --> turn LED on
# 'OFF' --> turn LED off
# 'BLINK' -- blink LED
#
# RGB<r>,<g>,<b> --> Switch RGB values, control by pwm.
# Example RGB20,200,100
# The upper cas commands close the connection.
#
# The lower case commands on, off, blink and rgb keep the socket connection open,
# what is simpler for debug purpose.
#
# THE ON, OFF-Commands perform the action immediateley. The BLINK and RGB issue a thread
# which keeps until new commands arrive.
#
# Testing: open a telnet session 'telnet localhost 9999' and issue e.g. 'ON', 'OFF', 'BLINK'.
#
import socketserver
import time
import threading
import RPi.GPIO as GPIO
import re
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
debug = False
class IOHandler:
LED = 23
def __init__(self):
GPIO.setup(self.LED, GPIO.OUT)
def led_on(self):
global debug
GPIO.output(self.LED, True)
if debug:
print('led_on')
def led_off(self):
global debug
GPIO.output(self.LED, False)
if debug:
print('led_off')
class RGBThread (threading.Thread):
r = 40
g = 40
b = 40
R = 22
G = 18
B = 17
runIt = None
def __init__(self):
threading.Thread.__init__(self)
GPIO.setup(self.R, GPIO.OUT)
GPIO.setup(self.G, GPIO.OUT)
GPIO.setup(self.B, GPIO.OUT)
self.runIt = True
def setRGB(self, r,g,b):
self.r=r
self.g=g
self.b=b
def run(self):
global debug
if debug:
print('RGBThread.run')
k = 30000
while self.runIt:
GPIO.output(self.R, True)
time.sleep(self.r / k)
GPIO.output(self.R, False)
time.sleep((256-self.r)/k)
if self.runIt == False:
break
GPIO.output(self.G, True)
time.sleep(self.g / k)
GPIO.output(self.G, False)
time.sleep((256-self.g)/k)
if self.runIt == False:
break
GPIO.output(self.B, True)
time.sleep(self.b / k)
GPIO.output(self.B, False)
time.sleep((256-self.b)/k)
if self.runIt == False:
break
print("RGBThread.runit finished")
class BlinkThread(threading.Thread):
runIt = None
def __init__(self):
threading.Thread.__init__(self)
self.runIt = True
def run(self):
global ioHandler
while self.runIt:
ioHandler.led_on()
for i in range(20):
if self.runIt == False:
break
time.sleep(0.05)
if self.runIt == False:
break
ioHandler.led_off()
for i in range(20):
if self.runIt == False:
break
time.sleep(0.05)
print("BlinkThread.runit finished")
class PWMThread(threading.Thread):
runIt = None
def __init__(self):
threading.Thread.__init__(self)
self.runIt = True
def run(self):
global ioHandler
kk = 2000
while (self.runIt):
# iNNN are the values to set
iMax = 32
for i in range (iMax):
for k in range(5):
ioHandler.led_on()
time.sleep(i/kk)
if self.runIt == False:
break
ioHandler.led_off()
time.sleep((iMax-i)/kk)
if self.runIt == False:
break
if self.runIt == False:
break
for i in range (iMax-1,1,-1):
for k in range(5):
ioHandler.led_on()
time.sleep(i/kk)
if self.runIt == False:
break
ioHandler.led_off()
time.sleep((iMax-i)/kk)
if self.runIt == False:
break
if self.runIt == False:
break
print("PWMThread.runit finished")
thread = None
rgbThread = None
class MyTCPHandler(socketserver.BaseRequestHandler):
"""
The RequestHandler class for our server.
It is instantiated once per connection to the server, and must
override the handle() method to implement communication to the
client.
"""
def stopThread(self):
global thread
print("stopThread")
if thread != None:
thread.runIt = False
print("wait for join")
thread.join()
thread = None
print("stopThread ok")
def startThreadBlink(self):
global thread
thread = BlinkThread()
thread.start()
def startThreadPWM(self):
global thread
thread = PWMThread()
thread.start()
def handle(self):
global rgbThread
##self.request.sendall(b"welcome\n")
while(True):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024)
if len(self.data) == 0:
break
self.data = self.data.strip()
print ("{} wrote:".format(self.client_address[0]))
print( self.data)
if self.data.startswith(b'on'):
self.request.sendall(b"switch it on\n")
if self.data.startswith(b'ON'):
self.stopThread()
ioHandler.led_on()
self.request.sendall(b"switch it on\n")
break
if self.data.startswith(b'off'):
self.request.sendall(b"switch it off\n")
if self.data.startswith(b'OFF'):
self.stopThread()
ioHandler.led_off()
self.request.sendall(b"switch it off\n")
break
if self.data.startswith(b'BLINK'):
self.stopThread()
self.startThreadBlink()
self.request.sendall(b"blink multiple\n")
break
if self.data.startswith(b'PWM'):
self.stopThread()
self.startThreadPWM()
self.request.sendall(b"pwm\n")
break
if self.data.startswith(b'RGB'):
if rgbThread == None:
rgbThread = RGBThread()
rgbThread.start()
regex = r"RGB,([0-9.]+),([0-9.]+),([0-9.]+)"
data = self.data.decode("iso-8859-1")
m = re.match(regex, data)
if m == None:
print('could not parse ' + data + ' with ' + regex)
else:
dr = int( m.group(1) )
dg = int( m.group(2) )
db = int( m.group(3) )
rgbThread.setRGB(dr,dg,db)
self.request.sendall(b"rgb\n")
break
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())
ioHandler = IOHandler()
def main():
# some startup action, just for fun
for i in range(10):
ioHandler.led_on()
time.sleep(0.2)
ioHandler.led_off()
time.sleep(0.2)
HOST, PORT = "localhost", 9999
# Create the server, binding to localhost on port 9999
server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
if __name__ == "__main__":
print('start')
main()
Testing, as indicated, with a telnet session.
A python client is:
Code: Select all
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
import socket
import sys
HOST, PORT = "localhost", 9999
data = " ".join(sys.argv[1:])
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Connect to server and send data
sock.connect((HOST, PORT))
## received = sock.recv(1024)
sock.sendall(bytes(data + "\n", 'UTF-8'))
# Receive data from the server and shut down
received = sock.recv(1024)
finally:
sock.close()
print ("Sent: {}".format(data))
print ("Received: {}".format(received))
Next a simple cgi-script which uses the background process.
Code: Select all
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import cgi
import cgitb
cgitb.enable
import socket
HOST, PORT = "localhost", 9999
def generate_form():
print ('Content-Type: text/html\n\n')
print ("<HTML>")
print ("<HEAD>")
print ("<TITLE>LED Form</TITLE>")
print ("</HEAD>")
print ("<BODY BGCOLOR = white>")
print ("<H1>LED Status auswaehlen.</H1>")
print ("<FORM>")
print ("<SELECT size = '7' name = 'led' style='font-size:14'>")
print(" <option>off</option> ")
print(" <option>on</option> ")
print(" <option>blink</option> ")
print(" <option>pwm</option> ")
print(" <option>rgb red</option> ")
print(" <option>rgb green</option> ")
print(" <option>rgb blue</option> ")
print("</SELECT>")
print ("<INPUT TYPE = hidden NAME = 'action' VALUE = 'ledstatus'>")
print ("<INPUT TYPE = submit VALUE = 'Enter' style='font-size:14'>")
print ("</FORM>")
print ("</BODY>")
print ("</HTML>")
print("last")
def socketServer(what):
data = 'BLINK'
if what == 'on':
data = 'ON'
if what == 'off':
data = 'OFF'
if what == 'blink':
data = 'BLINK'
if what == 'pwm':
data = 'PWM'
if what == 'rgb red':
data = 'RGB,255,0,0'
if what == 'rgb green':
data = 'RGB,0,255,0'
if what == 'rgb blue':
data = 'RGB,0,0,255'
print ('Content-Type: text/html\n\n')
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
received = 'no data received'
try:
# Connect to server and send data
sock.connect((HOST, PORT))
## received = sock.recv(1024)
sock.sendall(bytes(data + "\n", 'UTF-8'))
# Receive data from the server and shut down
received = sock.recv(1024)
except Exception as e:
print('unexpected problem: ' + str(e))
finally:
sock.close()
print ('<html>')
print ('<head>')
print ('gpio-page')
print ('</head>')
print ('<body>')
print ('gpio switched')
print ("<pre>")
print ("Sent: {}".format(data))
print ("Received: {}".format(received))
print ("</pre>")
print ("<br/>")
print ("<a href='cgi_3.py'>zurueck</a>")
print ('</body>')
print('</html>')
# Define main function.
def main():
form = cgi.FieldStorage()
if ("action" in form and "led" in form ):
if (form["action"].value == "ledstatus"):
socketServer(form["led"].value)
else:
generate_form()
# Call main function.
main()
Greetings, Gerhard