小能豆

在 python 中如何从 asyncio.gather 异步任务循环中获取“RPi.GPIO”事件回调

py

我已经了解 asyncio 好几天了,并且在 raspbaerry pi 板代码中为 lora sx127x 模块实现了它。

我想知道如何在我的 asyncio 运行(主)循环中实现添加 gpio 事件回调。我知道 gpio 事件在后台的不同线程中运行,而不是在 asyncio 循环中。我希望能够让“on_rx_done”在 asyncio 代码中工作。

任何文档建议或提示都将非常有益。我已经从 python 文档中准备好了 asyncio.event、asyncio.loop、asyncio.future,但无法决定哪一个在这里有效。如果可能的话,我想从这里使用高级 asyncio api

这是我的基本参考代码,它可以同步工作。我想将其转换为与 asyncio 模块兼容。

我希望获得“async _dio0()”回调,该回调调用“async on_rx_done()”来填充queue_1中的“payload”,然后我开始在“make_json_string”函数中处理“payload”等等。

以防需要帮助理解上下文。这是我当前代码的简化版本。

  1. 使用 contex var 在 asyncio 循环之间传递queue_1
  2. 尝试设置 asyncio.Event.set 但失败了
  3. 每次接收事件时都会出现此错误“<_“UnixSelectorEventLoop”running=True closed=False debug=False>”,但现在发生了响应。
import asyncio
from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config import BOARD
from contextvars import ContextVar

class LoRaRcvCont(LoRa):

   server_status = False

   def __init__(self, verbose=False):
        super(LoRaRcvCont, self).__init__(verbose)

        # here bellow add event function is called
        # GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback)
        # equivalant GPIO.add_event_detect(gpio_4, GPIO.RISING, callback=_dio0)

        self.set_mode(MODE.SLEEP)
        self.set_dio_mapping([0] * 6)

   async def _dio0(self, channel):#override callback function as async function
        # DIO0 00: RxDone
        # DIO0 01: TxDone
        # DIO0 10: CadDone
        print('_dio')
        if self.dio_mapping[0] == 0:
            await self.on_rx_done()
        elif self.dio_mapping[0] == 1:
            await self.on_tx_done()
        elif self.dio_mapping[0] == 2:
            await self.on_cad_done()
        else:
            raise RuntimeError("unknown dio0mapping!")

    async def on_rx_done(self):#override function as async function
        print("\nLoRa RxDone")
        await asyncio.sleep(.01)
        if self.dio_mapping[0] == 0:
            print("LoRa RxDone")
            self.clear_irq_flags(RxDone=1)
            payload = self.read_payload(nocheck=True)

            # add task in queue_1 if payload not empty
            if len(payload) > 0:
                temp_queue_1 = queue_1_var.get()
                try:
                    print('queue_1 put task, start')
                    await temp_queue_1.put(payload)
                except:  # pylint: disable=bare-except
                    print('ERROR in put queue_1.')
                finally:
                    print('queue_1 put task,  done')
            # return payload

    async def make_json_string(self, queue_1, queue_2):
        while True:
            # print("make_json_string")
            # get a unit of work
            # print("queue_1 get = ")
            payload = await queue_1.get()

            # Notify the queue that the "work item" has been processed.
            queue_1.task_done()

    async def start(self, queue_1):
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)
        count = 0
        #payload = [205, 171, 1, 82, 77, 49, 48, 48, 48, 48]

        # asyncio.get_event_loop()
        while True:

            await asyncio.sleep(3)
            # await queue_1.put(payload)

            count = count + 1
            print("loop" + str(count))

