Python zmq 模块,RCVTIMEO 实例源码
我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用zmq.RCVTIMEO。
def setUp(self):
    """ Build the mocked Supvisors and the internal publisher / subscriber pair. """
    from supvisors.supvisorszmq import (InternalEventPublisher,
                                        InternalEventSubscriber)
    # addresses, ports and logger are all read from the mocked Supvisors
    supvisors = self.supvisors = MockedSupvisors()
    # publisher side, built from the local address
    self.publisher = InternalEventPublisher(
        supvisors.address_mapper.local_address,
        supvisors.options.internal_port,
        supvisors.logger)
    # subscriber side, built from the address list of the mapper
    self.subscriber = InternalEventSubscriber(
        supvisors.address_mapper.addresses,
        supvisors.options.internal_port)
    # the subscriber socket is meant to block; cap reception at one second
    # so that a failure cannot hang the unit test
    self.subscriber.socket.setsockopt(zmq.RCVTIMEO, 1000)
    # the publisher does not wait for its subscribers,
    # so leave the connections some time to settle
    time.sleep(1)
def start(self, socket):
    """
    Start the monitoring thread and socket.

    Does nothing when the ``monitor_listening`` flag is already set.

    :param socket: Socket to monitor.
    """
    listening_flag = self.monitor_listening
    # A set flag means a monitoring thread is already running.
    if listening_flag.is_set():
        return

    # The monitor socket gets a receive timeout so the loop below wakes up
    # regularly even when no event arrives.
    monitor_socket = socket.get_monitor_socket(events=self.events)
    monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
    listening_flag.set()

    def _listen(flag):
        while flag.is_set():
            try:
                event = recv_monitor_message(monitor_socket)
                if event["event"] == zmq.EVENT_CLOSED:
                    # Monitored socket was closed: this is the last iteration.
                    flag.clear()
                self._notify_listeners(event)
            except zmq.Again:
                # Receive timed out. Listeners are notified with None, which
                # acts as a heartbeat for time based listeners (no extra
                # thread needed).
                self._notify_listeners(None)
        # Loop left: release the monitor socket.
        socket.disable_monitor()
        monitor_socket.close()

    worker = threading.Thread(target=_listen, args=(listening_flag,))
    # Daemon thread, so it stops anyway if nobody calls disconnect.
    worker.daemon = True
    self.monitor_thread = worker
    worker.start()
def setUp(self):
    """ Build the mocked Supvisors and the request pusher / puller pair. """
    from supvisors.supvisorszmq import RequestPusher, RequestPuller
    # the mocked Supvisors provides the logger
    self.supvisors = MockedSupvisors()
    # pusher and puller under test
    self.pusher = RequestPusher(self.supvisors.logger)
    self.puller = RequestPuller()
    # the puller socket is meant to block; bound emission and reception
    # to one second each so that a failure cannot hang the unit test
    for option in (zmq.SNDTIMEO, zmq.RCVTIMEO):
        self.puller.socket.setsockopt(option, 1000)
def setUp(self):
    """ Build the mocked Supvisors, the event publisher / subscriber and test payloads. """
    from supvisors.supvisorszmq import EventPublisher, EventSubscriber
    # the mocked Supvisors provides the event port and the logger
    supvisors = self.supvisors = MockedSupvisors()
    # publisher and subscriber under test; the subscriber uses the
    # shared ZeroMQ context instance
    self.publisher = EventPublisher(
        supvisors.options.event_port,
        supvisors.logger)
    self.subscriber = EventSubscriber(
        zmq.Context.instance(),
        supvisors.options.event_port,
        supvisors.logger)
    # WARN: no subscription is made here; when a test adds one, it should
    # sleep a little so that PyZMQ has time to handle it
    # WARN: the subscriber socket is meant to block; cap reception at one
    # second so that a failure cannot hang the unit test
    self.subscriber.socket.setsockopt(zmq.RCVTIMEO, 1000)
    # test payloads, one per kind of event
    self.supvisors_payload = Payload(
        {'state': 'running', 'version': '1.0'})
    self.address_payload = Payload(
        {'state': 'silent', 'name': 'cliche01', 'date': 1234})
    self.application_payload = Payload(
        {'state': 'starting', 'name': 'supvisors'})
    self.process_payload = Payload(
        {'state': 'running', 'process_name': 'plugin',
         'application_name': 'supvisors', 'date': 1230})
    self.event_payload = Payload(
        {'state': 20, 'name': 'plugin', 'group': 'supvisors', 'now': 1230})
