Python zmq 模块,Again() 实例源码
我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用zmq.Again()。
def start(self, socket):
    """
    Begin monitoring ``socket`` from a background daemon thread.

    Nothing happens when the listening flag is already set. Otherwise a
    monitor socket is requested from ``socket``, configured with the receive
    timeout, and polled by a worker thread until the flag is cleared.

    :param socket: Socket to monitor.
    """
    # A set flag means a listener thread has already been started.
    if self.monitor_listening.is_set():
        return

    mon_sock = socket.get_monitor_socket(events=self.events)
    mon_sock.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
    self.monitor_listening.set()

    def event_listener(listening_flag):
        while listening_flag.is_set():
            try:
                received = recv_monitor_message(mon_sock)
                # Once the socket reports it is closed there is nothing left to listen to.
                if received["event"] == zmq.EVENT_CLOSED:
                    listening_flag.clear()
                self._notify_listeners(received)
            except zmq.Again:
                # Receive timed out: notify with None so time-based listeners
                # get a heartbeat without needing a thread of their own.
                self._notify_listeners(None)
        # Loop ended: release the monitor socket.
        socket.disable_monitor()
        mon_sock.close()

    worker = threading.Thread(target=event_listener, args=(self.monitor_listening,))
    # Daemon thread, so it cannot keep the process alive if nobody disconnects.
    worker.daemon = True
    self.monitor_thread = worker
    worker.start()
def send(self, message, send_more=False, block=True, as_json=False):
    """
    Send ``message`` on ``self.socket``.

    :param message: Payload handed to ``send_json`` or ``send``.
    :param send_more: When true the ``zmq.SNDMORE`` flag is set.
    :param block: When false ``zmq.NOBLOCK`` is added to the flags and a
        ``zmq.Again`` raised by the send is silently dropped.
    :param as_json: When true the message goes through ``send_json``;
        otherwise ``send`` is used with the instance's copy/track settings.
    :raises zmq.Again: In blocking mode, if the send raises it.
    :raises zmq.ZMQError: Any other ZeroMQ error, after it has been logged.
    """
    flags = zmq.SNDMORE if send_more else 0
    if not block:
        flags |= zmq.NOBLOCK

    try:
        if as_json:
            self.socket.send_json(message, flags)
        else:
            self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
    except zmq.Again:
        # A non-blocking caller accepts that the message may be dropped.
        if block:
            raise
    except zmq.ZMQError as e:
        logger.error(e)
        # Bare raise keeps the original traceback intact.
        raise
def forward(self, data):
    """
    Send ``data`` over ``self.list_communication_channel``.

    Error handling:

    * ``zmq.NotDone``: logged, and the channel is closed.
    * ``zmq.Again``: logged and re-raised so the caller can react.
    * any other ``zmq.ZMQError``: logged, then the context is destroyed and
      replaced by a fresh ``zmq.Context``.

    :param data: Payload passed to the channel's ``send``.
    :raises zmq.Again: If the send raises it.
    """
    try:
        self.list_communication_channel.send(data)
    except zmq.NotDone:
        self.logger.debug('my recipient is dead, not done')
        self.list_communication_channel.close()
    except zmq.Again:
        self.logger.debug('my recipient is dead')
        # Re-raise the caught exception itself so errno and traceback survive.
        raise
    except zmq.ZMQError as a:
        self.logger.debug("Error in message forward " + a.strerror)
        self.context.destroy()
        self.context = zmq.Context()
def test_disconnection(self):
    """ Check what the subscriber receives after remote and local disconnections. """
    from supvisors.utils import InternalEventHeaders
    mapper = self.supvisors.address_mapper
    local = mapper.local_address
    # remote disconnection: use the first address that is not the local one
    remote = next(candidate
                  for candidate in mapper.addresses
                  if candidate != local)
    self.subscriber.disconnect([remote])
    # a tick published locally must still be received
    tick = {'date': 1000}
    self.publisher.send_tick_event(tick)
    received = self.receive('Tick')
    expected = (InternalEventHeaders.TICK, local, tick)
    self.assertTupleEqual(expected, received)
    # local disconnection: the same tick must no longer be received
    self.subscriber.disconnect([local])
    self.publisher.send_tick_event(tick)
    with self.assertRaises(zmq.Again):
        self.subscriber.receive()
