Python zmq 模块,SNDTIMEO 实例源码
我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用zmq.SNDTIMEO。
def setUp(self):
    """Create a dummy Supvisors, then the ZMQ request pusher and puller.

    The sockets are meant to be blocking, so one-second timeouts are applied
    to keep a failing exchange from hanging the unit test forever.
    """
    from supvisors.supvisorszmq import RequestPusher, RequestPuller
    # the dummy Supvisors is used for addresses and ports
    self.supvisors = MockedSupvisors()
    # create pusher and puller
    self.pusher = RequestPusher(self.supvisors.logger)
    self.puller = RequestPuller()
    # Emission is done by the pusher, so the send timeout has to be set on
    # that socket for a stuck send to fail instead of blocking the test.
    # NOTE(review): assumes RequestPusher exposes its ZMQ socket as
    # `.socket`, the same way RequestPuller does -- confirm.
    self.pusher.socket.setsockopt(zmq.SNDTIMEO, 1000)
    # Existing puller options are kept unchanged.
    self.puller.socket.setsockopt(zmq.SNDTIMEO, 1000)
    self.puller.socket.setsockopt(zmq.RCVTIMEO, 1000)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):
    """Set up the controller: Redis handle, ROUTER socket, run files, state.

    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: level applied to this controller's child logger.
    """
    self.redis_url = redis_url
    self.redis_server = redis.from_url(redis_url)

    # ZMQ plumbing: one ROUTER socket, registered for both directions.
    self.context = zmq.Context()
    router = self.context.socket(zmq.ROUTER)
    self.socket = router
    router_options = (
        (zmq.LINGER, 500),
        (zmq.ROUTER_MANDATORY, 1),  # Paranoid for debugging purposes
        (zmq.SNDTIMEO, 1000),  # Short timeout
    )
    for option, value in router_options:
        router.setsockopt(option, value)
    self.poller = zmq.Poller()
    self.poller.register(router, zmq.POLLIN | zmq.POLLOUT)

    self.node_name = socket.gethostname()
    # Bind somewhere in the 14300-14399 range on this machine's address.
    self.address = bind_to_random_port(
        router,
        'tcp://' + get_my_ip(),
        min_port=14300,
        max_port=14399,
        max_tries=100,
    )

    # Record the bound address and our pid in the run-files directory.
    run_files = (
        ('bqueryd_controller.address', self.address),
        ('bqueryd_controller.pid', str(os.getpid())),
    )
    for file_name, content in run_files:
        with open(os.path.join(RUNFILES_LOCATION, file_name), 'w') as handle:
            handle.write(content)

    # Logger is namespaced by the bound address.
    controller_logger = bqueryd.logger.getChild('controller')
    self.logger = controller_logger.getChild(self.address)
    self.logger.setLevel(loglevel)

    self.msg_count_in = 0
    # Results that are ready to be handed back to callers.
    self.rpc_results = []
    # Some RPC calls are split over workers; this tracks the original RPCs.
    self.rpc_segments = {}
    # Connected workers. TODO get rid of unresponsive ones...
    self.worker_map = {}
    # Which workers have a given file available.
    self.files_map = {}
    # Outgoing buffers keyed by message affinity, used for round-robin.
    self.worker_out_messages = {None: []}
    # Order in which the outgoing buffers are round-robined.
    self.worker_out_messages_sequence = [None]
    self.is_running = True
    self.last_heartbeat = 0
    # Other Controllers running on other DQE nodes.
    self.others = {}
    self.start_time = time.time()
def connect(self, server = None, port = None):
    """Open a REQ socket to the server and check it with a ``ping`` RPC.

    :param server: server name; a falsy value keeps the current ``self.server``.
    :param port: port; a falsy value keeps the current ``self.port``.
    :return: ``RC_ERR(...)`` if the ZMQ connect call fails, the ping result
        if the ping is falsy, otherwise ``RC_OK()``.
    """
    # Drop any existing connection before building a new one.
    if self.connected:
        self.disconnect()

    self.context = zmq.Context()
    self.server = server or self.server
    self.port = port or self.port

    # Socket to talk to server
    self.transport = "tcp://{0}:{1}".format(self.server, self.port)
    self.socket = self.context.socket(zmq.REQ)
    try:
        self.socket.connect(self.transport)
    except zmq.error.ZMQError as e:
        return RC_ERR("ZMQ Error: Bad server or port name: " + str(e))

    # Same 10000 timeout value for sending and receiving.
    for timeout_option in (zmq.SNDTIMEO, zmq.RCVTIMEO):
        self.socket.setsockopt(timeout_option, 10000)

    # Mark as connected before pinging; undo it if the ping fails.
    self.connected = True
    ping_rc = self.invoke_rpc_method('ping', api_class = None)
    if ping_rc:
        return RC_OK()

    self.connected = False
    return ping_rc
def test_zmq_socket_uses_timeout(self, mock_zmq_context):
    """The timeout given to ControlClient is used for both SNDTIMEO and RCVTIMEO."""
    expected_timeout = 100
    ControlClient(host='127.0.0.1', port='10002', timeout=expected_timeout)
    # The send option must be followed directly by the receive option.
    expected_calls = [
        call().setsockopt(option, expected_timeout)
        for option in (zmq.SNDTIMEO, zmq.RCVTIMEO)
    ]
    mock_zmq_context.assert_has_calls(expected_calls)