async def main():

    BOARD.setup()

    # create the shared queue
    queue_1 = asyncio.Queue()
    queue_1_var.set(queue_1)


    # Create an Event object.
    event = asyncio.Event()

    lora = lora_config_setup()

    try:
        print("asyncio loop starts ...")
        await asyncio.gather(
            asyncio.create_task(lora.start(queue_1)),
            #asyncio.Event.set(lora._dio0(None)),
            asyncio.create_task(lora.make_json_string(queue_1),
        )
        # )

        print("main done")
    except KeyboardInterrupt:
        sys.stdout.flush()
        print("")
        sys.stderr.write("KeyboardInterrupt\n")
    finally:
        sys.stdout.flush()
        print("")
        lora.set_mode(MODE.SLEEP)

    debug_print(lora)
    BOARD.teardown()

########################
# code starts from here
#########################
queue_1_var = ContextVar('queue_1')
asyncio.run(main())
print("code finished")

2023年3月23日更新:

我正在对 GPIO 输入方法使用异步轮询,直到找到 GPIO 事件回调的适当解决方案。

...
class LoRaRcvCont(LoRa):

    server_status = False

    def __init__(self, verbose=False):
        super(LoRaRcvCont, self).__init__(verbose)
        self.set_mode(MODE.SLEEP)
        self.set_dio_mapping([0] * 6)
    async def async_dio0(self, channel):
        # DIO0 00: RxDone
        # DIO0 01: TxDone
        # DIO0 10: CadDone

        print('in _dio')
        while True:
            await asyncio.sleep(.1) 
            if GPIO.input(channel): #irq pin high

                #self.get_dio_mapping() #get dio mapping
                print('got interrupt _dio')
                print(f"rx_done: {self.dio_mapping[0]}")
                if self.dio_mapping[0] == 0:
                    await self.on_rx_done()
                elif self.dio_mapping[0] == 1:
                    await self.on_tx_done()
                elif self.dio_mapping[0] == 2:
                    await self.on_cad_done()
                else:
                    raise RuntimeError("unknown dio0mapping!")
            else:
                pass
...

#main asyncio coroutine here all code is executed
async def main():

    BOARD.setup()

    # create the shared queue
    queue_1 = asyncio.Queue()
    queue_1_var.set(queue_1)


    # Create an Event object.
    event = asyncio.Event()

    lora = lora_config_setup()
    # http_server_check() # was using request module

    try:
        print("asyncio loop starts ...")
        await asyncio.gather(
            asyncio.create_task(lora.start(queue_1)),
            # asyncio.create_task(lora.on_rx_done()),
            # asyncio.create_task(lora.on_tx_done),
            asyncio.create_task(lora.async_dio0(BOARD.DIO0)),
            asyncio.create_task(lora.make_json_string(queue_1)),
        )

        print("main done")
    except KeyboardInterrupt:
        sys.stdout.flush()
        print("")
        sys.stderr.write("KeyboardInterrupt\n")
    finally:
        sys.stdout.flush()
        print("")
        lora.set_mode(MODE.SLEEP)

    debug_print(lora)
    BOARD.teardown()

queue_1_var = ContextVar('queue_1')

asyncio.run(main())
print("code finished")

阅读 14

收藏
2025-01-04

共1个答案

小能豆

在你的代码中,通过异步轮询来模拟 GPIO 中断处理是一个有效的起点,但它可能并不是最优的,尤其是在需要快速响应的情况下。将 GPIO 回调与 asyncio 集成的目标是通过某种形式的事件驱动机制来触发异步任务。

以下是一些改进和替代方法,旨在实现更高效的异步 GPIO 中断处理:


使用 Asyncio 和事件

结合 asyncio 的事件,可以通过将 GPIO 事件包装为 asyncio 兼容的事件触发来实现更好的集成。下面是一个示例:

import asyncio
import RPi.GPIO as GPIO
from SX127x.LoRa import *
from SX127x.board_config import BOARD

class LoRaRcvCont(LoRa):

    def __init__(self, verbose=False):
        super(LoRaRcvCont, self).__init__(verbose)
        self.set_mode(MODE.SLEEP)
        self.set_dio_mapping([0] * 6)
        self.dio_event = asyncio.Event()

    async def monitor_gpio(self, channel):
        """Monitor GPIO and set the event when interrupt occurs."""
        while True:
            await asyncio.sleep(0.01)
            if GPIO.input(channel):
                self.dio_event.set()  # Signal the asyncio event
                await asyncio.sleep(0.01)  # Debounce

    async def handle_dio_event(self):
        """Handle GPIO interrupts using asyncio events."""
        while True:
            await self.dio_event.wait()  # Wait for the event to be set
            self.dio_event.clear()  # Reset the event
            if self.dio_mapping[0] == 0:
                await self.on_rx_done()
            elif self.dio_mapping[0] == 1:
                await self.on_tx_done()
            elif self.dio_mapping[0] == 2:
                await self.on_cad_done()

    async def on_rx_done(self):
        print("LoRa RxDone")
        payload = self.read_payload(nocheck=True)
        if payload:
            print(f"Payload received: {payload}")

    async def start(self, queue_1):
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)
        while True:
            await asyncio.sleep(1)
            print("LoRa running...")

async def main():
    BOARD.setup()
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(BOARD.DIO0, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

    lora = LoRaRcvCont(verbose=True)
    asyncio.create_task(lora.monitor_gpio(BOARD.DIO0))  # Start GPIO monitoring
    asyncio.create_task(lora.handle_dio_event())  # Start handling GPIO events
    await lora.start(asyncio.Queue())

    BOARD.teardown()

asyncio.run(main())

主要改进

  1. asyncio.Event 用于中断信号:
    使用 asyncio.Event 来表示 GPIO 中断信号,而不是直接使用轮询来处理输入状态。

  2. 去掉阻塞轮询:
    monitor_gpio 中,GPIO 的轮询以较低的频率进行,实际中断处理由事件触发处理程序完成,减少资源占用。

  3. 分离逻辑:
    将 GPIO 中断监控与实际处理分离到不同的异步任务中,这样逻辑更清晰,也便于调试和扩展。


其他方法

  • aiomultiprocess/asyncio.run_in_executor
    如果 GPIO 库提供了线程回调支持(如 add_event_detect),可以通过 asyncio.run_in_executor 将同步回调转换为异步代码:
    python loop = asyncio.get_event_loop() GPIO.add_event_detect(channel, GPIO.RISING, callback=lambda _: loop.call_soon_threadsafe(event.set))

  • 专用异步 GPIO 库:
    如果需要更好支持异步的库,可以尝试使用专门支持 asyncio 的 GPIO 库,如 aiogpio

希望这些建议对你有所帮助!如果有更多问题,请随时提问。

2025-01-04