def generator_from_zmq_pull(context, host):
    """
    Yield tasks received on a ZMQ PULL socket bound to ``host``.

    The generator never blocks: each step yields the JSON-decoded message when
    one is available, and ``None`` otherwise. The socket is closed when the
    generator is closed or when binding/receiving raises.

    :param context: ZMQ context used to create the socket.
    :param host: Address to bind to; one trailing ``/`` is removed.
    """
    socket = context.socket(zmq.PULL)
    # TODO: Configure socket with clean properties to avoid message overload.
    if host.endswith('/'):
        host = host[:-1]
    print_item("+", "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL)
    try:
        socket.bind(host)
        while True:
            try:
                message = socket.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                # NOTE: We have to make the generator non blocking.
                yield None
                continue
            yield json.loads(message)
    finally:
        # Release the bound endpoint instead of leaking the socket.
        socket.close()
def get_messages(self, timeout=0.1, count=1):
    """
    Yield up to ``count`` message bodies read from ``self.subscriber``.

    Reads are non-blocking. While nothing is available the generator sleeps
    for a tenth of ``timeout`` between attempts and gives up once more than
    ``timeout`` seconds have passed since the call started. Each message's
    third frame is unpacked as two big-endian unsigned ints (partition and
    global sequence numbers) and compared against ``self.counter``.

    :param timeout: Overall time budget in seconds.
    :param count: Maximum number of messages to yield.
    """
    began = time()
    pause = timeout / 10.0
    while count:
        try:
            frames = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
        except zmq.Again:
            if time() - began > timeout:
                break
            sleep(pause)
            continue

        partition_seqno, global_seqno = unpack(">II", frames[2])
        if self.count_global:
            seqno = global_seqno
        else:
            seqno = partition_seqno

        if not self.counter:
            self.counter = seqno
        elif self.counter != seqno:
            if self.seq_warnings:
                self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                    "isn't missing messages." % (self.counter, seqno))
            # Drop the counter so the next message re-seeds it.
            self.counter = None

        yield frames[1]
        count -= 1
        if self.counter:
            self.counter += 1
        self.stats[self.stat_key] += 1
def _get_data(self, blocking=True):
    """Get one batch of data from ``self.socket``.

    The raw bytes are decoded according to ``self.structure``:

    * ``'array'``: a writable numpy array of ``self.dtype`` reshaped to
      ``self.shape``;
    * ``'dict'``: the JSON-decoded payload;
    * ``'boolean'``: ``bool()`` of the raw payload;
    * anything else: the raw payload unchanged.

    :param blocking: When False the receive is non-blocking and ``None`` is
        returned if nothing is available.
    :return: The decoded batch, or ``None``.
    :raises EOCError: When the received payload equals ``TERM_MSG``.
    """
    # TODO complete docstring.
    if blocking:
        batch = self.socket.recv()
    else:
        try:
            batch = self.socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            return None

    if batch == TERM_MSG:
        raise EOCError()

    if self.structure == 'array':
        # Binary-mode numpy.fromstring is deprecated. frombuffer is the
        # replacement; .copy() keeps the result writable and independent of
        # the message buffer, as fromstring's result was.
        batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
        batch = numpy.reshape(batch, self.shape)
    elif self.structure == 'dict':
        batch = json.loads(batch)
    elif self.structure == 'boolean':
        # NOTE(review): any non-empty payload is truthy here — confirm the
        # sender encodes False as an empty message.
        batch = bool(batch)
    return batch
def _receiveFromListener(self, quota) -> int:
    """
    Drain up to ``quota`` non-empty messages from the listener socket.

    :param quota: maximum number of messages to accept (must be truthy)
    :return: how many messages were accepted
    """
    assert quota
    received = 0
    while received < quota:
        try:
            ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
            # Empty messages (router probing on connection) are skipped
            # without counting against the quota.
            if msg:
                received += 1
                if self.onlyListener and ident not in self.remotesByKeys:
                    self.peersWithoutRemotes.add(ident)
                self._verifyAndAppend(msg, ident)
        except zmq.Again:
            # Nothing more is waiting on the socket.
            break
    if received > 0:
        logger.trace('{} got {} messages through listener'.
                     format(self, received))
    return received