def setup_socket(self):
    """Create and configure the REQ socket; connecting is left to the subclass."""
    # Components inheriting from BaseComponent are expected to call
    # self.socket.connect with the appropriate address.
    self.socket = zmq.Context().socket(zmq.REQ)
    socket_options = (
        # LINGER 0: pending messages are dropped as soon as the socket closes.
        (zmq.LINGER, 0),
        # RCVTIMEO: socket.recv gives up after this many milliseconds.
        (zmq.RCVTIMEO, 500),
    )
    for option, value in socket_options:
        self.socket.setsockopt(option, value)
def connect_socket(self):
    """Connect to the first controller that answers a ping.

    Each address in ``self.controllers`` is tried in order with a temporary
    REQ socket and a 2 second receive timeout. The first socket that gets a
    reply becomes ``self.controller``, its address is stored in
    ``self.address`` and its receive timeout is switched to
    ``self.timeout`` seconds. Sockets of controllers that did not answer
    are closed before the next one is tried.

    :raises Exception: when no controller returned a usable reply.
    """
    reply = None
    live_sock = None
    for c in self.controllers:
        self.logger.debug('Establishing socket connection to %s' % c)
        tmp_sock = self.context.socket(zmq.REQ)
        tmp_sock.setsockopt(zmq.RCVTIMEO, 2000)
        tmp_sock.setsockopt(zmq.LINGER, 0)
        tmp_sock.identity = self.identity
        tmp_sock.connect(c)
        # first ping the controller to see if it responds at all
        msg = RPCMessage({'payload': 'ping'})
        tmp_sock.send_json(msg)
        try:
            reply = msg_factory(tmp_sock.recv_json())
        except Exception:
            # Only real errors are handled here; KeyboardInterrupt and
            # SystemExit must not be swallowed by the retry loop.
            traceback.print_exc()
            # This controller is unusable: release its socket instead of
            # leaking one per failed attempt.
            tmp_sock.close()
            continue
        self.address = c
        live_sock = tmp_sock
        break
    if not reply:
        if live_sock is not None:
            # A controller answered, but with an unusable (falsy) reply.
            live_sock.close()
        raise Exception('No controller connection')
    # Now set the timeout to the actual requested
    self.logger.debug("Connection OK, setting network timeout to %s milliseconds", self.timeout*1000)
    self.controller = live_sock
    self.controller.setsockopt(zmq.RCVTIMEO, self.timeout*1000)
def connect(self, server = None, port = None):
    """Open a REQ socket to the server and check the link with a ping RPC.

    :param server: server name; the current ``self.server`` is kept when falsy.
    :param port: server port; the current ``self.port`` is kept when falsy.
    :return: ``RC_OK()`` on success, an ``RC_ERR`` when the ZMQ connect
        fails, or the falsy result of the ping RPC.
    """
    # drop any previous connection first
    if self.connected:
        self.disconnect()

    self.context = zmq.Context()

    # arguments override the stored values only when they are truthy
    self.server = server or self.server
    self.port = port or self.port

    # socket used to talk to the server
    self.transport = "tcp://{0}:{1}".format(self.server, self.port)
    self.socket = self.context.socket(zmq.REQ)
    try:
        self.socket.connect(self.transport)
    except zmq.error.ZMQError as e:
        return RC_ERR("ZMQ Error: Bad server or port name: " + str(e))

    # ten seconds for both sending and receiving
    for option in (zmq.SNDTIMEO, zmq.RCVTIMEO):
        self.socket.setsockopt(option, 10000)

    # optimistic flag, reverted below if the ping is not answered
    self.connected = True
    rc = self.invoke_rpc_method('ping', api_class = None)
    if rc:
        return RC_OK()

    self.connected = False
    return rc
