一尘不染

与Flask服务器同时运行while循环

flask

我正在使用python更新一些LED。我一直在这样做:

from LEDs import *
myLEDs = LEDs()
done = False
while not done:
  myLEDs.iterate()

我想用Flask充当一些我可以在我的浏览器中运行的ReactJS前端(以更改当前模式等)与Python中的LED控制代码之间的桥梁。

我的Flask工作正常,可以处理HTTP请求,等等。我想知道如何设置myLEDs.iterate()我的flask应用程序同时连续运行(或快速运行),同时仍然能够相互通信,就像这样:

myLEDs = LEDs()

@app.route('/changePattern',methods=['POST'])
def changePattern():
  n = request.json['num']
  myLEDs.setPattern(n)
  return jsonify(**locals())

if __name__ == '__main__':
  app.debug = True
  myLEDs.setToFrequentlyIterateAndStillTalkToFlask()
  app.run()

我遇到了celery,这似乎可以解决问题,但对于我的问题有多简单,似乎也有些过头了。

使用Flask overkill只是为了让UI管理我的python后端代码吗?是否有比Celery更简单的库可用于在后台运行某些内容?


阅读 1039

收藏
2020-04-06

共1个答案

一尘不染

使用多进程在Flask HTTP请求的不同进程中运行循环:

import time
from flask import Flask, jsonify
from multiprocessing import Process, Value


app = Flask(__name__)


tasks = [
   {
      'id': 1,
      'title': u'Buy groceries',
      'description': u'Milk, Cheese, Pizza, Fruit, Tylenol', 
      'done': False
   },
   {
      'id': 2,
      'title': u'Learn Python',
      'description': u'Need to find a good Python tutorial on the web', 
      'done': False
   }
]


@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
   return jsonify({'tasks': tasks})


def record_loop(loop_on):
   while True:
      if loop_on.value == True:
         print("loop running")
      time.sleep(1)


if __name__ == "__main__":
   recording_on = Value('b', True)
   p = Process(target=record_loop, args=(recording_on,))
   p.start()  
   app.run(debug=True, use_reloader=False)
   p.join()

任务部分来自这里,来自我的多处理代码。
注意“ use_reloader = False”部分。这是避免两次运行循环所必需的。由于这个原因,请看这里

可以通过使用以下命令启动服务器来测试功能

python <your_name_for_the example>.py

并打电话

curl -i http://localhost:5000/todo/api/v1.0/tasks
2020-04-06