def test_retry_recv(self):
    """recv on a PULL socket with a receive timeout raises zmq.Again and the timer fires."""
    sock = self.socket(zmq.PULL)
    sock.rcvtimeo = self.timeout_ms
    self.alarm()
    with self.assertRaises(zmq.Again):
        sock.recv()
    assert self.timer_fired
def test_retry_send(self):
    """send on a PUSH socket with a send timeout raises zmq.Again and the timer fires."""
    sock = self.socket(zmq.PUSH)
    sock.sndtimeo = self.timeout_ms
    self.alarm()
    with self.assertRaises(zmq.Again):
        sock.send(b('buf'))
    assert self.timer_fired
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    rep = self.context.socket(zmq.REP)
    with self.assertRaises(Again):
        rep.recv(zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def start_listener(self):
    """Poll ``self.s`` forever, putting ``ON_SIGNAL`` on ``self.on_q`` for every message received."""
    print('ZMQ listener started')
    while True:
        try:
            # Non-blocking receive; the message content itself is not used.
            self.s.recv(zmq.NOBLOCK)
        except zmq.Again:
            # Nothing to receive yet: wait briefly before polling again.
            time.sleep(0.05)
            continue
        self.on_q.put(ON_SIGNAL)
def client_behavior(settings, logger):
    """
    Run the client side of the connection test.

    Sends a first message on the internal channel and logs the reply, then
    subscribes on the external channel and logs every message received there
    until ``Again`` is raised.

    :param settings: object providing ``getIntPort()`` and ``getExtPort()``
    :param logger: logger used for all debug output
    """
    int_chan = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)
    try:
        int_chan.generate_internal_channel_client_side()
    except ZMQError as e:
        logger.debug(e)

    message = Message()
    for field, value in (
        ('priority', ALIVE),
        ('source_flag', INT),
        ('source_id', '1'),
        ('target_id', '1'),
        ('target_addr', '192.168.1.1'),
        ('target_key', '{}:{}'.format(0, 19)),
    ):
        setattr(message, field, value)

    int_chan.send_first_internal_channel_message(dumps(message))
    reply = int_chan.wait_int_message(dont_wait=False)
    logger.debug("msg : " + reply)

    ext_chan = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
    ext_chan.generate_external_channel_client_side()
    ext_chan.external_channel_subscribe()
    logger.debug(loads(ext_chan.wait_ext_message()).printable_message())
    logger.debug("try_to_connect TEST COMPLETED")

    # Keep logging external messages until the receive raises Again.
    while True:
        try:
            logger.debug(loads(ext_chan.wait_ext_message()).printable_message())
            sleep(1)
        except Again:
            logger.debug("my master is DEAD")
            break
def server_behavior(settings, logger):
    """
    Run the server side of the connection test.

    Waits for one request on the internal channel and answers it with ``OK``,
    then publishes the same message on the external channel once per second
    until forwarding raises ``zmq.Again``.

    :param settings: object providing ``getIntPort()`` and ``getExtPort()``
    :param logger: logger used for all debug output
    """
    int_chan = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)
    try:
        int_chan.generate_internal_channel_server_side()
        request = loads(int_chan.wait_int_message(dont_wait=False))
        logger.debug("msg : ")
        logger.debug(request.printable_message())
        int_chan.reply_to_int_message(OK)
    except ZMQError as e:
        logger.debug(e)

    ext_chan = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
    ext_chan.generate_external_channel_server_side()
    ext_chan.external_channel_publish()

    message = Message()
    for field, value in (
        ('priority', ALIVE),
        ('source_flag', EXT),
        ('source_id', '1'),
        ('target_id', '1'),
        ('target_addr', '192.168.1.1'),
        ('target_key', '{}:{}'.format(0, 19)),
    ):
        setattr(message, field, value)

    sleep(1)
    ext_chan.forward(dumps(message))
    logger.debug("try_to_connect TEST COMPLETED")

    # Publish repeatedly; forwarding raising zmq.Again ends the loop.
    while True:
        try:
            ext_chan.forward(dumps(message))
            sleep(1)
        except zmq.Again:
            break