def __init__(self, listeners: List[MessageListener] = None,
             on_finish: Callable[[int], None] = lambda return_code: None):
    """Starts an apart-core command and starts listening for zmq messages on this new thread

    A PAIR socket is bound on a fresh ipc address, which is handed to the
    apart-core process as its command line argument. The process is run
    directly when the effective user is root or ``APART_PARTCLONE_CMD`` is
    set, and through ``pkexec`` otherwise. If the executable to launch is
    missing, an error is printed to stderr and the program exits with
    status 1.

    :param listeners: initial message listeners; ``None`` means none.
    :param on_finish: callable stored as ``self.on_finish`` (presumably
        called with the process return code - confirm against ``run``).
    """
    Thread.__init__(self, name='apart-core-runner')
    self.ipc_address = 'ipc:///tmp/apart-gtk-{}.ipc'.format(uuid.uuid4())
    self.zmq_context = zmq.Context()
    self.socket = self.zmq_context.socket(zmq.PAIR)
    self.socket.setsockopt(zmq.RCVTIMEO, 100)
    self.socket.bind(self.ipc_address)
    self.on_finish = on_finish
    self.listeners = listeners or []  # List[MessageListener]
    if LOG_MESSAGES:
        self.register(MessageListener(lambda msg: print('apart-core ->\n {}'.format(str(msg)))))

    # Current default is apart-core binary stored in the directory above these sources
    apart_core_cmd = os.environ.get('APART_GTK_CORE_CMD') or \
        os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/../apart-core')

    # Decide once how the process is launched, so that the error report
    # below always names the executable that was actually started.
    run_directly = os.geteuid() == 0 or os.environ.get('APART_PARTCLONE_CMD')
    if run_directly:
        command = [apart_core_cmd, self.ipc_address]
    else:
        command = ['pkexec', apart_core_cmd, self.ipc_address]

    try:
        self.process = subprocess.Popen(command)
    except FileNotFoundError:
        if run_directly:
            print('apart-core command not found at \'' + apart_core_cmd + '\'', file=sys.stderr)
        else:
            print('pkexec command not found, install polkit or run as root', file=sys.stderr)
        self.zmq_context.destroy()
        sys.exit(1)

    self.start()
def start(self):
    '''
    Startup the zmq consumer.

    Builds the URI from protocol, address and (when set) port, connects a
    socket of the configured type and applies the optional socket options.
    '''
    # the port part is left out of the URI when no port is configured
    if self.port:
        zmq_uri = '{protocol}://{address}:{port}'.format(
            protocol=self.protocol,
            address=self.address,
            port=self.port
        )
    else:
        zmq_uri = '{protocol}://{address}'.format(
            protocol=self.protocol,
            address=self.address
        )
    log.debug('ZMQ URI: %s', zmq_uri)
    self.ctx = zmq.Context()
    # unknown socket type names fall back to PULL
    skt_type = getattr(zmq, self.type, zmq.PULL)
    self.sub = self.ctx.socket(skt_type)
    self.sub.connect(zmq_uri)
    if self.hwm is not None:
        try:
            self.sub.setsockopt(zmq.HWM, self.hwm)
        except AttributeError:
            # this zmq module has no HWM attribute: use RCVHWM instead
            self.sub.setsockopt(zmq.RCVHWM, self.hwm)
    # each remaining option is applied only when it is configured
    if self.recvtimeout is not None:
        log.debug('Setting RCVTIMEO to %d', self.recvtimeout)
        self.sub.setsockopt(zmq.RCVTIMEO, self.recvtimeout)
    if self.keepalive is not None:
        log.debug('Setting TCP_KEEPALIVE to %d', self.keepalive)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE, self.keepalive)
    if self.keepalive_idle is not None:
        log.debug('Setting TCP_KEEPALIVE_IDLE to %d', self.keepalive_idle)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_IDLE, self.keepalive_idle)
    if self.keepalive_interval is not None:
        log.debug('Setting TCP_KEEPALIVE_INTVL to %d', self.keepalive_interval)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_INTVL, self.keepalive_interval)
