我尝试使用txredis(redis的非阻塞扭曲api)作为持久性消息队列,但尝试将其与我正在研究的一个草率项目一起设置,但未成功。我发现,尽管客户端没有阻塞,但它变得比原先要慢得多,因为反应堆循环中原本应该成为一个事件的事件被分解为数千个步骤。
因此,我改为尝试使用redis- py(常规阻止扭曲的api)并将调用包装在延迟线程中。它很好用,但是我想在调用redis时执行一次内部延迟,因为我想建立连接池以试图进一步加快速度。
以下是我对从扭曲文档中获取的一些示例代码的解释,这些示例代码用于递延线程以说明我的用例:
#!/usr/bin/env python from twisted.internet import reactor,threads from twisted.internet.task import LoopingCall import time def main_loop(): print 'doing stuff in main loop.. do not block me!' def aBlockingRedisCall(): print 'doing lookup... this may take a while' time.sleep(10) return 'results from redis' def result(res): print res def main(): lc = LoopingCall(main_loop) lc.start(2) d = threads.deferToThread(aBlockingRedisCall) d.addCallback(result) reactor.run() if __name__=='__main__': main()
这是我对连接池的更改,该更改使延迟线程中的代码受阻:
#!/usr/bin/env python from twisted.internet import reactor,defer from twisted.internet.task import LoopingCall import time def main_loop(): print 'doing stuff in main loop.. do not block me!' def aBlockingRedisCall(x): if x<5: #all connections are busy, try later print '%s is less than 5, get a redis client later' % x x+=1 d = defer.Deferred() d.addCallback(aBlockingRedisCall) reactor.callLater(1.0,d.callback,x) return d else: print 'got a redis client; doing lookup.. this may take a while' time.sleep(10) # this is now blocking.. any ideas? d = defer.Deferred() d.addCallback(gotFinalResult) d.callback(x) return d def gotFinalResult(x): return 'final result is %s' % x def result(res): print res def aBlockingMethod(): print 'going to sleep...' time.sleep(10) print 'woke up' def main(): lc = LoopingCall(main_loop) lc.start(2) d = defer.Deferred() d.addCallback(aBlockingRedisCall) d.addCallback(result) reactor.callInThread(d.callback, 1) reactor.run() if __name__=='__main__': main()
所以我的问题是,有人知道我的更改为什么会导致延迟的线程被阻塞和/或有人可以提出更好的解决方案吗?
好吧,正如扭曲的文档所说:
延迟不会使代码神奇地无法阻止
每当使用阻塞代码(例如)时sleep,都必须将其推迟到新线程。
sleep
#!/usr/bin/env python from twisted.internet import reactor,defer, threads from twisted.internet.task import LoopingCall import time def main_loop(): print 'doing stuff in main loop.. do not block me!' def aBlockingRedisCall(x): if x<5: #all connections are busy, try later print '%s is less than 5, get a redis client later' % x x+=1 d = defer.Deferred() d.addCallback(aBlockingRedisCall) reactor.callLater(1.0,d.callback,x) return d else: print 'got a redis client; doing lookup.. this may take a while' def getstuff( x ): time.sleep(3) return "stuff is %s" % x # getstuff is blocking, so you need to push it to a new thread d = threads.deferToThread(getstuff, x) d.addCallback(gotFinalResult) return d def gotFinalResult(x): return 'final result is %s' % x def result(res): print res def aBlockingMethod(): print 'going to sleep...' time.sleep(10) print 'woke up' def main(): lc = LoopingCall(main_loop) lc.start(2) d = defer.Deferred() d.addCallback(aBlockingRedisCall) d.addCallback(result) reactor.callInThread(d.callback, 1) reactor.run() if __name__=='__main__': main()
如果redis api不太复杂,则可以使用twisted.web重写它,而不是仅在很多线程中调用阻塞的api,这样会更自然。