richrarobi
Posts: 271
Joined: Sun Feb 08, 2015 1:13 pm

Raspberry Colloid (?) Distributed Python Mayhem

Thu Dec 01, 2016 9:41 pm

Have more than one pi, maybe distributed round the house, or other systems running python3? Want to run programs on each and retrieve data to one place?
First, install zmq :- https://github.com/MonsieurV/ZeroMQ-RPi
Then run one program (zrep.py) on each target (server) (no changes needed if you use port 5555):-

Code: Select all

#!/usr/bin/python3
# Filename: zrep.py
from time import sleep
import zmq
import signal
import sys
import zlocal
import importlib

class Reply:
    def __init__(self, port):
        context = zmq.Context()
        self.sock = context.socket(zmq.REP)
        self.sock.bind("tcp://*:%s" % port)
        
    def wait(self):
        try:
            [func, args, kwargs] = self.sock.recv_json()
            try:
# reload the zlocal library if changed
                try:
                    importlib.reload(zlocal)
                    module = importlib.import_module('zlocal')
                    fn = getattr(module, func)
                except:
                    reply = "Error in zlocal, with {}".format(func)
                    fn = ""
                if callable(fn):
#                   print("Calling fn: {}".format(fn))
                    reply = fn(*args, **kwargs)
                self.sock.send_json(reply)
            except Exception as e:
                self.sock.send_json(e)
#               print ("Reply.wait exception")
                return
        except zmq.Again as e:
            return
#       print ("Reply.wait returning")
        return

if __name__ == "__main__":
# port for zmq reply
    port ="5555"
    try:
        rp = Reply(port)
        while True:
            try:
                rp.wait()
            except:
                pass
                sleep(1)
#            print("looping")

    except KeyboardInterrupt:
        print ("program stopping")
        sleep(2)
This program (zrep.py) uses zlocal.py which can be tailored per system (note runprc lets you run external commands in a list ):-

Code: Select all

#!/usr/bin/python3
# zlocal.py
from time import sleep
import subprocess

def is_ARM():
    tmp = runprc("cat /proc/cpuinfo")
    for line in tmp.splitlines():
        if "model name" in line:
            x, t = line.split(": ")
            if "ARM" in t:
                return True
    return False

def runprc(cmnd):
# be very careful with this one !!!
    from subprocess import Popen, STDOUT, PIPE
    import shlex
    try:
        args = shlex.split(cmnd)
        if args[0] in ["uname", "ls", "ps", "lsusb", "df", "cat"]:
            p = Popen(args, stdout=PIPE)
            out, err = p.communicate(timeout=30)
            out = out.decode("utf-8").rstrip("\n")
        else:
            out = "runprc: Not Allowed"
    except:
        p.kill()
    return out

def getTmp():
    if is_ARM():
        import subprocess
        tmp = subprocess.check_output(["/opt/vc/bin/vcgencmd", "measure_temp"])
        tmp = tmp.decode("utf-8").rstrip("\n")
        tmp = tmp[5:]
        tmp = tmp[:-2]
        return tmp
    else:
        return "notARM"

def datim():
    import datetime
    now = datetime.datetime.now()
    x = str(now)
    y, z = x.split(" ")
    y, m, d = y.split("-")
    hh, mm, ss = z.split(":")
    ss, dec = ss.split(".")
    stmp = "{}-{}-{} {}:{}:{}".format(y, m, d, hh, mm, ss)
    return stmp

if __name__ == "__main__":
    print(getTmp())
    print(datim())
    print(runprc("ls -l"))

Any special programs, per system, go into zlocal.py. You need to get your error handling right, changes will be automatically picked up by zrep.py - if you make a mistake, you possibly need to stop and restart zrep.py. (i have this running from startup)