def wait_int_message(self, dont_wait=True):
    """
    Receive one message from ``self.list_communication_channel``.

    :param dont_wait: When true the receive uses ``zmq.DONTWAIT``; otherwise
        a debug line is logged and the receive blocks.
    :return: The received message.
    :raises zmq.Again: In non-blocking mode, if the receive raises it.
    """
    if dont_wait:
        # The original zmq.Again propagates unchanged, keeping its errno and
        # traceback instead of being replaced by a bare exception class.
        return self.list_communication_channel.recv(zmq.DONTWAIT)

    self.logger.debug('waiting for a request')
    return self.list_communication_channel.recv()
def receive(self, event_type):
    """ Poll the subscriber socket for up to 1000 ms, then return what the
    subscriber receives; the test fails if zmq.Again is raised. """
    try:
        self.subscriber.socket.poll(1000)
        received = self.subscriber.receive()
    except zmq.Again:
        self.fail('Failed to get {} event'.format(event_type))
    else:
        return received
def receive(self, event_type):
    """ Return what the puller receives; the test fails if zmq.Again is raised. """
    try:
        received = self.puller.receive()
    except zmq.Again:
        self.fail('Failed to get {} request'. format(event_type))
    else:
        return received
def check_reception(self, header=None, data=None):
    """ Assert that the subscriber receives exactly (header, data), or, when
    either is missing, that it receives nothing at all. """
    if not (header and data):
        # nothing is expected: the reception must raise zmq.Again
        with self.assertRaises(zmq.Again):
            self.subscriber.receive()
        return
    # a message is expected: it must arrive and match
    try:
        received = self.subscriber.receive()
    except zmq.Again:
        self.fail('Failed to get {} status'.format(header))
    self.assertTupleEqual((header, data), received)
def _send_and_receive(self, message):
    """Send a payload to NM and return the resulting Response.

    The message is JSON-encoded, sent, and the socket is polled for
    ``self.socket_timeout`` seconds. Any error reported by NM surfaces while
    the Response is being constructed.

    Args:
      message: dict of a message to send to NM

    Returns:
      Response instance built from the raw reply

    Raises:
      TimeoutError: if nothing is received within the timeout
    """
    # zmq is thread unsafe: a second request must not be sent before the
    # first response has come back, hence the lock around the whole exchange.
    with self.lock:
        self.socket.send(json.dumps(message))
        if self.socket.poll(timeout=self.socket_timeout * 1000):
            try:
                return Response(self.socket.recv())
            except zmq.Again:
                pass
        # Polling or recv failed: rebuild the socket, otherwise it would stay
        # in a bad state waiting for a response.
        self.socket.close()
        self.setup_socket()
        self.socket.connect(self.address)
        raise TimeoutError('did not receive a response')
def handle_in(self):
    """
    Receive one two-part message, handle it and send back the result.

    Returns silently when nothing can be received (``zmq.Again``). A message
    that does not have exactly two parts, or that comes from a sender missing
    from ``self.controllers``, is logged as critical and dropped. Otherwise
    all controllers are told this node is busy, the message is handled, any
    result is sent to the sender, and a DoneMessage is broadcast.
    """
    try:
        tmp = self.socket.recv_multipart()
    except zmq.Again:
        return
    if len(tmp) != 2:
        self.logger.critical('Received a msg with len != 2, something seriously wrong. ')
        return
    sender, msg_buf = tmp
    msg = msg_factory(msg_buf)
    data = self.controllers.get(sender)
    if not data:
        self.logger.critical('Received a msg from %s - this is an unknown sender' % sender)
        return
    data['last_seen'] = time.time()
    # TODO Notify Controllers that we are busy, no more messages to be sent
    # The above busy notification is not perfect as other messages might be on their way already
    # but for long-running queries it will at least ensure other controllers
    # don't try and overuse this node by filling up a queue
    busy_msg = BusyMessage()
    self.send_to_all(busy_msg)
    try:
        tmp = self.handle(msg)
    except Exception:
        # Any handler failure is reported back to the sender as an
        # ErrorMessage carrying the formatted traceback.
        tmp = ErrorMessage(msg)
        tmp['payload'] = traceback.format_exc()
        self.logger.exception(tmp['payload'])
    if tmp:
        self.send(sender, tmp)
    # Send a DoneMessage to all controllers, this flags you as 'Done'.
    self.send_to_all(DoneMessage())
