我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.internet.reactor.callFromThread()。
def __init__(self, *args, **kwargs):
    """
    Set up the connection and defer reactor registration to the loop thread.

    Reactor methods are not thread-safe, so ``add_connection`` is not called
    directly here; it is queued with ``reactor.callFromThread`` and runs on
    the event loop thread when it gets the chance.
    """
    Connection.__init__(self, *args, **kwargs)

    # Initial state: no connector yet, flagged as closed.
    self.connector = None
    self.is_closed = True

    register = self.add_connection
    reactor.callFromThread(register)
    self._loop.maybe_start()
def startLogging(logfilename, sysLog, prefix, nodaemon):
    """
    Choose a log destination and start logging to it.

    ``'-'`` selects stdout (refused when daemonizing), ``sysLog`` selects
    syslog with ``prefix``, a non-daemon without a file name logs to stdout,
    and anything else logs to a file (``twistd.log`` by default) whose
    ``rotate`` method is triggered by SIGUSR1.
    """
    wants_stdout = logfilename == '-'
    if wants_stdout:
        if not nodaemon:
            print('daemons cannot log to stdout')
            os._exit(1)
        logFile = sys.stdout
    elif sysLog:
        syslog.startLogging(prefix)
    elif nodaemon and not logfilename:
        logFile = sys.stdout
    else:
        logFile = app.getLogFile(logfilename or 'twistd.log')
        try:
            import signal
        except ImportError:
            # No signal support: simply skip log rotation on SIGUSR1.
            pass
        else:
            def rotate_on_signal(signum, frame):
                # Signal handlers must not touch the log file directly;
                # hand the rotation over to the reactor instead.
                from twisted.internet import reactor
                reactor.callFromThread(logFile.rotate)

            signal.signal(signal.SIGUSR1, rotate_on_signal)

    if not sysLog:
        log.startLogging(logFile)
    sys.stdout.flush()
def stop_reactor():
    """Stop the reactor and join the reactor thread until it stops.

    Call this in module- or package-level teardown to reset the twisted
    system after your tests. You *must* do this if you mix tests using these
    tools with tests using twisted.trial.
    """
    global _twisted_thread

    def _stop_in_reactor_thread():
        '''Helper for calling stop from within the reactor thread.'''
        reactor.stop()

    reactor.callFromThread(_stop_in_reactor_thread)
    reactor_thread.join()

    # Cancel every delayed call that is still pending.
    for delayed in reactor.getDelayedCalls():
        if not delayed.active():
            continue
        delayed.cancel()

    _twisted_thread = None
def MessageReceived(self, m):
    """
    Route an incoming message to the handler for its command.

    'headers' payloads are handed to ``HandleBlockHeadersReceived`` through
    ``reactor.callFromThread`` instead of being processed inline; every other
    known command is handled synchronously. Unknown commands are logged.
    """
    command = m.Command

    # Commands whose handlers take no arguments.
    bare_handlers = {
        'verack': 'HandleVerack',
        'getaddr': 'SendPeerInfo',
    }
    # Commands whose handlers receive the message payload.
    payload_handlers = {
        'version': 'HandleVersion',
        'getdata': 'HandleGetDataMessageReceived',
        'inv': 'HandleInvMessage',
        'block': 'HandleBlockReceived',
        'addr': 'HandlePeerInfoReceived',
    }

    if command in bare_handlers:
        getattr(self, bare_handlers[command])()
    elif command in payload_handlers:
        getattr(self, payload_handlers[command])(m.Payload)
    elif command == 'headers':
        reactor.callFromThread(self.HandleBlockHeadersReceived, m.Payload)
    else:
        self.Log("Command %s not implemented " % m.Command)
