RobinMosedale
Posts: 44
Joined: Tue Jan 29, 2013 10:55 am

ESP32 Micropython asyncio queue overflow

Fri Jul 26, 2019 7:44 pm

Further to the endeavours to get the resilient asynchronous mqtt to work, I can with the single thread example
It was modified in a minor manner to publish a message in acknowledgement of receiving a subscribed message, an added line in the flash blue led
The whole was now set to add a concurrent thread.
A very very simple asyncio coroutine was added which merely printed out a couple of indicators and flashed a LED connected to pin 4 (tested elsewhere)

Before all of this I tested the simple concurrent thread examples successfully.

So..................................
It all runs for a few seconds, I see the messages fro the instantiated new coroutine, and then it crashes:-

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "rangec.py", line 93, in <module>
File "rangec.py", line 89, in <module>
File "/lib/uasyncio/core.py", line 180, in run_until_complete
File "/lib/uasyncio/core.py", line 159, in run_forever
File "/lib/uasyncio/core.py", line 58, in call_later_ms
File "/lib/uasyncio/core.py", line 63, in call_at_
IndexError: queue overflow
>>>

You can see that I've inserted a garbage collection as belt and braces, but I'm afraid that the run_until_complete object doesn't support debug=True otherwise I'd delve further.

This seems to be associated with the particular method in which the asynchronous MQTT example is written. I'm sure that I've done something simple, but again I'm having trouble finding documentation that relates reliably to this particular variant of asyncio, or anything micropython for that matter.


This is hugely frustrating, this method was selected particularly to use multiple asynchronous routines, and this the first most trivial attempt gives me opaque errors

I've trawled for hours to search for any hint why.

I'd be really grateful if someone could correct something stupid that I've done, before I lose all patience with the Micropython product.

Hugs and Kisses
Robin

The full listing is here

Code: Select all

# range.py Test of asynchronous mqtt client with clean session False.
# (C) Copyright Peter Hinch 2017-2019.
# Released under the MIT licence.

# Public brokers https://github.com/mqtt/mqtt.github.io/wiki/public_brokers

# This demo is for wireless range tests. If OOR the red LED will light.
# In range the blue LED will pulse for each received message.
# Uses clean sessions to avoid backlog when OOR.

# red LED: ON == WiFi fail
# blue LED pulse == message received
# Publishes connection statistics.

from mqtt_as import MQTTClient, config
from config import wifi_led, blue_led
import uasyncio as asyncio
import machine
import gc

loop = asyncio.get_event_loop()
outages = 0

async def pulse():  # This demo pulses blue LED each time a subscribed msg arrives.
    blue_led(True)
    await client.publish('foo_topic', 'Message received')
    await asyncio.sleep(1)
    blue_led(False)

def sub_cb(topic, msg):
    print((topic, msg))
    loop.create_task(pulse())

async def wifi_han(state):
    global outages
    wifi_led(not state)  # Light LED when WiFi down
    if state:
        print('We are connected to broker.')
    else:
        outages += 1
        print('WiFi or broker is down.')
    await asyncio.sleep(1)

async def conn_han(client):
    await client.subscribe('foo_topic', 1)

async def main(client):
    try:
        await client.connect()
    except OSError:
        print('Connection failed.')
        return
    n = 0
    while True:
        await asyncio.sleep(5)
        print('publish', n)
        # If WiFi is down the following will pause for the duration.
        await client.publish('result', '{} repubs: {} outages: {}'.format(n, client.REPUB_COUNT, outages), qos = 1)
        n += 1

#import machine

#pin2=machine.Pin(2,machine.Pin.OUT) #the inernal blue led
pin4=machine.Pin(4,machine.Pin.OUT) #externally attached led

async def testfunction ():
    while True:
        gc.collect()
        print ('LED On')
        pin4.on()
        await asyncio.sleep(1)
        print('LED Off')
        pin4.off
        await asyncio.sleep(1)
        print('Out')
    
# Define configuration
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['will'] = ('result', 'Goodbye cruel world!', False, 0)
config['connect_coro'] = conn_han
config['keepalive'] = 120

# Set up client
MQTTClient.DEBUG = True  # Optional
client = MQTTClient(config)
loop.create_task(testfunction())
try:
    loop.run_until_complete(main(client))
    
finally:  # Prevent LmacRxBlk:1 errors.
    client.close()
    blue_led(True)
and here is the full output noting the micropython version
MicroPython v1.11-167-g331c224e0 on 2019-07-22; ESP32 module with ESP32
Type "help()" for more information.
>>>
>>> import rangec
LED On
Checking WiFi integrity.
LED Off
Out
LED On
LED Off
Out
LED On
LED Off
Got reliable connection
Connecting to broker.
Connected to broker.
We are connected to broker.
(b'foo_topic', b'first boring message')
(b'foo_topic', b'Message received')
Out
LED On
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
(b'foo_topic', b'Message received')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "rangec.py", line 93, in <module>
File "rangec.py", line 89, in <module>
File "/lib/uasyncio/core.py", line 180, in run_until_complete
File "/lib/uasyncio/core.py", line 159, in run_forever
File "/lib/uasyncio/core.py", line 58, in call_later_ms
File "/lib/uasyncio/core.py", line 63, in call_at_
IndexError: queue overflow