def run(self):
    """
    Loop forever, storing every task pulled from ``self.pull`` in redis.

    Each received message is JSON-decoded; its ``data`` entry is re-encoded
    and stored under its ``transaction`` key with ``self.result_expiration``
    as the expiry.
    """
    while True:
        try:
            raw = self.pull.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            # Nothing waiting on the socket: poll again.
            continue
        if raw is None:
            continue
        task = json.loads(raw)
        self.redis.setex(
            task['transaction'],
            self.result_expiration,
            json.dumps(task['data'])
        )
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    rep = self.context.socket(zmq.REP)
    with self.assertRaises(Again):
        rep.recv(zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    rep = self.context.socket(zmq.REP)
    with self.assertRaises(Again):
        rep.recv(zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    rep = self.context.socket(zmq.REP)
    with self.assertRaises(Again):
        rep.recv(zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    rep = self.context.socket(zmq.REP)
    with self.assertRaises(Again):
        rep.recv(zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    rep = self.context.socket(zmq.REP)
    with self.assertRaises(Again):
        rep.recv(zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def send_raw_msg(self, msg):
    """
    Send ``msg`` to the server and return the raw response.

    Both the send and the receive are attempted up to six times, retrying
    whenever ``zmq.Again`` is raised. When every attempt of a phase fails the
    connection is dropped via ``self.disconnect()`` and an ``RC_ERR`` is
    returned instead of the response.
    """
    max_attempts = 6

    for _ in range(max_attempts):
        try:
            self.socket.send(msg)
            break
        except zmq.Again:
            continue
    else:
        # Every send attempt raised zmq.Again.
        self.disconnect()
        return RC_ERR("*** [RPC] - Failed to send message to server")

    for _ in range(max_attempts):
        try:
            response = self.socket.recv()
            break
        except zmq.Again:
            continue
    else:
        # Every receive attempt raised zmq.Again.
        self.disconnect()
        return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))

    return response
# process a single response from server
def receive(self):
    '''
    Return the received message paired with an empty address string.

    ..note::
        In ZMQ we are unable to get the address where we got the message from.

    Raises ListenerException when the receive raises zmq.Again.
    '''
    try:
        received = self.sub.recv()
    except zmq.Again as error:
        log.error('Unable to receive messages: %s', error, exc_info=True)
        raise ListenerException(error)
    else:
        log.debug('[%s] Received %s', time.time(), received)
        return received, ''
def test_process_does_not_block(self):
    """process() swallows zmq.Again and asks the socket for a non-blocking receive."""
    fake_socket = Mock()
    fake_socket.recv_unicode.side_effect = zmq.Again()
    control_server = ControlServer(None, connection_string='127.0.0.1:10000')
    control_server._socket = fake_socket
    assertRaisesNothing(self, control_server.process)
    expected_calls = [call(flags=zmq.NOBLOCK)]
    fake_socket.recv_unicode.assert_has_calls(expected_calls)
def process(self, blocking=False):
    """
    Try to read one request from the socket and answer it.

    The request is handed to the JSONRPCResponseManager together with the
    exposed object, and the JSON response is sent back. If handling raises a
    TypeError, an unhandled-exception response built from the request id is
    sent instead.

    When no data are available the method does nothing, so a single-threaded
    central loop can call it periodically without the RPC server needing a
    processing loop of its own.

    :param blocking: If True, the receive is done without the NOBLOCK flag,
                     so it waits for data (or a timeout). Default is False.
    :raises RuntimeError: if the socket has not been created yet
                          (see :meth:`start_server`).
    """
    if self._socket is None:
        raise RuntimeError('The server has not been started yet, use start_server to do so.')

    recv_flags = 0 if blocking else zmq.NOBLOCK
    try:
        request = self._socket.recv_unicode(flags=recv_flags)
        self.log.debug('Got request %s', request)
        try:
            response = JSONRPCResponseManager.handle(request, self._exposed_object)
            self._socket.send_unicode(response.json)
            self.log.debug('Sent response %s', response.json)
        except TypeError as e:
            request_id = json.loads(request)['id']
            self._socket.send_json(
                self._unhandled_exception_response(request_id, e))
    except zmq.Again:
        # No data available (or timeout): nothing to do.
        pass
def receive_message(socket, blocking=True):
    """
    Receive a two-part ``(cmd, data)`` message from ``socket``.

    :param socket: socket to read from
    :param blocking: when False the receive uses ``zmq.NOBLOCK``
    :return: ``(cmd, data)``, or ``(None, None)`` when nothing is available,
        the context was terminated, or a KeyboardInterrupt occurred
    """
    nothing = (None, None)
    flags = zmq.NOBLOCK if not blocking else 0
    try:
        cmd, data = socket.recv_multipart(flags=flags)
    except zmq.Again:
        return nothing
    except zmq.ContextTerminated:
        print("Context terminated ..")
        return nothing
    except KeyboardInterrupt:
        return nothing
    return cmd, data
def _receiveFromRemotes(self, quotaPerRemote) -> int:
    """
    Drain up to ``quotaPerRemote`` non-empty messages from every remote
    that has a socket.

    :param quotaPerRemote: maximum number of messages to accept per remote
        (must be truthy)
    :return: total number of messages accepted across all remotes
    """
    assert quotaPerRemote
    total = 0
    for ident, remote in self.remotesByKeys.items():
        if not remote.socket:
            continue
        taken = 0
        sock = remote.socket
        while taken < quotaPerRemote:
            try:
                msg, = sock.recv_multipart(flags=zmq.NOBLOCK)
                # Empty messages (router probing on connection) are skipped
                # without counting against the quota.
                if msg:
                    taken += 1
                    self._verifyAndAppend(msg, ident)
            except zmq.Again:
                # Nothing more is waiting on this remote.
                break
        if taken > 0:
            logger.trace('{} got {} messages through remote {}'.
                         format(self, taken, remote))
        total += taken
    return total
def transmit(self, msg, uid, timeout=None, serialized=False):
    """
    Send ``msg`` to the remote registered under ``uid`` without blocking.

    :param msg: message to send; passed through ``prepare_to_send`` unless
        ``serialized`` is true
    :param uid: key of the remote in ``self.remotes``
    :param timeout: not used by this method
    :param serialized: true when ``msg`` is already in its wire form
    :return: ``(True, None)`` on success; ``(False, None)`` when the remote
        or its socket is missing or the send raised ``zmq.Again``;
        ``(False, error_text)`` when the message exceeds the size limit
    """
    remote = self.remotes.get(uid)
    if not remote:
        logger.debug("Remote {} does not exist!".format(uid))
        return False, None

    sock = remote.socket
    if not sock:
        logger.debug('{} has uninitialised socket '
                     'for remote {}'.format(self, uid))
        return False, None

    failure = None
    try:
        payload = msg if serialized else self.prepare_to_send(msg)
        sock.send(payload, flags=zmq.NOBLOCK)
        logger.debug('{} transmitting message {} to {}'
                     .format(self, payload, uid))
        if not remote.isConnected and payload not in self.healthMessages:
            logger.debug('Remote {} is not connected - '
                         'message will not be sent immediately.'
                         'If this problem does not resolve itself - '
                         'check your firewall settings'.format(uid))
        return True, failure
    except zmq.Again:
        logger.debug(
            '{} could not transmit message to {}'.format(self, uid))
    except InvalidMessageExceedingSizeException as ex:
        failure = '{}Cannot transmit message. Error {}'.format(
            CONNECTION_PREFIX, ex)
        logger.error(failure)
    return False, failure
def transmitThroughListener(self, msg, ident) -> Tuple[bool, Optional[str]]:
    """
    Send ``msg`` to peer ``ident`` through the listener socket without blocking.

    :param msg: message to send; passed through ``prepare_to_send`` first
    :param ident: peer identity; a ``str`` is encoded to bytes
    :return: ``(True, None)`` on success; ``(False, None)`` when ``ident`` is
        not in ``self.peersWithoutRemotes`` or the send raised ``zmq.Again``;
        ``(False, error_text)`` for any other failure
    """
    if isinstance(ident, str):
        ident = ident.encode()
    if ident not in self.peersWithoutRemotes:
        logger.debug('{} not sending message {} to {}'.
                     format(self, msg, ident))
        logger.debug("This is a temporary workaround for not being able to "
                     "disconnect a ROUTER's remote")
        return False, None
    # Every path below returns, so no fall-through return is needed.
    try:
        msg = self.prepare_to_send(msg)
        logger.trace('{} transmitting {} to {} through listener socket'.
                     format(self, msg, ident))
        self.listener.send_multipart([ident, msg], flags=zmq.NOBLOCK)
        return True, None
    except zmq.Again:
        return False, None
    except InvalidMessageExceedingSizeException as ex:
        err_str = '{}Cannot transmit message. Error {}'.format(
            CONNECTION_PREFIX, ex)
        logger.error(err_str)
        return False, err_str
    except Exception as e:
        err_str = '{}{} got error {} while sending through listener to {}'\
            .format(CONNECTION_PREFIX, self, e, ident)
        logger.error(err_str)
        return False, err_str
def receive(self, handler=None, block=True):
    """
    Receive and decode one message through ``self.receiver``.

    When no handler is given, the message header is read and the handler is
    looked up in ``receive_handlers`` by the header's ``htype``. The receiver
    is always flushed before returning, with a flag telling it whether the
    receive succeeded.

    :param handler: Reference to a specific message handler function to use
        for interpreting the message to be received
    :param block: Blocking receive call
    :return: ``Message`` built from the receiver statistics and the decoded
        data, or ``None`` when nothing was received, the header could not be
        read, the htype is unsupported, or decoding failed or gave no data
    """
    message = None
    # Set blocking flag in receiver
    self.receiver.block = block
    receive_is_successful = False

    if not handler:
        try:
            # Dynamically select handler
            htype = self.receiver.header()["htype"]
        except zmq.Again:
            # not clear if this is needed
            self.receiver.flush(receive_is_successful)
            return message
        except Exception:
            # KeyboardInterrupt and SystemExit are not Exception subclasses
            # and therefore propagate to the caller.
            logger.exception('Unable to read header - skipping')
            # Clear remaining sub-messages if exist
            self.receiver.flush(receive_is_successful)
            return message

        try:
            handler = receive_handlers[htype]
        except Exception as lookup_error:
            logger.debug(lookup_error)
            logger.warning('htype - {} - not supported'.format(htype))
            # No handler is available: skip the message rather than trying
            # to call None.
            self.receiver.flush(receive_is_successful)
            return message

    try:
        data = handler(self.receiver)
        # as an extra safety margin
        if data:
            receive_is_successful = True
            message = Message(self.receiver.statistics, data)
    except Exception:
        logger.exception('Unable to decode message - skipping')

    # Clear remaining sub-messages if exist
    self.receiver.flush(receive_is_successful)
    return message
def _run(self):
    """
    Subscriber loop: receive JSON lines and dispatch them while ``self.active``.

    The socket subscribes to everything, uses a 5000 ms receive timeout and
    connects to ``self.tr``. The event handler is told once when data starts
    arriving and once when a receive times out after data had been seen.
    """
    # socket must be created on the same thread
    self.socket.setsockopt(zmq.SUBSCRIBE, b'')
    self.socket.setsockopt(zmq.RCVTIMEO, 5000)
    self.socket.connect(self.tr)

    alive = False
    self.monitor.reset()

    while self.active:
        try:
            with self.monitor:
                line = self.socket.recv_string()
            self.monitor.on_recv_msg(line)
            self.last_data_recv_ts = time.time()
            # signal once
            if not alive:
                self.event_handler.on_async_alive()
                alive = True
        except zmq.Again:
            # got a timeout - mark as not alive (signalled once) and retry
            if alive:
                self.event_handler.on_async_dead()
                alive = False
            continue
        except zmq.ContextTerminated:
            # outside thread signaled us to exit
            assert(not self.active)
            break

        msg = json.loads(line)
        name, data = msg['name'], msg['data']
        msg_type = msg['type']
        baseline = msg.get('baseline', False)
        self.raw_snapshot[name] = data
        self.__dispatch(name, msg_type, data, baseline)

    # closing of socket must be from the same thread
    self.socket.close(linger = 0)
def recv_loop(self, configfile=None):
    """
    Main loop: receive strings and call the registered functions.

    The configuration is read first if that has not happened yet, and the
    connection is established if there is no context. Reception is
    non-blocking; when nothing is received the loop sleeps for one second.

    Every iteration checks whether the update interval has elapsed and, if
    so, calls ``update``. A received string that passes the filter is turned
    into a Measurement. If a status attribute is configured and present on
    the measurement, the function registered for its value is called.
    Finally ``get`` is called with the measurement.

    The loop ends when ``self.terminate`` is set (a KeyboardInterrupt during
    the receive sets it), after which ``disconnect`` is called.
    """
    if not self.config:
        self.read_config(configfile=configfile)
    if not self.context:
        self.connect()

    def next_update():
        return datetime.datetime.now() + datetime.timedelta(seconds=self.interval)

    updatetime = next_update()
    while not self.terminate:
        received = None
        try:
            received = self.socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            time.sleep(1)
        except KeyboardInterrupt:
            self.terminate = True

        if self.terminate:
            continue

        if datetime.datetime.now() > updatetime:
            logging.debug("Calling update function")
            self.update()
            updatetime = next_update()

        if not (received and self._filter(received)):
            continue

        logging.debug("Received string: %s" % received)
        m = Measurement(received)
        if self.status_attr:
            logging.debug("Checking status_attr: %s" % self.status_attr)
            stat = m.get_attr(self.status_attr)
            if stat:
                for key in self.stat_funcs:
                    if key == stat:
                        logging.debug("Calling %s function" % key)
                        self.stat_funcs[key](m)
        self.get(m)

    self.disconnect()
def testSimpleZStacksMsgs(tdir, looper):
    """
    Exchange messages between two SimpleZStacks, Alpha and Beta.

    Alpha connects to Beta and sends a greeting. Beta's handler answers
    through its listener socket with a JSON message carrying a random string
    built from ``randomSeed(100000)``, and Alpha's handler asserts that the
    string it receives has length 100000.
    """
    names = ['Alpha', 'Beta']
    genKeys(tdir, names)
    names = ['Alpha', 'Beta']
    alpha_seed = randomSeed()
    beta_seed = randomSeed()
    size = 100000
    msg = json.dumps({'random': randomSeed(size).decode()}).encode()

    def aHandler(m):
        text = "{}".format(m)
        print('{} printing... {}'.format(names[0], text[:100]))
        payload, _ = m
        print('Message size is {}'.format(len(payload['random'])))
        assert len(payload['random']) == size

    def bHandler(m):
        print(beta.msgHandler)
        peer = list(beta.peersWithoutRemotes)[0]
        try:
            beta.listener.send_multipart([peer, msg],
                                         flags=zmq.NOBLOCK)
        except zmq.Again:
            return False
        text = "{}".format(m)
        print('{} printing... {}'.format(names[1], text[:100]))

    def stack_params(stack_name):
        # Fresh parameter dict (with its own host address) for each stack.
        return {
            "name": stack_name,
            "ha": genHa(),
            "auto": 2,
            "basedirpath": tdir
        }

    alpha = SimpleZStack(stack_params(names[0]), aHandler, alpha_seed, False)
    beta = SimpleZStack(stack_params(names[1]), bHandler, beta_seed, True)

    for stack in (alpha, beta):
        looper.add(SMotor(stack))

    alpha.connect(name=beta.name, ha=beta.ha,
                  verKeyRaw=beta.verKeyRaw, publicKeyRaw=beta.publicKeyRaw)
    looper.runFor(0.25)
    alpha.send({'greetings': 'hi'}, beta.name)
    looper.runFor(1)