def blockingCallFromThread(reactor, f, *args, **kwargs):
    """
    Variant of twisted's blockingCallFromThread that logs the complete
    stack trace when an exception is raised on the reactor's thread.

    When already running on the reactor thread, the callable is simply
    invoked and its result returned. Otherwise the call is queued on the
    reactor and this thread blocks until the outcome is available; a failure
    is logged (with both threads' tracebacks) and re-raised here.
    """
    if isInIOThread():
        return f(*args, **kwargs)

    outcomes = Queue.Queue()

    def _run_on_reactor():
        # maybeDeferred normalises sync results, Deferreds and exceptions;
        # whatever comes out (value or Failure) is pushed onto the queue.
        pending = defer.maybeDeferred(f, *args, **kwargs)
        pending.addBoth(outcomes.put)

    reactor.callFromThread(_run_on_reactor)
    outcome = outcomes.get()

    if isinstance(outcome, failure.Failure):
        reactor_side = traceback.extract_tb(outcome.getTracebackObject())
        caller_side = traceback.extract_stack()
        logger.error(
            "Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
            " Traceback from the reactor's thread:\n %s",
            outcome.type.__name__,
            outcome.getErrorMessage(),
            ''.join(traceback.format_list(caller_side)),
            ''.join(traceback.format_list(reactor_side)),
        )
        outcome.raiseException()
    return outcome
def threaded_reactor():
    """
    Start the Twisted reactor in a separate thread, if not already done.
    Returns the reactor together with the thread that runs it.
    """
    global _twisted_thread
    if not _twisted_thread:
        from threading import Thread

        def run_reactor():
            # The reactor runs outside the main thread here, so it is
            # started without installing signal handlers.
            reactor.run(installSignalHandlers=False)

        worker = Thread(target=run_reactor, name="Twisted")
        worker.daemon = True
        _twisted_thread = worker
        worker.start()

        def hook_observer():
            import logging
            observer = log.PythonLoggingObserver()
            observer.start()
            log.msg("PythonLoggingObserver hooked up", logLevel=logging.DEBUG)

        # Hook the logging observer up from within the reactor thread.
        reactor.callFromThread(hook_observer)
    return reactor, _twisted_thread
def re_run(self, utt):
    """
    Re-decode the audio attached to ``utt`` and publish the updated document.

    Does nothing when ``utt`` has no 'wavpath'. Otherwise the audio is pushed
    through a fresh Kaldi instance, the resulting words (minus their
    'phones') are stored on ``utt`` as 'command_words' / 'command', and a
    change notification is queued on the reactor for ``self.db.onchange``.
    """
    if 'wavpath' not in utt:
        return

    decoder = Kaldi(
        get_resource('data/nnet_a_gpu_online'),
        self.gen_hclg_filename,
        get_resource('PROTO_LANGDIR'))

    wav_file = os.path.join(self.resources['attach'].attachdir, utt['wavpath'])
    samples = numm3.sound2np(wav_file, nchannels=1, R=8000)

    decoder.push_chunk(samples.tostring())
    words = decoder.get_final()
    decoder.stop()

    # Phone-level detail is dropped before the words are stored.
    for entry in words:
        del entry['phones']

    utt['command_words'] = words
    utt['command'] = ' '.join(entry['word'] for entry in words)

    change = {"type": "change", "id": utt["_id"], "doc": utt}
    reactor.callFromThread(self.db.onchange, None, change)
def spawnAdmin(self, user):
    """
    Optionally run the phantomjs message checker, then answer ``user``.

    For users whose ``permission`` is non-zero, ``phantom/checkMessages.js``
    is run, its stdout is printed and appended to ``phantom/messages.log``.
    In every case the method then sleeps five seconds and queues
    ``adminSendResponse(user)`` on the reactor thread.
    """
    if user.permission != 0:
        # NOTE(review): ADMIN_PASS is passed on the command line and is
        # therefore visible in the process list - confirm this is acceptable.
        proc = subprocess.Popen(
            ['phantomjs',
             'phantom/checkMessages.js',
             globalVals.args.domain + ':' + str(globalVals.args.port),
             ADMIN_PASS],
            stdout=subprocess.PIPE)
        output, _ = proc.communicate()
        print(output)
        # The context manager closes the log file even if the write fails.
        with open('phantom/messages.log', 'a') as log_file:
            log_file.write(output)

    time.sleep(5)
    reactor.callFromThread(self.adminSendResponse, user)
def connect(self):
    """
    Connect to MQTT broker.

    Creates the client, wires its callbacks so that they are re-dispatched
    through ``reactor.callFromThread``, and starts the retrying connect loop.
    """
    # TODO: This is currently done synchronous which could have issues in timeout situations
    #       because it would block other subsystems.
    #       => Check if we can do asynchronous connection establishment.
    client = mqtt.Client(client_id=self.name, clean_session=True, userdata={'foo': 'bar'})
    self.client = client

    if self.broker_username:
        client.username_pw_set(self.broker_username, self.broker_password)

    def via_reactor(handler_name):
        """Build a client callback that forwards to ``self.<handler_name>`` on the reactor."""
        def relay(*args):
            # The handler is looked up at call time, as with the inline form.
            return reactor.callFromThread(getattr(self, handler_name), *args)
        return relay

    client.on_connect = via_reactor('on_connect')
    client.on_message = via_reactor('on_message')
    client.on_log = via_reactor('on_log')

    # Connect with retry
    self.connect_loop = LoopingCall(self.connect_with_retry)
    self.connect_loop.start(self.retry_interval, now=True)