blimpyway
Posts: 169
Joined: Mon Mar 19, 2018 1:18 pm

Re: ESP32 Micropython asyncio queue overflow

Fri Jul 26, 2019 9:09 pm

1. Try to publish message with qos = 0 instead of 1. That's most "lightweight" mode, if it does not overflow you may send the message multiple times, just to ensure delivery.

2. This is a micropython problem, and particularly a mqtt on micropython problem. There should be a github repository for them, asking there on their respective "issues" sections should give you more useful feedback than asking about esp32 issues on a raspberry pi forum.

RobinMosedale
Posts: 44
Joined: Tue Jan 29, 2013 10:55 am

Re: ESP32 Micropython asyncio queue overflow

Fri Jul 26, 2019 9:33 pm

Thank you Blimpy.

Astonishing. This will be the second fault report that I'll be posting on their GITHUB repository in so many weeks.

This example is the most elementary use of asyncio co-routines built directly on the micro python example.

Don't they test even the most elementary use.

I'll try a different QOS. However, this is intended to be used in a low level element time critical interrupt driven application. Adding another layer of cluge to handle errors is making this overwhelm the original app.

An astonishing poor product.
Last edited by RobinMosedale on Fri Jul 26, 2019 9:58 pm, edited 1 time in total.

blimpyway
Posts: 169
Joined: Mon Mar 19, 2018 1:18 pm

Re: ESP32 Micropython asyncio queue overflow

Fri Jul 26, 2019 9:41 pm

On the other topic you opened I suggested using plain UDP with asyncio,

I think I also suggested a code example you could try.

The nice thing about UDP in a restricted environment as a microcontroller is it does not keep queues and buffers.

Sure you have to handle message packet lost cases but you can send past 5 or 10 messages together with the new one. Some redundancy might bump over missing packets.

RobinMosedale
Posts: 44
Joined: Tue Jan 29, 2013 10:55 am

Re: ESP32 Micropython asyncio queue overflow

Sat Jul 27, 2019 2:53 pm

Indeed, I've had a response from the micropython forum.
As expected it's not a fault, but an overload.
The original code is fine.
On one esp32 it was purposely modified to publish a response to message received, itself providing +ve feedback and resonance.

It coped splendidly. Soak tested for more than 12 hours.

Adding the separately instantiated async testfunction was the straw that broke the camel's back. It couldn't keep up.
Quite understandably.

The publishing response has been removed.
It's fine, but has introduced some latency, which I may be able to live with when I learn how best to manage it.

I'm still struggling to find reliable specifications for the object/functions particular to the micropython implementation. If they were more definitive, then we'd be on the way to predicative code. In the meantime, it's the heuristic approach

We can close this off

The modified code is here:-

Code: Select all

# range.py Test of asynchronous mqtt client with clean session False.
# (C) Copyright Peter Hinch 2017-2019.
# Released under the MIT licence.

# Public brokers https://github.com/mqtt/mqtt.github.io/wiki/public_brokers

# This demo is for wireless range tests. If OOR the red LED will light.
# In range the blue LED will pulse for each received message.
# Uses clean sessions to avoid backlog when OOR.

# red LED: ON == WiFi fail
# blue LED pulse == message received
# Publishes connection statistics.

from mqtt_as import MQTTClient, config
from config import wifi_led, blue_led
import uasyncio as asyncio
import machine
import gc

loop = asyncio.get_event_loop()
outages = 0

async def pulse():  # This demo pulses blue LED each time a subscribed msg arrives.
    blue_led(True)
    #await client.publish('foo_topic', 'Message received')
    await asyncio.sleep(1)
    blue_led(False)

def sub_cb(topic, msg):
    print((topic, msg))
    loop.create_task(pulse())

async def wifi_han(state):
    global outages
    wifi_led(not state)  # Light LED when WiFi down
    if state:
        print('We are connected to broker.')
    else:
        outages += 1
        print('WiFi or broker is down.')
    await asyncio.sleep(1)

async def conn_han(client):
    await client.subscribe('foo_topic', 1)

async def main(client):
    try:
        await client.connect()
    except OSError:
        print('Connection failed.')
        return
    n = 0
    while True:
        await asyncio.sleep(5)
        print('publish', n)
        # If WiFi is down the following will pause for the duration.
        await client.publish('result', '{} repubs: {} outages: {}'.format(n, client.REPUB_COUNT, outages), qos = 1)
        n += 1

#import machine

#pin2=machine.Pin(2,machine.Pin.OUT) #the inernal blue led
pin4=machine.Pin(4,machine.Pin.OUT) #externally attached led

async def testfunction ():
    while True:
        #gc.collect()
        #print ('LED On')
        pin4.on()
        await asyncio.sleep(0.05)
        #print('LED Off')
        pin4.off()
        await asyncio.sleep(5)
        #print('Out')
    
# Define configuration
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['will'] = ('result', 'Goodbye cruel world!', False, 0)
config['connect_coro'] = conn_han
config['keepalive'] = 120

# Set up client
MQTTClient.DEBUG = True  # Optional
client = MQTTClient(config)
loop.create_task(testfunction())
try:
    loop.run_until_complete(main(client))
    
finally:  # Prevent LmacRxBlk:1 errors.
    client.close()
    blue_led(True)
    

Return to “Python”