Posts: 5
Joined: Mon Apr 01, 2019 1:31 am

Websocket interfaces to devices using Arkady

Fri Apr 12, 2019 8:15 pm


I recently worked on a project where I needed to control a variety of devices connected to a couple of Raspberry Pis. The devices included Arduinos (via NanPy), DMX, sensors, and applications such as omxplayer and pygame audio. Because of the complexity of the project, I implemented a websocket interface to units of device control so that I could write a clean master controller program that treated the devices in a nice abstract way.

Aside from helping to separate concerns (one application can focus on hardware/interprocess interaction and interface, while other applications can focus on logic), it's also been a handy way to set up remote, networked control of arbitrary devices or software. I've started to release this project under the name Arkady. Here is a link to the documentation I am working on.

I would love to get some eyes on the project and get some feedback. It's still in early-alpha stages and I would love to develop it further with other people's use cases in mind. Let me try to sell you on how it can be useful.

A worked example:

If you're not familiar with NanPy, check it out! It's a great tool for prototyping because you can control an Arduino fully over a serial connection. There's no need to re-program and re-flash the chip; you just load its firmware once.

Let me show you a fully functional Arkady application that you can use to set up a generic interface to a NanPy slave Arduino from another process and across a network:

Code: Select all

#!/usr/bin/env python3
"""
Demonstration of a very generic Arkady interface to NanPy.

Direct dependencies are: arkady, nanpy
Indirect dependencies are: pyserial, pyzmq
"""

from arkady import Application
from arkady.components import SerialComponent

from nanpy import SerialManager, ArduinoApi

ARDUINO_PORT = '/dev/ttyUSB0'  # On Windows this is more like "COM3"

class GenericNanpy(SerialComponent):
    def __init__(self, port, *args, **kwargs):
        super(GenericNanpy, self).__init__(*args, **kwargs)
        self._serial_manager = SerialManager(device=port, baudrate=115200)
        self.ardu = ArduinoApi(self._serial_manager)

    def analog_read(self, pin_number, *_words):
        """Read the pin in analog mode"""
        self.ardu.pinMode(pin_number, self.ardu.INPUT)
        return self.ardu.analogRead(pin_number)

    def digital_read(self, pin_number, *_words):
        """Read the pin in digital mode"""
        self.ardu.pinMode(pin_number, self.ardu.INPUT)
        return self.ardu.digitalRead(pin_number)

    def analog_write(self, pin_number, value, *_words):
        """Write the pin in analog mode to value"""
        try:  # value arrives as a string from the message
            value = int(value)
        except ValueError:
            return 'ERROR: Got a value that could not be treated as integer'
        self.ardu.pinMode(pin_number, self.ardu.OUTPUT)
        self.ardu.analogWrite(pin_number, value)

    def digital_write(self, pin_number, value, *_words):
        """Write the pin HIGH if value is 'high' otherwise LOW."""
        self.ardu.pinMode(pin_number, self.ardu.OUTPUT)
        if value == 'high':
            self.ardu.digitalWrite(pin_number, self.ardu.HIGH)
        else:
            self.ardu.digitalWrite(pin_number, self.ardu.LOW)

    def handler(self, msg, *args, **kwargs):
        """Handle an inbound message. Returned values go back as reply."""
        word_map = {
            'dwrite': self.digital_write,
            'awrite': self.analog_write,
            'dread': self.digital_read,
            'aread': self.analog_read,
        }
        words = msg.split()
        if len(words) < 2:  # Check for too short message
            return 'ERROR: message must contain at least 2 words!'
        key_word = words[0]
        try:  # Check that the pin number is an integer
            pin = int(words[1])
        except ValueError:
            return 'ERROR: got non-int for pin number {}'.format(words[1])
        if key_word not in word_map:  # Check if we recognize the first word
            return 'ERROR: not one of the known functions, {}'.format(', '.join(word_map))
        try:
            # Call the corresponding method
            ret_val = word_map[key_word](pin, *words[2:])
            if ret_val is not None:
                ret_val = str(ret_val)
            return ret_val
        except Exception:  # e.g. wrong number of words or a dropped serial link
            return 'ERROR: "{}" failed, maybe a bad message or connection'.format(msg)

class MyApplication(Application):
    def config(self):
        self.add_component('nanpy', GenericNanpy, ARDUINO_PORT)


And here is a little demo script to make use of it:

Code: Select all

#!/usr/bin/env python3
"""
Demonstration of a program controlling the very generic Arkady interface to
NanPy.

Direct dependencies are: pyzmq
"""

import zmq

NANPY_ADDRESS = 'tcp://localhost:5555'  # replace localhost with IP if remote

context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect(NANPY_ADDRESS)  # connect to the Arkady application before sending

while True:
    msg = input('Send a message to the Nanpy device: ')
    sock.send_string('nanpy ' + msg)
    print('Waiting for reply.')
    reply = sock.recv_string()
    print('Got: ' + reply)

Now I can break that code down a little bit. First let's hit the highlights of the Component `GenericNanpy`. It inherits from `arkady.components.SerialComponent` so that it can guarantee that even if lots of messages come in simultaneously, only one action at a time will be sent over the wire to the Arduino (thus the serial communication won't break).
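To illustrate the "one action at a time" guarantee in isolation, here is a minimal sketch using a plain `threading.Lock`. It is not Arkady's actual implementation of `SerialComponent`; the class `SerializedDevice` and the function `demo` are hypothetical names made up for this example, and the serial link is only simulated.

```python
import threading
import time


class SerializedDevice:
    """Hypothetical stand-in for a component that owns one serial link."""

    def __init__(self):
        self._lock = threading.Lock()  # guards the (simulated) serial port
        self._busy = False
        self.overlaps = 0  # how often two commands were on the wire at once
        self.log = []

    def _talk(self, command):
        # Pretend to exchange bytes with the Arduino.
        if self._busy:
            self.overlaps += 1
        self._busy = True
        time.sleep(0.001)
        self.log.append(command)
        self._busy = False

    def request(self, command):
        # Only one caller at a time gets past this point.
        with self._lock:
            self._talk(command)
            return 'done: ' + command


def demo(n_threads=8):
    """Fire several requests at once and return the device for inspection."""
    device = SerializedDevice()
    threads = [
        threading.Thread(target=device.request, args=('dread {}'.format(i),))
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return device
```

After `demo()` returns, `device.overlaps` stays at zero because every request waits its turn for the lock, which is the property the Arduino's serial link needs.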

I then implemented a few methods that wrap generic methods of the `ArduinoApi` from `nanpy`. Not much error checking here. The implementation of `handler` is key; in it I'm defining the interface and how to parse the messages that come through. I've given it some error handling so that it should not crash for most foreseeable problems with messages, and it will pass error notes back to the requester in the reply.
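If you want to try the parsing logic without an Arduino attached, the same pattern can be sketched against a fake board. `FakeBoard` and `handle` are hypothetical names for this illustration only (they are not part of Arkady or nanpy), and only two of the four keywords are kept to stay short.

```python
class FakeBoard:
    """Hypothetical stand-in for the Arduino; remembers written values."""

    def __init__(self):
        self.pins = {}

    def write(self, pin, value='low', *_words):
        self.pins[pin] = value  # returns None, so there is no reply text

    def read(self, pin, *_words):
        return self.pins.get(pin, 'low')


def handle(board, msg):
    """Parse '<keyword> <pin> [value]' and return a reply string or None."""
    word_map = {'dwrite': board.write, 'dread': board.read}
    words = msg.split()
    if len(words) < 2:  # too short to hold a keyword and a pin
        return 'ERROR: message must contain at least 2 words!'
    try:
        pin = int(words[1])
    except ValueError:
        return 'ERROR: got non-int for pin number {}'.format(words[1])
    if words[0] not in word_map:
        return 'ERROR: not one of the known functions'
    ret_val = word_map[words[0]](pin, *words[2:])
    return None if ret_val is None else str(ret_val)
```

For example, `handle(board, 'dwrite 13 high')` followed by `handle(board, 'dread 13')` gives back `'high'`, while a malformed message comes back as an `ERROR: ...` string instead of raising.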

I have done two important things in the definition of `MyApplication.config`:
  • I registered an instance of `GenericNanpy` using `ARDUINO_PORT` as an argument, and gave it the name "nanpy" so that any message with "nanpy" as the first word will be passed along to this component (minus the "nanpy").
  • I registered a `router` type listener on the application itself, which will accept requests on port `5555`, send calls to component handlers by their name, and pass back the results of those calls as replies.
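The name-based routing described in those two points can be pictured with a few lines of plain Python. This is only a sketch of the idea, not Arkady's code: the function `route`, the `components` dict, and the error text for an unknown name are all assumptions made for illustration.

```python
def route(components, message):
    """Send the rest of `message` to the handler named by its first word."""
    name, _, rest = message.strip().partition(' ')
    handler = components.get(name)
    if handler is None:
        # Assumed behaviour; what Arkady replies here is not shown in this post.
        return 'ERROR: no component named "{}"'.format(name)
    return handler(rest.strip())


# A hypothetical registry with one component whose handler echoes its input.
components = {'nanpy': lambda msg: 'nanpy got: ' + msg}
```

Here `route(components, 'nanpy dread 13')` hands `'dread 13'` to the "nanpy" handler, which matches how the demo script prefixes every message with `'nanpy '`.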

So with a few lines of custom code we have a working wrapper around the NanPy interface that can be controlled from any other process that can utilize ZeroMQ (many, many languages) as well as over a network so you can avoid running extra wires.

I hope this may be of interest and use to folks. Please feel free to ask questions. There are some other features in Arkady not addressed in this post, and I am working to improve the documentation coverage. If you implement your own components and applications, I hope to see them and find where Arkady could be improved to help you better!

Contributions, issues, and discussion welcome on GitHub too:
Last edited by SavinaRoja on Sat Apr 13, 2019 3:20 am, edited 1 time in total.

Posts: 376
Joined: Mon Dec 15, 2014 7:14 pm
Location: Noo Joysey, USA

Re: Websocket interfaces to devices using Arkady

Fri Apr 12, 2019 9:55 pm

It is somewhat similar to a framework I developed called python-banyan. Documentation can be found here:
I have compatible banyan frameworks for JavaScript, Ruby and Java.

There is a demo of connecting a python banyan component to a JavaScript banyan component here:

It also contains an MQTT gateway to allow connection to and from MQTT networks.

I have gateways for the Raspberry Pi, Arduino, and ESP-8266 and am finishing up the documentation for those (to be published soon). Using those gateways, you develop a program that uses a single messaging set I call ONEgpio, which allows a single component to control any or all 3 of the hardware gateways without any changes. To illustrate, I developed a Tkinter GUI that is common to all three hardware platforms - only the pin numbers change based on a command line option. A Web-based GUI also uses the ONEgpio messaging set.

Python Banyan is being used by Palace Games as described in this Raspberry Pi Blog article: ... cape-room/

Posts: 5
Joined: Mon Apr 01, 2019 1:31 am

Re: Websocket interfaces to devices using Arkady

Fri Apr 12, 2019 10:26 pm

That's really neat. Interesting really how the concepts converge between the respective projects. At a broad level I think Arkady is less "focused" at the current stage than python-banyan; this is by design to an extent as I generalized the code that is now Arkady. The pub-sub model was something I just implemented on spec as I was using only req-rep myself.

I'm looking forward to delving deeper into your project, it looks quite clever.

Posts: 5
Joined: Mon Apr 01, 2019 1:31 am

Re: Websocket interfaces to devices using Arkady

Tue Apr 16, 2019 3:19 am

I've adapted the example above into the documentation.

Posts: 376
Joined: Mon Dec 15, 2014 7:14 pm
Location: Noo Joysey, USA

Re: Websocket interfaces to devices using Arkady

Tue Apr 16, 2019 3:06 pm

I am still working on documenting my gateways (which includes one for WebSockets). I can write hundreds of lines of quality code in a day, but it takes me a week to write one paragraph of documentation ;-).

When I finish the docs, I will take a serious look at your package. If it makes sense, I will write a gateway for Arkady. If I am successful, we can then share components without the user having to pick one approach over the other. The gateway will be bidirectional, so it will translate in both directions.