def subscribe(self, *args):
    """
    Subscribe to every configured topic and install the publish handler.

    Positional arguments are accepted but not used.
    """
    log.info(u"Subscribing to topics {subscriptions}. protocol={protocol}",
             subscriptions=self.subscriptions, protocol=self.protocol)

    for topic in self.subscriptions:
        log.info(u"Subscribing to topic '{topic}'", topic=topic)
        # Topic name **must not** be unicode, so casting to string
        topic_name = str(topic)
        self.protocol.subscribe(topic_name, 0)

    log.info(u"Setting callback handler: {callback}", callback=self.callback)
    # An earlier, disabled variant wrapped self.callback in a handler that
    # forwarded through reactor.callFromThread; the twisted-native handler
    # is installed instead.
    self.protocol.setPublishHandler(self.on_message_twisted)
def start_packet_in_stream(self):
    """
    Start a worker thread that forwards streamed packet-in messages.

    The worker iterates the ``ReceivePacketsIn`` stream of ``local_stub`` and
    puts every message on ``packet_in_queue`` via the reactor thread. If the
    stream fails with ``StatusCode.UNAVAILABLE`` the process sends itself
    SIGTERM; other ``_Rendezvous`` errors end the worker silently.
    """

    def receive_packet_in_stream():
        import signal

        streaming_rpc_method = self.local_stub.ReceivePacketsIn
        iterator = streaming_rpc_method(empty_pb2.Empty())
        try:
            for packet_in in iterator:
                # The queue is only touched from the reactor thread.
                reactor.callFromThread(self.packet_in_queue.put, packet_in)
                log.debug('enqued-packet-in',
                          packet_in=packet_in,
                          queue_len=len(self.packet_in_queue.pending))
        except _Rendezvous as e:
            if e.code() == StatusCode.UNAVAILABLE:
                # Signal ourselves directly instead of shelling out to kill.
                os.kill(os.getpid(), signal.SIGTERM)

    reactor.callInThread(receive_packet_in_stream)
def start_change_event_in_stream(self):
    """
    Start a worker thread that forwards streamed change events.

    The worker iterates the ``ReceiveChangeEvents`` stream of ``local_stub``
    and puts every event on ``change_event_queue`` via the reactor thread. If
    the stream fails with ``StatusCode.UNAVAILABLE`` the process sends itself
    SIGTERM; other ``_Rendezvous`` errors end the worker silently.
    """

    def receive_change_events():
        import signal

        streaming_rpc_method = self.local_stub.ReceiveChangeEvents
        iterator = streaming_rpc_method(empty_pb2.Empty())
        try:
            for event in iterator:
                # The queue is only touched from the reactor thread.
                reactor.callFromThread(self.change_event_queue.put, event)
                log.debug('enqued-change-event',
                          change_event=event,
                          queue_len=len(self.change_event_queue.pending))
        except _Rendezvous as e:
            if e.code() == StatusCode.UNAVAILABLE:
                # Signal ourselves directly instead of shelling out to kill.
                os.kill(os.getpid(), signal.SIGTERM)

    reactor.callInThread(receive_change_events)
def recv(self):
    """Called on the select thread when a packet arrives.

    Reads one frame, counts it, and queues a ``_dispatch`` call on the
    reactor for every proxy whose filter is absent or accepts the frame.
    Frames that no proxy accepts are counted as discarded.
    """
    try:
        frame = self.rcv_frame()
    except RuntimeError:
        # we observed this happens sometimes right after the socket was
        # attached to a newly created veth interface. So we log it, but
        # allow to continue.
        log.warn('afpacket-recv-error', code=-1)
        return

    log.debug('frame-received', iface=self.iface_name, len=len(frame),
              hex=hexify(frame))
    self.received += 1

    matches = 0
    for proxy in self.proxies:
        accepts = proxy.filter
        if accepts is not None and not accepts(frame):
            continue
        log.debug('frame-dispatched')
        matches += 1
        reactor.callFromThread(self._dispatch, proxy, frame)

    if matches == 0:
        self.discarded += 1
        log.debug('frame-discarded')