def test_zmq_socket_uses_timeout(self, mock_zmq_context):
    """The client timeout is applied as both send and receive timeout."""
    timeout = 100
    expected_calls = [
        call().setsockopt(zmq.SNDTIMEO, timeout),
        call().setsockopt(zmq.RCVTIMEO, timeout),
    ]

    ControlClient(host='127.0.0.1', port='10002', timeout=timeout)

    mock_zmq_context.assert_has_calls(expected_calls)
def test_connection(self, mock_context):
    """Starting the server creates a REP socket, sets RCVTIMEO and binds it."""
    expected_calls = [
        call(),
        call().socket(zmq.REP),
        call().socket().setsockopt(zmq.RCVTIMEO, 100),
        call().socket().bind('tcp://127.0.0.1:10001'),
    ]

    cs = ControlServer(None, connection_string='127.0.0.1:10001')
    cs.start_server()

    mock_context.assert_has_calls(expected_calls)
def test_server_can_only_be_started_once(self, mock_context):
    """Calling start_server twice still shows the expected setup call sequence."""
    expected_calls = [
        call(),
        call().socket(zmq.REP),
        call().socket().setsockopt(zmq.RCVTIMEO, 100),
        call().socket().bind('tcp://127.0.0.1:10000'),
    ]

    server = ControlServer(None, connection_string='127.0.0.1:10000')
    for _ in range(2):
        server.start_server()

    mock_context.assert_has_calls(expected_calls)
def start_server(self):
    """
    Binds the server to the configured host and port and starts listening.

    Nothing happens when a socket already exists.
    """
    if self._socket is not None:
        return

    endpoint = 'tcp://{0}:{1}'.format(self.host, self.port)
    self._socket = zmq.Context().socket(zmq.REP)
    # receive calls give up after 100 ms instead of blocking forever
    self._socket.setsockopt(zmq.RCVTIMEO, 100)
    self._socket.bind(endpoint)

    self.log.info('Listening on %s:%s', self.host, self.port)
def _get_zmq_req_socket(self):
    """Return a REQ socket created from a new context carrying the client options.

    The options are set on the context rather than on the socket; pyzmq
    documents context-level options as defaults for sockets created
    from that context afterwards.
    """
    ctx = zmq.Context()
    context_options = (
        (zmq.REQ_CORRELATE, 1),
        (zmq.REQ_RELAXED, 1),
        (zmq.SNDTIMEO, self.timeout),
        (zmq.RCVTIMEO, self.timeout),
        (zmq.LINGER, 0),
    )
    for option, value in context_options:
        ctx.setsockopt(option, value)
    return ctx.socket(zmq.REQ)
def _set_timeout(self, short=True, seconds=None):
    """Set the send and receive timeouts of the connection socket.

    :param short: with the default base of 5000 ms, ``False`` doubles it.
    :param seconds: explicit base timeout in seconds; fractional values are
        accepted and truncated to whole milliseconds.
    """
    if seconds is not None:
        # Integer socket options must be given as int, so a fractional
        # number of seconds is converted instead of being passed as float.
        base = int(seconds * 1000)
    else:
        base = 5000
        # NOTE(review): the original nesting was lost in this listing; it is
        # assumed that ``short`` only scales the default - confirm upstream.
        if not short:
            base *= 2

    self._conn.setsockopt(zmq.SNDTIMEO, base)  # A send should always be quick
    self._conn.setsockopt(zmq.RCVTIMEO, 2 * base)  # A receive might need to wait on processing
def test_recv_timeout():
    """A recv that hits RCVTIMEO must raise an error recognised as a timeout."""
    # https://github.com/eventlet/eventlet/issues/282
    with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _):
        sub.setsockopt(zmq.RCVTIMEO, 100)
        try:
            # the eventlet timeout is only a safety net around the recv
            with eventlet.Timeout(1, False):
                sub.recv()
        except zmq.ZMQError as e:
            assert eventlet.is_timeout(e)
        else:
            # recv must not complete without raising
            assert False