zprox.py is used by the program (doesn't need to change):-

Code: Select all

#!/usr/bin/python3
# zprox
import zmq
import sys

# proxy to call the  remote system
class ZProxy:
    def __init__(self, url):
        context = zmq.Context()
        self.sock = context.socket(zmq.REQ)
        self.srv, x = url.split('.', 1)
        self.sock.connect("tcp://{}".format(url))
# x seconds timeout
        self.sock.RCVTIMEO = 20*1000
        self.sock.LINGER = 0
# use poller for timeouts
        self.poller = zmq.Poller()
        self.poller.register(self.sock, zmq.POLLIN)
        
    def __getattr__(self, name):
        def rpc(*args, **kwargs):
            try:
#                print("RPC sending to: {}, {}".format(self.srv, name))
                self.sock.send_json([name, args, kwargs])
            except zmq.ZMQError as e:
#                print("Excpt : {}, {}".format(self.srv, e))
                pass
# poll the socket - x seconds timeout
            if self.poller.poll(5*1000):
                reply = self.sock.recv_json()
                if isinstance(reply, Exception):
                    raise reply
            else:
# timeout reached, so no reply
                reply = "NoReply"
            return reply
        return rpc
Note this is not a cluster, but has some similarities. Systems can get on with their primary purpose (like running a camera) and still be accessed for sensors, etc. Hope that gives someone some help. If you want the multiprocessing version let me know.
I can also publish the togleds and other functions if wanted, e.g. the ones to access the sensehat.
Last edited by richrarobi on Mon Dec 12, 2016 4:04 pm, edited 2 times in total.

richrarobi
Posts: 271
Joined: Sun Feb 08, 2015 1:13 pm

Re: Raspberry Colloid (?) Distributed Python Mayhem

Mon Dec 05, 2016 10:23 am

Here is my stab at a tkinter front end (runs on any python3+zmq+tkinter system)
Screenshot_2016-12-12_15-48-11.png
Screenshot_2016-12-12_15-48-11.png (10.18 KiB) Viewed 1024 times

Code: Select all

#!/usr/bin/python3
import tkinter as tk
from zprox import ZProxy
#
class SNS:
    def __init__(self):
        self.head = tk.Label(root, text = "SenseHAT:")
        self.head.grid(row=13)
        self.senses = tk.Label(root, text = "Senses")
        self.senses.grid(row=13, column=2, padx = 5)
        
    def showSenses(self):
        sen = "Temp: {}".format(zsrv[servers.index("e")].getSTemp())
        sen = sen + ", Humi: {}".format(zsrv[servers.index("e")].getSHumi())
        sen = sen + ", Press: {}".format(zsrv[servers.index("e")].getSPress())
        self.senses.configure(text = sen)
# every 15 minutes
        root.after(15*60*1000, self.showSenses)
#
class CPU_T:
    def __init__(self):
        head = tk.Label(root, text= "CPU temps, on systems:")
        head.grid(row=15)
        txt = ""
        for server in servers:
            r = servers.index(server)
            txt = txt + server + "\t"
        self.lab = tk.Label(root, text = txt)
        self.lab.grid(row = 15, column = 1, padx = 5, columnspan=len(servers))
        self.cpu = tk.Label(root, text = "")
        self.cpu.grid(row=16, column = 1, padx = 5, columnspan=len(servers))
             
    def showCpuTemps(self):
        txt = ""
        for server in servers:
            r = servers.index(server)
            txt = str(txt) + str("{}".format(zsrv[r].getTmp()))  + "\t"
        self.cpu.configure(text = txt)
# every 10 minutes
        root.after(10*60*1000, self.showCpuTemps)
#
if __name__ == "__main__":
    root = tk.Tk()
    root.title('tkLink')
    servers = [ "a", "b", "c", "d", "e", "f", "rock"]
    zsrv = []
    for server in servers:
        port = "5555"
# get and store proxies for all remote servers in zsrv list
        zsrv.append(ZProxy(server + ".local:" + port))
#
    sns = SNS()
    sns.showSenses()
    cput = CPU_T()
    cput.showCpuTemps()
# remember the loop
    root.mainloop()
Note that system e with the senseHat has extra routines in zlocal.py.

richrarobi
Posts: 271
Joined: Sun Feb 08, 2015 1:13 pm

Re: Raspberry Colloid (?) Distributed Python Mayhem

Sun Jan 01, 2017 8:36 pm

Example using a PIR to set LED on a remote pi :-
zprox.py and zrep.py as previously posted, zprox on sender and zrep running on receiving server:
on system c with PIR:

Code: Select all

#!/usr/bin/python3
# pir5.py on system c
import RPi.GPIO as GPIO
from time import sleep
from zprox import ZProxy

def callback(chan):
    if chan == pir:
#        print("Callback!")
        if GPIO.input(chan) == True:
# set LED on system b
            zpb.setled(led)
            while GPIO.input(chan) == True:
                sleep(0.05)
# clear LED on system b
            zpb.clrled(led)
        else:
# shouldn't happen
            print("Eh! Up!")
    else:
#        other channel callbacks?
        print("Channel?")

if __name__ == "__main__":
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    pir = 18
    led = 10
    GPIO.setup(pir,GPIO.IN)
    try:
# b.local has the LED
        zpb = ZProxy("b.local:5555")
#  Wait for PIR to settle
        while GPIO.input(pir)==1:
# clear LED on system b
            zpb.clrled(led)

        GPIO.add_event_detect(pir, GPIO.RISING, callback=callback)
# then just wait
        while True :
            sleep(1)
 
    except KeyboardInterrupt:
        print("Stopping!")
        sleep(2)
        zpb.clrled(led)
        GPIO.remove_event_detect(pir)
        GPIO.cleanup()
zlocal.py on system b with LEDs on GPIO (zlocal is called by zrep.py):

Code: Select all

#!/usr/bin/python3
# zlocal.py
import datetime
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

def is_ARM():
    tmp = runprc("cat /proc/cpuinfo")
    for line in tmp.splitlines():
        if "model name" in line:
            x, t = line.split(": ")
            if "ARM" in t:
                return True
    return False

def getTmp():
    if is_ARM():
        import subprocess
        tmp = subprocess.check_output(["/opt/vc/bin/vcgencmd", "measure_temp"])
        tmp = tmp.decode("utf-8").rstrip("\n")
        tmp = tmp[5:]
        tmp = tmp[:-2]
        return tmp
    else:
        return "notARM"

def runprc(cmnd):
# be very careful with this one !!!
    from subprocess import Popen, STDOUT, PIPE
    import shlex
    try:
        args = shlex.split(cmnd)
        if args[0] in ["uname", "ls", "ps", "lsusb", "df", "cat"]:
            p = Popen(args, stdout=PIPE)
            out, err = p.communicate(timeout=30)
            out = out.decode("utf-8").rstrip("\n")
        else:
            out = "runprc: Not Allowed"
    except:
        p.kill()
    return out

def datim():
    import datetime
    now = datetime.datetime.now()
    x = str(now)
    y, z = x.split(" ")
    y, m, d = y.split("-")
    hh, mm, ss = z.split(":")
    stmp = "{}-{}-{} {}:{}".format(y, m, d, hh, mm)
    return stmp

def getBMP():
    try:
        import Adafruit_BMP.BMP085 as BMP085
        sensor = BMP085.BMP085()
        t = round(sensor.read_temperature(),2)
        p = round(sensor.read_pressure(),2)
#        a = round(sensor.read_altitude(),2)
        slp = round(sensor.read_sealevel_pressure(260.0),2)
#    print('Temp = {0:0.2f} *C'.format(t))
#    print('Pressure = {0:0.2f} Pa'.format(p))
#    print('Altitude = {0:0.2f} m'.format(a))
#    print('Sealevel Pressure = {0:0.2f} Pa'.format(slp))
#    reply = {"temp": t, "press": p, "Sea": slp}
        reply = "temp: {}, press: {}, sea: {}".format(t, p, slp)

    except:
        reply = "Error in getBMP"

    return reply


def getBearing():
# sparkfun HMC5883L triple axis magnetometer
    def read_byte(adr):
        return bus.read_byte_data(address, adr)

    def read_word(adr):
        high = bus.read_byte_data(address, adr)
        low = bus.read_byte_data(address, adr+1)
        val = (high << 8) + low
        return val

    def read_word_2c(adr):
        val = read_word(adr)
        if (val >= 0x8000):
            return -((65535 - val) + 1)
        else:
            return val

    def write_byte(adr, value):
        bus.write_byte_data(address, adr, value)

    import smbus
    import math
    try:
        bus = smbus.SMBus(1)
        address = 0x1e

        write_byte(0, 0b01110000) # Set to 8 samples @ 15Hz
        write_byte(1, 0b00100000) # 1.3 gain LSb / Gauss 1090 (default)
        write_byte(2, 0b00000000) # Continuous sampling

        scale = 0.92

        x = read_word_2c(3) * scale
        y = read_word_2c(7) * scale
        z = read_word_2c(5) * scale

        brng  = math.atan2(y, x)
        if (brng < 0):
            brng += 2 * math.pi
        brng = round(math.degrees(brng),2)
        x = round(x,2)
        y = round(y,2)
        z = round(z,2)
        reply = "brng : {}, x : {}, y : {}, z : {}".format(brng, x, y, z)
#        print(reply)
    except:
        reply = "Error in getBearing"

    return reply

def togled(r):
# ensure r is in range
    r = r % 14
    chan_list = [5, 6, 13, 19, 26, 25, 12, 16, 20, 21, 22, 23, 27, 4]
    GPIO.setup(chan_list, GPIO.OUT)
    n = chan_list[r]
    GPIO.output(n, not GPIO.input(n))
    return "togled"

def setled(r):
    r = r % 14
#                0, 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13
    chan_list = [5, 6, 13, 19, 26, 25, 12, 16, 20, 21, 22, 23, 27,  4]
    GPIO.setup(chan_list, GPIO.OUT)
    n = chan_list[r]
    GPIO.output(n, 1)
    return "setled"
    
def clrled(r):
    r = r % 14
    chan_list = [5, 6, 13, 19, 26, 25, 12, 16, 20, 21, 22, 23, 27, 4]
    GPIO.setup(chan_list, GPIO.OUT)
    n = chan_list[r]
    GPIO.output(n, 0)
    return "clrled"

# test code for zlocal
if __name__ == "__main__":
    from time import sleep
    try:
#        print(getTmp())
#        print(getBearing())
#        print(getBMP())
        while True:
            for n in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]:
                setled(n)
                sleep(0.1)
            for n in [13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]:
                clrled(n)
                sleep(0.1)
                

    except KeyboardInterrupt:
        print("Stopping")
        sleep(1)
        for n in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]:
                clrled(n)
        sleep(1)


Return to “Other projects”