def receive_twisted(self, channels):
    """Twisted-native implementation of receive.

    Generator-style coroutine (it yields a Deferred and finishes through
    ``defer.returnValue``); presumably wrapped by ``inlineCallbacks`` outside
    this view - confirm.
    """
    done = defer.Deferred()

    def on_future_done(finished):
        # Future callbacks may run off the reactor thread, so the Deferred
        # is fired through the reactor.
        reactor.callFromThread(done.callback, finished.result())

    pending = self.thread.twisted_schedule(RECEIVE_TWISTED, channels)
    pending.add_done_callback(on_future_done)

    received = yield done
    defer.returnValue(received)
# TODO: Is it optimal to read bytes from content frame, call python
# decode method to convert it to string and than parse it with
# msgpack? We should minimize useless work on message receive.
def work(self):
    """
    Run one consumption cycle and update the statistics.

    A batch is collected and processed between ``states_context.fetch()`` and
    ``release()``. When the strategy reports it is finished, it is closed and
    a reactor stop is queued; the statistics are updated either way.
    """
    collected = self.collect_batch()
    batch, consumed = collected

    self.states_context.fetch()
    self.process_batch(batch)
    self.update_score.flush()
    self.states_context.release()

    # Exiting, if crawl is finished
    crawl_done = self.strategy.finished()
    if crawl_done:
        logger.info("Successfully reached the crawling goal.")
        logger.info("Closing crawling strategy.")
        self.strategy.close()
        logger.info("Finishing.")
        reactor.callFromThread(reactor.stop)

    stats = self.stats
    stats['last_consumed'] = consumed
    stats['last_consumption_run'] = asctime()
    stats['consumed_since_start'] += consumed
def test_callFromThread(self):
    """
    Exercise callFromThread twice: once from the thread running the test
    and once from a separate worker thread.
    """
    def cb(ign):
        from_reactor = defer.Deferred()
        from_worker = defer.Deferred()

        def fire_from_worker():
            reactor.callFromThread(from_worker.callback, None)

        reactor.callInThread(fire_from_worker)
        reactor.callFromThread(from_reactor.callback, None)

        # Both Deferreds must fire; the first error fails the whole list.
        both = [from_reactor, from_worker]
        return defer.DeferredList(both, fireOnOneErrback=True)

    waiting = self._waitForThread()
    return waiting.addCallback(cb)
def test_wakerOverflow(self):
    """
    Try to overflow the reactor waker by flooding it with callFromThread
    calls issued from a worker thread.
    """
    def cb(ign):
        self.failure = None
        finished = threading.Event()

        def flood():
            # Hopefully a hundred thousand queued calls is enough to
            # trigger the error condition
            for _ in xrange(100000):
                try:
                    reactor.callFromThread(lambda: None)
                except:
                    # Deliberately catch everything: any error at all is
                    # recorded and reported as the test failure.
                    self.failure = failure.Failure()
                    break
            finished.set()

        reactor.callInThread(flood)
        finished.wait(120)
        if not finished.is_set():
            self.fail("Timed out waiting for event")
        if self.failure is not None:
            return defer.fail(self.failure)

    waiting = self._waitForThread()
    return waiting.addCallback(cb)
def _puller(self):
    """
    Poll the channel layer and hand received messages to the reactor thread.

    Loops until the reactor is no longer running or ``self._stop`` is set,
    then fires ``self.stopped`` (through the reactor) and returns. Each
    received message is passed to ``handle_reply`` on the reactor thread.
    """
    logger.debug('Starting puller loop')
    while True:
        if not reactor.running or self._stop:
            logger.debug('Puller loop dying')
            reactor.callFromThread(self.stopped.callback, None)
            return

        # The list always contains at least send_channel, so it can never
        # be empty and needs no emptiness check.
        channels = [self.send_channel] + list(self._pull_channels)
        channel, message = self.channel_layer.receive(channels, block=False)
        if not channel:
            # Nothing pending: back off briefly instead of spinning.
            time.sleep(0.01)
            continue

        logger.debug('We got message on channel: %s' % (channel, ))
        reactor.callFromThread(self.handle_reply, channel, message)