def zthread_fork(ctx, func, *args, **kwargs):
    """
    Create an attached thread. An attached thread gets a ctx and a PAIR
    pipe back to its parent. It must monitor its pipe, and exit if the
    pipe becomes unreadable.

    :param ctx: ZeroMQ context used to create both ends of the pipe.
    :param func: thread body, called as ``func(ctx, pipe, *args, **kwargs)``.
    :return: the parent end of the pipe. Errors are not turned into a
        return value; they propagate as exceptions.
    """
    def _new_pipe_end():
        # Both ends of the pipe share exactly the same configuration.
        end = ctx.socket(zmq.PAIR)
        end.setsockopt(zmq.LINGER, 0)
        end.setsockopt(zmq.RCVHWM, 100)
        end.setsockopt(zmq.SNDHWM, 100)
        end.setsockopt(zmq.SNDTIMEO, 5000)
        end.setsockopt(zmq.RCVTIMEO, 5000)
        return end

    a = _new_pipe_end()
    # The child end gets its own receive timeout too (it was set a second
    # time on the parent end before, leaving the child end without one).
    b = _new_pipe_end()

    # Random endpoint name; decoded so that the name holds plain hex digits
    # rather than the repr of a bytes object.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode('ascii')
    a.bind(iface)
    b.connect(iface)

    thread = threading.Thread(target=func, args=((ctx, b) + args), kwargs=kwargs)
    thread.daemon = False
    thread.start()
    return a
def _run(self):
    """Receive loop: subscribe, connect, then dispatch messages while active.

    Every received line is parsed as JSON, stored in ``self.raw_snapshot``
    under its name and dispatched. The event handler is told once when data
    starts flowing and once when a receive times out after data was seen.
    """
    # the socket has to be set up on the thread that uses it
    self.socket.setsockopt(zmq.SUBSCRIBE, b'')
    self.socket.setsockopt(zmq.RCVTIMEO, 5000)
    self.socket.connect(self.tr)

    receiving = False

    self.monitor.reset()

    while self.active:
        try:
            with self.monitor:
                raw = self.socket.recv_string()

            self.monitor.on_recv_msg(raw)
            self.last_data_recv_ts = time.time()

            # report the dead -> alive transition only once
            if not receiving:
                self.event_handler.on_async_alive()
                receiving = True

        except zmq.Again:
            # receive timed out: report the alive -> dead transition only
            # once, then try again
            if receiving:
                self.event_handler.on_async_dead()
                receiving = False
            continue

        except zmq.ContextTerminated:
            # another thread terminated the context to make us exit
            assert(not self.active)
            break

        payload = json.loads(raw)

        name = payload['name']
        data = payload['data']
        msg_type = payload['type']
        baseline = payload.get('baseline', False)

        self.raw_snapshot[name] = data
        self.__dispatch(name, msg_type, data, baseline)

    # the socket has to be closed from this same thread
    self.socket.close(linger = 0)
