Python反应式编程


反应式编程是一种编程范式,用于处理数据流和变化的传播。这意味着当一个组件发出数据流时,更改将通过响应式编程库传播到其他组件。变化的传播将持续到最终接收器。事件驱动和反应式编程之间的区别在于事件驱动的编程围绕事件而反应式编程围绕数据。

ReactiveX或RX用于反应式编程

ReactiveX或Raective Extension是最着名的反应式编程实现。ReactiveX的工作取决于以下两个类 -

可观察的类

此类是数据流或事件的来源,它打包传入的数据,以便数据可以从一个线程传递到另一个线程。在某些观察者订阅数据之前,它不会提供数据。

观察者

此类使用 observable 发出的数据流。可以有多个具有可观察性的观察者,每个观察者将接收发射的每个数据项。观察者可以通过订阅观察者来接收三种类型的事件

  • on_next()事件 - 它意味着数据流中有一个元素。

  • on_completed()事件 - 它意味着排放结束,不再有物品到来。

  • on_error()事件 - 它还意味着发射结束,但是在 observable 抛出错误的情况下。

RxPY - 用于反应式编程的Python模块

RxPY是一个Python模块,可用于反应式编程。我们需要确保安装该模块。以下命令可用于安装RxPY模块

pip install RxPY

以下是一个Python脚本,它使用 RxPY 模块及其 ObservableObserve 类 进行反应式编程。基本上有两个类

  • get_strings() - 用于从观察者获取字符串。

  • PrintObserver() - 用于从观察者打印字符串。 它使用观察者类的所有三个事件。它还使用了subscribe()类。

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

输出

Received Ram
Received Mohan
Received Shyam
Finished

用于反应式编程的PyFunctional库

PyFunctional 是另一个可用于反应式编程的Python库。它使我们能够使用Python编程语言创建功能程序。它很有用,因为它允许我们使用链式函数运算符创建数据管道。

RxPY和PyFunctional之间的差异

这两个库都用于反应式编程并以类似的方式处理流,但它们之间的主要区别取决于数据的处理。 RxPY 处理系统中的数据和事件,而 PyFunctional 则专注于使用函数式编程范例转换数据。

安装PyFunctional模块

我们需要在使用之前安装此模块。它可以在pip命令的帮助下安装如下

pip install pyfunctional

下面的示例使用 PyFunctional 模块及其 seq 类,它们充当我们可以迭代和操作的流对象。在这个程序中,它使用将每个值加倍的lamda函数映射序列,然后过滤x大于4的值,最后将序列缩减为所有剩余值的总和。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

输出

Result: 6