def deferred_from_future(future):
    """Convert a concurrent.futures.Future object into a
    twisted.internet.defer.Deferred object.

    The Deferred is fired when the future completes: with the future's
    exception via errback, or with its result via callback. When
    DEFERRED_RUN_IN_REACTOR_THREAD is set, firing goes through
    reactor.callFromThread; otherwise it happens in the completing thread.

    See:
      https://twistedmatrix.com/pipermail/twisted-python/2011-January/023296.html
    """
    d = Deferred()

    def callback(future):
        error = future.exception()
        if error:
            fire, payload = d.errback, error
        else:
            fire, payload = d.callback, future.result()

        if DEFERRED_RUN_IN_REACTOR_THREAD:
            reactor.callFromThread(fire, payload)
        else:
            fire(payload)

    future.add_done_callback(callback)
    return d
def execute_from_command_line():
    """Prepare thread pools, database access and logging, then run Django.

    The imports are deliberately local and interleaved with the calls; keep
    the statement order as it is.
    """
    # Limit concurrency in all thread-pools to ONE.
    from maasserver.utils import threads
    threads.install_default_pool(maxthreads=1)
    threads.install_database_unpool(maxthreads=1)
    # Disable all database connections in the reactor.
    from maasserver.utils import orm
    from twisted.internet import reactor
    # Sanity check only: this assertion is skipped when Python runs with -O.
    assert not reactor.running, "The reactor has been started too early."
    # The reactor is not running yet (asserted above), so this call is only
    # queued here; presumably it executes once the reactor starts - confirm.
    reactor.callFromThread(orm.disable_all_database_connections)
    # Configure logging; Django is no longer responsible for this. Behave as
    # if we're always at an interactive terminal (i.e. do not wrap stdout or
    # stderr with log machinery).
    from provisioningserver import logger
    logger.configure(mode=logger.LoggingMode.COMMAND)
    # Hand over to Django.
    from django.core import management
    management.execute_from_command_line()
def run(self):
    """
    Call ``self._fnc`` with the request and send its result as the response.

    An exception from ``_fnc`` turns into an internal-server-error response
    carrying the exception text. The response is written from the reactor
    thread, and only if the request is still alive.
    """
    req = self._req
    status = http.OK
    try:
        body = self._fnc(req)
    except Exception as e:
        body = str(e)
        status = http.INTERNAL_SERVER_ERROR

    def finishRequest():
        req.setResponseCode(status)
        if status == http.OK:
            # Content headers are only sent for successful responses.
            req.setHeader('Content-type', 'application/xhtml+xml')
            req.setHeader('charset', 'UTF-8')
        req.write(body)
        req.finish()

    if not self._stillAlive:
        return
    reactor.callFromThread(finishRequest)
def render(self, req):
    """
    Start an asynchronous EPG parse and keep the response open meanwhile.

    The XML prologue is written immediately and NOT_DONE_YET is returned.
    While the parse has not completed and the request is alive, an
    "<ignore />" element is written every 50 seconds.
    """
    self._req = req
    self._stillAlive = True
    if hasattr(req, 'notifyFinish'):
        req.notifyFinish().addErrback(self.connectionLost)

    d = autotimer.parseEPGAsync()
    d = d.addCallback(self.epgCallback)
    d = d.addErrback(self.epgErrback)

    def write_filler():
        req.write("<ignore />")

    def timeout():
        if d.called or not self._stillAlive:
            # Parse finished or client gone: stop rescheduling.
            return
        reactor.callFromThread(write_filler)
        reactor.callLater(50, timeout)

    reactor.callLater(50, timeout)

    req.setResponseCode(http.OK)
    req.setHeader('Content-type', 'application/xhtml+xml')
    req.setHeader('charset', 'UTF-8')
    req.write("""<?xml version=\"1.0\" encoding=\"UTF-8\" ?><e2simplexmlresult>""")
    return server.NOT_DONE_YET