def __init__(self, host, address, log_address):
    """Set up the rpc socket and report its endpoint over a temporary socket.

    A PAIR socket is connected to ``address``; a second PAIR socket is bound
    on ``host`` with port ``'*'``. The endpoint actually bound is read back
    and sent as JSON through the first socket.

    :param host: host part of the rpc endpoint to bind.
    :param address: address of the temporary socket to connect to.
    :param log_address: address passed to ``get_log``; must not be None.
    :raises NotImplementedError: when ``log_address`` is None.
    """
    object.__init__(self)

    if log_address is None:
        raise NotImplementedError()
        # TODO remove
    self.logger = get_log(log_address, name=__name__)

    # TODO find proper space to define following class
    class Encoder(json.JSONEncoder):

        def default(self_, obj):
            if obj is None:
                # NOTE(review): this call passes no encoder instance.
                return json.JSONEncoder.default(obj)
            # anything that is not a proxy yet is wrapped by the outer object
            if not isinstance(obj, Proxy):
                obj = self.wrap_proxy(obj)
            return obj.encode()

    self.encoder = Encoder

    self.context = zmq.Context()

    # temporary socket, connected to the given address
    self.logger.debug("connect tmp socket at {a}".format(a=address))
    tmp_socket = self.context.socket(zmq.PAIR)
    tmp_socket.connect(address)

    # rpc socket, bound on the given host with port '*'
    rpc_endpoint = '{h}:{p}'.format(h=host, p='*')
    rpc_address = '{t}://{e}'.format(t='tcp', e=rpc_endpoint)
    self.logger.debug("bind rpc socket at {a}".format(a=rpc_address))
    self.socket = self.context.socket(zmq.PAIR)
    # self.socket.setsockopt(zmq.RCVTIMEO, 10000)
    self.socket.bind(rpc_address)
    # the endpoint really in use is read back from the socket
    self.address = self.socket.getsockopt(zmq.LAST_ENDPOINT)
    self.logger.debug("rpc socket binded at {a}".format(a=self.address))

    # report the rpc address through the temporary socket
    self.logger.debug("send back rpc address")
    tmp_socket.send_json({'address': self.address})

    self.last_obj_id = -1
    self.objs = {}
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
def __init__(self, ip_addr, load_instruments=None, force=False):
    """Create a connection to the Moku:Lab unit at the given IP address

    :type ip_addr: string
    :param ip_addr: The address to connect to. This should be in IPv4 dotted notation.

    :type load_instruments: bool or None
    :param load_instruments: Leave default (*None*) unless you know what you're doing.

    :type force: bool
    :param force: Ignore firmware and network compatibility checks and force the instrument
    to deploy. This is dangerous on many levels, leave *False* unless you know what you're doing.

    :raises zmq.error.Again: when the encrypted connection times out and *force* is not set.
    :raises MokuException: when the firmware is reported as incompatible and *force* is not set.
    """
    self._ip = ip_addr
    self._seq = 0
    self._instrument = None
    self._known_mokus = []

    self._ctx = zmq.Context.instance()
    self._conn_lock = threading.RLock()

    endpoint = "tcp://%s:%d" % (self._ip, Moku.PORT)

    try:
        self._conn = self._ctx.socket(zmq.REQ)
        self._conn.setsockopt(zmq.LINGER, 5000)
        self._conn.curve_publickey, self._conn.curve_secretkey = zmq.curve_keypair()
        self._conn.curve_serverkey, _ = zmq.auth.load_certificate(os.path.join(data_folder, '000'))
        self._conn.connect(endpoint)

        # Getting the serial should be fairly quick; it's a simple operation. More importantly we
        # don't want to block the fall-back operation for too long
        self._conn.setsockopt(zmq.SNDTIMEO, 1000)
        self._conn.setsockopt(zmq.RCVTIMEO, 1000)

        self.serial = self.get_serial()
        self._set_timeout()
    except zmq.error.Again:
        if not force:
            print("Connection failed, either the Moku cannot be reached or the firmware is out of date")
            raise

        # If we're force-connecting, try falling back to non-encrypted.
        # The encrypted socket that just timed out is closed first, so it
        # is not left open once the attribute is rebound below.
        self._conn.close()
        self._conn = self._ctx.socket(zmq.REQ)
        self._conn.setsockopt(zmq.LINGER, 5000)
        self._conn.connect(endpoint)

        self._set_timeout()

        self.serial = self.get_serial()

    self.name = None
    self.led = None
    self.led_colours = None

    # Check that pymoku is compatible with the Moku:Lab's firmware version
    if not force:
        build = self.get_firmware_build()
        if cp.firmware_is_compatible(build) == False:  # Might be None = unknown, don't print that.
            raise MokuException("The connected Moku appears to be incompatible with this version of pymoku. Please run 'moku --ip={} firmware check_compat' for more information.".format(self._ip))

    # An explicit value wins; otherwise instruments are loaded only in 'normal' boot mode.
    self.load_instruments = load_instruments if load_instruments is not None else self.get_bootmode() == 'normal'