def _get_zmq_req_socket(self):
    """Return a REQ socket from a fresh context carrying this client's options.

    The options are set on the context before the socket is created;
    ``self.timeout`` is used for both the send and the receive timeout.
    """
    ctx = zmq.Context()
    context_options = (
        (zmq.REQ_CORRELATE, 1),
        (zmq.REQ_RELAXED, 1),
        (zmq.SNDTIMEO, self.timeout),
        (zmq.RCVTIMEO, self.timeout),
        (zmq.LINGER, 0),
    )
    for option, value in context_options:
        ctx.setsockopt(option, value)
    return ctx.socket(zmq.REQ)
def _set_timeout(self, short=True, seconds=None):
    """Set the send and receive timeouts on the connection socket.

    :param short: when no explicit ``seconds`` is given, pick the 5000 base
        (True) or twice that (False).
    :param seconds: explicit base timeout in seconds; may be fractional.

    The send timeout is the base value and the receive timeout is twice
    the base.
    """
    if seconds is not None:
        # The value is passed to setsockopt as an integer option, so coerce
        # it: a fractional ``seconds`` would otherwise produce a float.
        base = int(seconds * 1000)
    else:
        base = 5000
        # NOTE(review): indentation was lost in this copy of the source;
        # `short` is assumed to scale only the default base -- confirm.
        if not short:
            base *= 2

    self._conn.setsockopt(zmq.SNDTIMEO, base)  # A send should always be quick
    self._conn.setsockopt(zmq.RCVTIMEO, 2 * base)  # A receive might need to wait on processing
def zthread_fork(ctx, func, *args, **kwargs):
    """
    Create an attached thread. An attached thread gets a ctx and a PAIR
    pipe back to its parent. It must monitor its pipe, and exit if the
    pipe becomes unreadable.

    :param ctx: ZMQ context used to create both ends of the pipe.
    :param func: thread target, called as ``func(ctx, pipe, *args, **kwargs)``.
    :return: the parent's end of the pipe. Errors are not turned into a
        ``None`` return; any exception raised while creating the sockets or
        starting the thread propagates to the caller.
    """
    def _make_pipe_end():
        # Both ends get the same options. Previously the receive timeout
        # was applied to the parent end twice and never to the child end.
        end = ctx.socket(zmq.PAIR)
        for option, value in (
            (zmq.LINGER, 0),
            (zmq.RCVHWM, 100),
            (zmq.SNDHWM, 100),
            (zmq.SNDTIMEO, 5000),
            (zmq.RCVTIMEO, 5000),
        ):
            end.setsockopt(option, value)
        return end

    a = _make_pipe_end()
    b = _make_pipe_end()

    # Random inproc endpoint name, so each call gets its own pipe.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
    a.bind(iface)
    b.connect(iface)

    thread = threading.Thread(target=func, args=((ctx, b) + args), kwargs=kwargs)
    thread.daemon = False
    thread.start()
    return a
def __init__(self, ip_addr, load_instruments=None, force=False):
    """Create a connection to the Moku:Lab unit at the given IP address

    An encrypted (CURVE) connection is tried first. If fetching the serial
    times out and *force* is set, a plain unencrypted connection is tried
    instead.

    :type ip_addr: string
    :param ip_addr: The address to connect to. This should be in IPv4 dotted notation.

    :type load_instruments: bool or None
    :param load_instruments: Leave default (*None*) unless you know what you're doing.
        When *None*, the value is taken from whether the boot mode is 'normal'.

    :type force: bool
    :param force: Ignore firmware and network compatibility checks and force the instrument
        to deploy. This is dangerous on many levels, leave *False* unless you know what you're doing.

    :raises zmq.error.Again: if the first connection attempt times out and *force* is False.
    :raises MokuException: if *force* is False and the firmware build is reported incompatible.
    """
    self._ip = ip_addr
    self._seq = 0
    self._instrument = None
    self._known_mokus = []

    self._ctx = zmq.Context.instance()
    self._conn_lock = threading.RLock()

    try:
        # First attempt: CURVE-encrypted REQ socket with a fresh client keypair
        # and the server key loaded from the '000' certificate in data_folder.
        self._conn = self._ctx.socket(zmq.REQ)
        self._conn.setsockopt(zmq.LINGER, 5000)
        self._conn.curve_publickey, self._conn.curve_secretkey = zmq.curve_keypair()
        self._conn.curve_serverkey, _ = zmq.auth.load_certificate(os.path.join(data_folder, '000'))
        self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

        # Getting the serial should be fairly quick; it's a simple operation. More importantly we
        # don't want to block the fall-back operation for too long
        self._conn.setsockopt(zmq.SNDTIMEO, 1000)
        self._conn.setsockopt(zmq.RCVTIMEO, 1000)

        self.serial = self.get_serial()
        # The serial came back, so switch to the regular timeouts.
        self._set_timeout()
    except zmq.error.Again:
        if not force:
            print("Connection failed, either the Moku cannot be reached or the firmware is out of date")
            raise

        # If we're force-connecting, try falling back to non-encrypted.
        self._conn = self._ctx.socket(zmq.REQ)
        self._conn.setsockopt(zmq.LINGER, 5000)
        self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

        # Regular timeouts are set before the serial request on this path.
        self._set_timeout()
        self.serial = self.get_serial()

    self.name = None
    self.led = None
    self.led_colours = None

    # Check that pymoku is compatible with the Moku:Lab's firmware version
    if not force:
        build = self.get_firmware_build()
        if cp.firmware_is_compatible(build) == False: # Might be None = unknown, don't print that.
            raise MokuException("The connected Moku appears to be incompatible with this version of pymoku. Please run 'moku --ip={} firmware check_compat' for more information.".format(self._ip))

    # An explicit load_instruments wins; otherwise it depends on the boot mode.
    self.load_instruments = load_instruments if load_instruments is not None else self.get_bootmode() == 'normal'