def run(self):
    """
    Run a simulated EPG parse and stream the result as XML.

    If the request is alive, the headers are set and the opening element is
    written via the reactor. After ``autotimer.parseEPG`` returns (or
    raises), the closing markup is written and the request finished from the
    reactor thread. On error the closing markup carries the exception text
    followed by deliberately broken XML.
    """
    req = self._req
    if self._stillAlive:
        req.setResponseCode(http.OK)
        req.setHeader('Content-type', 'application/xhtml+xml')
        req.setHeader('charset', 'UTF-8')
        reactor.callFromThread(lambda: req.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n<e2autotimersimulate api_version=\"" + str(API_VERSION) + "\">\n"))

    closing = '</e2autotimersimulate>'
    try:
        autotimer.parseEPG(simulateOnly=True, callback=self.intermediateWrite)
    except Exception as e:
        # Build the text now: the exception name is unbound once the except
        # clause ends on Python 3, so a closure created here could not read
        # it later on the reactor thread.
        closing = '<exception>'+str(e)+'</exception><|PURPOSEFULLYBROKENXML<'

    def finishRequest():
        req.write(closing)
        req.finish()

    if self._stillAlive:
        reactor.callFromThread(finishRequest)
def _cleanup(self):
    """
    Stop the reactor and wait briefly for the event loop thread to exit.

    Does nothing when there is no thread. A warning is logged if the thread
    is still alive after the one-second join.
    """
    if not self._thread:
        return

    reactor.callFromThread(reactor.stop)
    self._thread.join(timeout=1.0)

    still_running = self._thread.is_alive()
    if still_running:
        log.warning("Event loop thread could not be joined, so "
                    "shutdown may not be clean. Please call "
                    "Cluster.shutdown() to avoid this.")

    log.debug("Event loop thread was joined")
def add_timer(self, timer):
    """
    Register ``timer`` and arrange for its timeout to be scheduled.

    The scheduling goes through ``reactor.callFromThread`` so that it happens
    on the loop thread, where the timeout task can safely be modified.
    """
    self._timers.add_timer(timer)
    deadline = timer.end
    reactor.callFromThread(self._schedule_timeout, deadline)
def push(self, data):
    """
    Queue outgoing ``data`` for sending.

    ``transport.write()`` is not thread-safe and so is never called directly
    here; the write is scheduled to run inside the event loop when it gets
    the chance.
    """
    write = self.connector.transport.write
    reactor.callFromThread(write, data)
def runCommand(self, tc):
    """Called from the gui thread; hands a ThreadCommand instance to the network.

    The command is not executed here: ``_doRunCommand`` is queued on the
    reactor through callFromThread.
    """
    handler = self._doRunCommand
    reactor.callFromThread(handler, tc)
def callFromThread(self, f, *args, **kw):
    """Schedule ``f(*args, **kw)`` on the wrapped IOLoop via ``add_callback``."""
    assert callable(f), "%s is not callable" % f
    # The NullContext covers an edge case when TwistedIOLoop runs on top of
    # a TornadoReactor: TwistedIOLoop.add_callback goes through
    # reactor.callFromThread and must not pick up extra StackContexts on
    # the way.
    with NullContext():
        enqueue = self._io_loop.add_callback
        enqueue(f, *args, **kw)
    # No waker handling as in the super class: Tornado brings its own waker.
def add_callback(self, callback, *args, **kwargs):
    """Queue ``callback(*args, **kwargs)`` on the reactor.

    The callback is passed through ``wrap`` and bound to its arguments, then
    given to ``_run_callback`` by way of ``reactor.callFromThread``.
    """
    bound = functools.partial(wrap(callback), *args, **kwargs)
    self.reactor.callFromThread(self._run_callback, bound)
def _putResultInDeferred(deferred, f, args, kwargs):
    """Run a function and give results to a Deferred.

    The Deferred is fired through ``reactor.callFromThread``: with the return
    value on success, or with a ``Failure`` built from whatever was raised.
    """
    from twisted.internet import reactor
    try:
        outcome = f(*args, **kwargs)
    except:
        # Deliberately catch everything: every error is delivered to the
        # Deferred rather than raised in this thread.
        fire, outcome = deferred.errback, failure.Failure()
    else:
        fire = deferred.callback
    reactor.callFromThread(fire, outcome)
def printResult(self):
    """
    Print the average callFromThread and callInThread latencies.

    Each average is the sum of the recorded times (``from_times`` and
    ``in_times``) divided by ``numRounds``, shown in milliseconds. The
    report is framed by two blank lines before and after.
    """
    rounds = self.numRounds

    # print("") rather than print(): a single-argument call behaves the same
    # as a print statement and as a print function call.
    print("")
    print("")
    print("callFromThread latency:")
    print("%f millisecond" % ((sum(self.from_times) / rounds) * 1000))
    print("callInThread latency:")
    print("%f millisecond" % ((sum(self.in_times) / rounds) * 1000))
    print("")
    print("")
