我已经了解 asyncio 好几天了,并且在 raspbaerry pi 板代码中为 lora sx127x 模块实现了它。
我想知道如何在我的 asyncio 运行(主)循环中实现添加 gpio 事件回调。我知道 gpio 事件在后台的不同线程中运行,而不是在 asyncio 循环中。我希望能够让“on_rx_done”在 asyncio 代码中工作。
任何文档建议或提示都将非常有益。我已经从 python 文档中准备好了 asyncio.event、asyncio.loop、asyncio.future,但无法决定哪一个在这里有效。如果可能的话,我想从这里使用高级 asyncio api
这是我的基本参考代码,它可以同步工作。我想将其转换为与 asyncio 模块兼容。
我希望获得“async _dio0()”回调,该回调调用“async on_rx_done()”来填充queue_1中的“payload”,然后我开始在“make_json_string”函数中处理“payload”等等。
以防需要帮助理解上下文。这是我当前代码的简化版本。
import asyncio from SX127x.LoRa import * from SX127x.LoRaArgumentParser import LoRaArgumentParser from SX127x.board_config import BOARD from contextvars import ContextVar class LoRaRcvCont(LoRa): server_status = False def __init__(self, verbose=False): super(LoRaRcvCont, self).__init__(verbose) # here bellow add event function is called # GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback) # equivalant GPIO.add_event_detect(gpio_4, GPIO.RISING, callback=_dio0) self.set_mode(MODE.SLEEP) self.set_dio_mapping([0] * 6) async def _dio0(self, channel):#override callback function as async function # DIO0 00: RxDone # DIO0 01: TxDone # DIO0 10: CadDone print('_dio') if self.dio_mapping[0] == 0: await self.on_rx_done() elif self.dio_mapping[0] == 1: await self.on_tx_done() elif self.dio_mapping[0] == 2: await self.on_cad_done() else: raise RuntimeError("unknown dio0mapping!") async def on_rx_done(self):#override function as async function print("\nLoRa RxDone") await asyncio.sleep(.01) if self.dio_mapping[0] == 0: print("LoRa RxDone") self.clear_irq_flags(RxDone=1) payload = self.read_payload(nocheck=True) # add task in queue_1 if payload not empty if len(payload) > 0: temp_queue_1 = queue_1_var.get() try: print('queue_1 put task, start') await temp_queue_1.put(payload) except: # pylint: disable=bare-except print('ERROR in put queue_1.') finally: print('queue_1 put task, done') # return payload async def make_json_string(self, queue_1, queue_2): while True: # print("make_json_string") # get a unit of work # print("queue_1 get = ") payload = await queue_1.get() # Notify the queue that the "work item" has been processed. queue_1.task_done() async def start(self, queue_1): self.reset_ptr_rx() self.set_mode(MODE.RXCONT) count = 0 #payload = [205, 171, 1, 82, 77, 49, 48, 48, 48, 48] # asyncio.get_event_loop() while True: await asyncio.sleep(3) # await queue_1.put(payload) count = count + 1 print("loop" + str(count)) async def main(): BOARD.setup() # create the shared queue queue_1 = asyncio.Queue() queue_1_var.set(queue_1) # Create an Event object. event = asyncio.Event() lora = lora_config_setup() try: print("asyncio loop starts ...") await asyncio.gather( asyncio.create_task(lora.start(queue_1)), #asyncio.Event.set(lora._dio0(None)), asyncio.create_task(lora.make_json_string(queue_1), ) # ) print("main done") except KeyboardInterrupt: sys.stdout.flush() print("") sys.stderr.write("KeyboardInterrupt\n") finally: sys.stdout.flush() print("") lora.set_mode(MODE.SLEEP) debug_print(lora) BOARD.teardown() ######################## # code starts from here ######################### queue_1_var = ContextVar('queue_1') asyncio.run(main()) print("code finished")
2023年3月23日更新:
我正在对 GPIO 输入方法使用异步轮询,直到找到 GPIO 事件回调的适当解决方案。
... class LoRaRcvCont(LoRa): server_status = False def __init__(self, verbose=False): super(LoRaRcvCont, self).__init__(verbose) self.set_mode(MODE.SLEEP) self.set_dio_mapping([0] * 6) async def async_dio0(self, channel): # DIO0 00: RxDone # DIO0 01: TxDone # DIO0 10: CadDone print('in _dio') while True: await asyncio.sleep(.1) if GPIO.input(channel): #irq pin high #self.get_dio_mapping() #get dio mapping print('got interrupt _dio') print(f"rx_done: {self.dio_mapping[0]}") if self.dio_mapping[0] == 0: await self.on_rx_done() elif self.dio_mapping[0] == 1: await self.on_tx_done() elif self.dio_mapping[0] == 2: await self.on_cad_done() else: raise RuntimeError("unknown dio0mapping!") else: pass ... #main asyncio coroutine here all code is executed async def main(): BOARD.setup() # create the shared queue queue_1 = asyncio.Queue() queue_1_var.set(queue_1) # Create an Event object. event = asyncio.Event() lora = lora_config_setup() # http_server_check() # was using request module try: print("asyncio loop starts ...") await asyncio.gather( asyncio.create_task(lora.start(queue_1)), # asyncio.create_task(lora.on_rx_done()), # asyncio.create_task(lora.on_tx_done), asyncio.create_task(lora.async_dio0(BOARD.DIO0)), asyncio.create_task(lora.make_json_string(queue_1)), ) print("main done") except KeyboardInterrupt: sys.stdout.flush() print("") sys.stderr.write("KeyboardInterrupt\n") finally: sys.stdout.flush() print("") lora.set_mode(MODE.SLEEP) debug_print(lora) BOARD.teardown() queue_1_var = ContextVar('queue_1') asyncio.run(main()) print("code finished")
在你的代码中,通过异步轮询来模拟 GPIO 中断处理是一个有效的起点,但它可能并不是最优的,尤其是在需要快速响应的情况下。将 GPIO 回调与 asyncio 集成的目标是通过某种形式的事件驱动机制来触发异步任务。
以下是一些改进和替代方法,旨在实现更高效的异步 GPIO 中断处理:
结合 asyncio 的事件,可以通过将 GPIO 事件包装为 asyncio 兼容的事件触发来实现更好的集成。下面是一个示例:
import asyncio import RPi.GPIO as GPIO from SX127x.LoRa import * from SX127x.board_config import BOARD class LoRaRcvCont(LoRa): def __init__(self, verbose=False): super(LoRaRcvCont, self).__init__(verbose) self.set_mode(MODE.SLEEP) self.set_dio_mapping([0] * 6) self.dio_event = asyncio.Event() async def monitor_gpio(self, channel): """Monitor GPIO and set the event when interrupt occurs.""" while True: await asyncio.sleep(0.01) if GPIO.input(channel): self.dio_event.set() # Signal the asyncio event await asyncio.sleep(0.01) # Debounce async def handle_dio_event(self): """Handle GPIO interrupts using asyncio events.""" while True: await self.dio_event.wait() # Wait for the event to be set self.dio_event.clear() # Reset the event if self.dio_mapping[0] == 0: await self.on_rx_done() elif self.dio_mapping[0] == 1: await self.on_tx_done() elif self.dio_mapping[0] == 2: await self.on_cad_done() async def on_rx_done(self): print("LoRa RxDone") payload = self.read_payload(nocheck=True) if payload: print(f"Payload received: {payload}") async def start(self, queue_1): self.reset_ptr_rx() self.set_mode(MODE.RXCONT) while True: await asyncio.sleep(1) print("LoRa running...") async def main(): BOARD.setup() GPIO.setmode(GPIO.BCM) GPIO.setup(BOARD.DIO0, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) lora = LoRaRcvCont(verbose=True) asyncio.create_task(lora.monitor_gpio(BOARD.DIO0)) # Start GPIO monitoring asyncio.create_task(lora.handle_dio_event()) # Start handling GPIO events await lora.start(asyncio.Queue()) BOARD.teardown() asyncio.run(main())
asyncio.Event 用于中断信号: 使用 asyncio.Event 来表示 GPIO 中断信号,而不是直接使用轮询来处理输入状态。
asyncio.Event
去掉阻塞轮询: 在 monitor_gpio 中,GPIO 的轮询以较低的频率进行,实际中断处理由事件触发处理程序完成,减少资源占用。
monitor_gpio
分离逻辑: 将 GPIO 中断监控与实际处理分离到不同的异步任务中,这样逻辑更清晰,也便于调试和扩展。
aiomultiprocess/asyncio.run_in_executor 如果 GPIO 库提供了线程回调支持(如 add_event_detect),可以通过 asyncio.run_in_executor 将同步回调转换为异步代码: python loop = asyncio.get_event_loop() GPIO.add_event_detect(channel, GPIO.RISING, callback=lambda _: loop.call_soon_threadsafe(event.set))
aiomultiprocess
asyncio.run_in_executor
add_event_detect
python loop = asyncio.get_event_loop() GPIO.add_event_detect(channel, GPIO.RISING, callback=lambda _: loop.call_soon_threadsafe(event.set))
专用异步 GPIO 库: 如果需要更好支持异步的库,可以尝试使用专门支持 asyncio 的 GPIO 库,如 aiogpio。
aiogpio
希望这些建议对你有所帮助!如果有更多问题,请随时提问。