def tcmf_2(self, start):
    """Record the time elapsed since ``start``, then queue ``tcmf_3`` on the reactor.

    Runs in a worker thread; ``tcmf_3`` receives a fresh timestamp.
    """
    elapsed = time.time() - start
    self.in_times.append(elapsed)
    reactor.callFromThread(self.tcmf_3, time.time())
def testCallFromThread(self):
    """Fire one Deferred from this thread and one from a worker thread, both via callFromThread."""
    from_reactor = defer.Deferred()
    from_worker = defer.Deferred()

    def fire_from_worker():
        reactor.callFromThread(from_worker.callback, None)

    reactor.callInThread(fire_from_worker)
    reactor.callFromThread(from_reactor.callback, None)

    # Both must fire; the first error fails the combined result.
    both = [from_reactor, from_worker]
    return defer.DeferredList(both, fireOnOneErrback=True)
def testCallMultiple(self):
    """Check that callMultipleInThread runs its calls in order.

    N appends are queued, followed by a callFromThread that verifies the
    collected list and fires the returned Deferred.
    """
    L = []
    N = 10
    d = defer.Deferred()

    def finished():
        self.assertEquals(L, range(N))
        d.callback(None)

    calls = [(L.append, (i,), {}) for i in xrange(N)]
    # The last entry hops back to the reactor to run the check.
    calls = calls + [(reactor.callFromThread, (finished,), {})]
    threads.callMultipleInThread(calls)
    return d
def testWakeUp(self):
    """Make sure other threads can wake up the reactor."""
    woken = Deferred()

    def wake():
        time.sleep(0.1)
        # callFromThread will call wakeUp for us
        reactor.callFromThread(woken.callback, None)

    reactor.callInThread(wake)
    return woken
def _callFromThreadCallback(self, d):
    """Queue ``_callFromThreadCallback2(d)`` via callFromThread, then
    ``_stopCallFromThreadCallback`` as a zero-delay callLater."""
    follow_up = self._callFromThreadCallback2
    reactor.callFromThread(follow_up, d)

    stopper = self._stopCallFromThreadCallback
    reactor.callLater(0, stopper)
def testCallFromThreadStops(self):
    """
    Ensure that a callFromThread issued from inside a callFromThread
    callback does not spin in an infinite loop and lets other things
    happen too.
    """
    self.stopped = False
    done = defer.Deferred()
    first_step = self._callFromThreadCallback
    reactor.callFromThread(first_step, done)
    return done
def schedule(self, *args, **kwargs):
    """Forward all arguments to ``reactor.callFromThread``.

    Override in subclasses.
    """
    dispatch = reactor.callFromThread
    dispatch(*args, **kwargs)
def __setattr__(self, name, value):
    """Queue an update of a known parameter on the reactor thread.

    Raises AttributeError for names that are not in ``self._params``. The
    value is not stored here; ``_params.__setitem__`` is scheduled through
    ``reactor.callFromThread``.
    """
    params = self._params
    if name in params:
        reactor.callFromThread(params.__setitem__, name, value)
        return
    raise AttributeError(name)
def reset(self):
    """Queue ``_atomic_reset`` to run on the reactor thread."""
    do_reset = self._atomic_reset
    reactor.callFromThread(do_reset)
def tearDown(self):
    """Queue a reactor stop via callFromThread."""
    stop = reactor.stop
    reactor.callFromThread(stop)
def stop_reactor_thread():
    """Stop twisted's reactor by queueing ``reactor.stop`` through callFromThread."""
    stop = reactor.stop
    reactor.callFromThread(stop)
def check_storage_path(self):
    """
    Make sure the storage directory exists, creating it if necessary.

    If creation fails because access is denied, the error is logged and
    ``self.stop`` is queued on the reactor. Any other ``OSError`` is
    re-raised with its original traceback.
    """
    import errno

    if os.path.exists(self.storage_path):
        return

    try:
        os.makedirs(self.storage_path)
    except OSError as e:
        if e.errno != errno.EACCES:
            # Bare raise keeps the original traceback intact.
            raise
        log.err("Can't create storage directory: access denied")
        reactor.callFromThread(self.stop)