Python zmq 模块,RCVHWM 实例源码
我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用zmq.RCVHWM。
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 both SNDHWM and RCVHWM are set to ``value``. Both
    assignments are always attempted; if either fails, the most recent
    failure is re-raised once both have been tried. On older libzmq the
    single HWM option is set and the result of ``setsockopt`` is returned.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in this handler: the name bound by
        # the first handler is not available here.
        raised = e
    if raised:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 both SNDHWM and RCVHWM are set to ``value``. Both
    assignments are always attempted; if either fails, the most recent
    failure is re-raised once both have been tried. On older libzmq the
    single HWM option is set and the result of ``setsockopt`` is returned.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in this handler: the name bound by
        # the first handler is not available here.
        raised = e
    if raised:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 both SNDHWM and RCVHWM are set to ``value``. Both
    assignments are always attempted; if either fails, the most recent
    failure is re-raised once both have been tried. On older libzmq the
    single HWM option is set and the result of ``setsockopt`` is returned.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in this handler: the name bound by
        # the first handler is not available here.
        raised = e
    if raised:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 both SNDHWM and RCVHWM are set to ``value``. Both
    assignments are always attempted; if either fails, the most recent
    failure is re-raised once both have been tried. On older libzmq the
    single HWM option is set and the result of ``setsockopt`` is returned.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in this handler: the name bound by
        # the first handler is not available here.
        raised = e
    if raised:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is returned when it can be
    read; otherwise the receive-side mark (RCVHWM) is returned. On older
    libzmq the single HWM option is returned.
    """
    libzmq_major = zmq.zmq_version_info()[0]
    if libzmq_major < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)
    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM.
        pass
    return self.getsockopt(zmq.RCVHWM)
def execute_command_forwarder():
    """Start a FORWARDER device relaying from a SUB socket to a PUB socket.

    Command-line arguments are parsed with the streamer parser. The input
    side is bound to ``args.subscriber`` and subscribed to every topic; the
    output side is bound to ``args.publisher``. High water marks are applied
    only when the corresponding argument is truthy. A short banner is
    printed before the device is started.
    """
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    # An empty prefix subscribes to all messages.
    device.setsockopt_in(zmq.SUBSCRIBE, b'')
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)
    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    # Single-argument print() calls produce the same text on Python 2 and 3.
    print("oldspeak forwarder started")
    print("date {0}".format(datetime.utcnow().isoformat()))
    print("subscriber {0}".format(args.subscriber))
    print("publisher {0}".format(args.publisher))
    device.start()
def execute_command_streamer():
    """Start a STREAMER device relaying from a PULL socket to a PUSH socket.

    Command-line arguments are parsed with the streamer parser. The input
    side is bound to ``args.pull`` and the output side to ``args.push``.
    High water marks are applied only when the corresponding argument is
    truthy. A short banner is printed before the device is started.
    """
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    device.bind_in(args.pull)
    device.bind_out(args.push)
    if args.pull_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.pull_hwm)
    if args.push_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.push_hwm)

    # Single-argument print() calls produce the same text on Python 2 and 3.
    print("oldspeak streamer started")
    print("date {0}".format(datetime.utcnow().isoformat()))
    print("pull {0}".format(args.pull))
    print("push {0}".format(args.push))
    device.start()
def _setup_ipc(self):
    '''
    Create the PULL socket bound to the publisher IPC endpoint.

    Stores the new context on ``self.ctx`` and the socket on ``self.sub``.
    The receive high water mark is taken from ``self.opts['hwm']``.
    '''
    self.ctx = zmq.Context()
    log.debug('Setting up the publisher puller')
    self.sub = self.ctx.socket(zmq.PULL)
    # Configure the high water mark before binding: on older libzmq
    # releases socket options only affect subsequent bind/connect calls.
    try:
        # zmq 2
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(PUB_IPC_URL)
def _frame_worker(self):
    """Receive packets on a SUB socket and assemble them into frames.

    Does nothing when no truthy ``_frame_class`` attribute is set.
    Otherwise it connects to port 27185 on ``self._moku._ip``, polls with a
    one second timeout while ``self._running`` is true, and feeds each
    received packet to the current frame object. A frame whose
    ``_complete`` flag is set is put on ``self._queue`` and replaced by a
    fresh frame. The socket is closed on exit, including on error.
    """
    if not getattr(self, '_frame_class', None):
        return

    ctx = zmq.Context.instance()
    skt = ctx.socket(zmq.SUB)
    try:
        # Options are set before connecting so they apply to the connection
        # on every libzmq version.
        skt.setsockopt(zmq.RCVHWM, 8)
        skt.setsockopt(zmq.LINGER, 5000)
        skt.setsockopt_string(zmq.SUBSCRIBE, u'')
        skt.connect("tcp://%s:27185" % self._moku._ip)

        fr = self._frame_class(**self._frame_kwargs)
        while self._running:
            # The 1.0 s timeout lets the loop re-check self._running.
            if skt in zmq.select([skt], [], [], 1.0)[0]:
                d = skt.recv()
                fr.add_packet(d)
                if fr._complete:
                    self._queue.put_nowait(fr)
                    fr = self._frame_class(**self._frame_kwargs)
    finally:
        skt.close()
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 both SNDHWM and RCVHWM are set to ``value``. Both are
    always attempted; if either fails, the most recent failure is re-raised
    afterwards. On older libzmq the single HWM option is set.

    .. warning::
        New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.setsockopt(zmq.HWM, value)

    last_error = None
    # Same order as before: send side first, then receive side.
    for option_name in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, option_name, value)
        except Exception as exc:
            last_error = exc
    if last_error:
        raise last_error
def _setup_ipc(self):
    '''
    Create the device DEALER socket and the PUSH socket to the publisher.

    The DEALER socket (``self.sub``) uses ``self._name`` as its identity
    and connects to the device IPC endpoint. The PUSH socket (``self.pub``)
    connects to the publisher IPC endpoint. Both take their high water mark
    from ``self.opts['hwm']``.
    '''
    self.ctx = zmq.Context()
    # subscribe to device IPC
    log.debug('Creating the dealer IPC for %s', self._name)
    self.sub = self.ctx.socket(zmq.DEALER)
    if six.PY2:
        self.sub.setsockopt(zmq.IDENTITY, self._name)
    elif six.PY3:
        # The identity option takes bytes on Python 3.
        self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
    try:
        # zmq 2
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    # connect to the corresponding IPC pipe
    self.sub.connect(DEV_IPC_URL)
    # publish to the publisher IPC
    self.pub = self.ctx.socket(zmq.PUSH)
    # As for the dealer socket above, set the high water mark before
    # connecting so that it applies to this connection.
    try:
        # zmq 2
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(PUB_IPC_URL)
def start(self):
    '''
    Startup the zmq consumer.

    Builds the endpoint URI from ``self.protocol``, ``self.address`` and,
    when truthy, ``self.port``. Creates a socket of the type named by
    ``self.type`` (``zmq.PULL`` when zmq has no such attribute), applies
    every configured option that is not ``None``, then connects.
    '''
    if self.port:
        zmq_uri = '{protocol}://{address}:{port}'.format(
            protocol=self.protocol,
            address=self.address,
            port=self.port
        )
    else:
        zmq_uri = '{protocol}://{address}'.format(
            protocol=self.protocol,
            address=self.address
        )
    log.debug('ZMQ URI: %s', zmq_uri)
    self.ctx = zmq.Context()
    skt_type = getattr(zmq, self.type, zmq.PULL)
    self.sub = self.ctx.socket(skt_type)
    # All options are applied before connect(): per the libzmq manual,
    # most socket options only take effect for subsequent bind/connects.
    if self.hwm is not None:
        try:
            # zmq 2
            self.sub.setsockopt(zmq.HWM, self.hwm)
        except AttributeError:
            # zmq 3
            self.sub.setsockopt(zmq.RCVHWM, self.hwm)
    if self.recvtimeout is not None:
        log.debug('Setting RCVTIMEO to %d', self.recvtimeout)
        self.sub.setsockopt(zmq.RCVTIMEO, self.recvtimeout)
    if self.keepalive is not None:
        log.debug('Setting TCP_KEEPALIVE to %d', self.keepalive)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE, self.keepalive)
    if self.keepalive_idle is not None:
        log.debug('Setting TCP_KEEPALIVE_IDLE to %d', self.keepalive_idle)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_IDLE, self.keepalive_idle)
    if self.keepalive_interval is not None:
        log.debug('Setting TCP_KEEPALIVE_INTVL to %d', self.keepalive_interval)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_INTVL, self.keepalive_interval)
    self.sub.connect(zmq_uri)
def _setup_ipc(self):
    '''
    Setup the server IPC sockets.

    Creates a PULL socket (``self.sub``) bound to the listener IPC endpoint
    and a ROUTER socket (``self.pub``) bound to the device IPC endpoint.
    Both take their high water mark from ``self.opts['hwm']``.
    '''
    log.debug('Setting up the server IPC puller to receive from the listener')
    self.ctx = zmq.Context()
    # receive from the listener
    self.sub = self.ctx.socket(zmq.PULL)
    # Set the high water mark before binding: on older libzmq releases
    # socket options only affect subsequent bind/connect calls.
    try:
        # zmq 2
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(LST_IPC_URL)
    # device publishers
    log.debug('Creating the router ICP on the server')
    self.pub = self.ctx.socket(zmq.ROUTER)
    try:
        # zmq 2
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.bind(DEV_IPC_URL)
def __init__(self, context, location, partition_id, identity, seq_warnings=False, hwm=1000):
    """Create a SUB socket connected to ``location`` and init counters.

    The subscription prefix is ``identity`` alone when ``partition_id`` is
    None; otherwise ``identity`` followed by the partition id packed as one
    unsigned byte. ``hwm`` is applied as the receive high water mark.
    A zeroed ``consumer-<identity>`` entry is stored in ``context.stats``.
    """
    self.subscriber = context.zeromq.socket(zmq.SUB)
    # Set the high water mark before connecting so it applies to this
    # connection on every libzmq version.
    self.subscriber.set(zmq.RCVHWM, hwm)
    self.subscriber.connect(location)

    if partition_id is not None:
        topic_prefix = identity + pack('>B', partition_id)
    else:
        topic_prefix = identity
    self.subscriber.setsockopt(zmq.SUBSCRIBE, topic_prefix)

    self.counter = 0
    self.count_global = partition_id is None
    self.logger = getLogger("distributed_frontera.messagebus.zeromq.Consumer(%s-%s)" % (identity, partition_id))
    self.seq_warnings = seq_warnings
    self.stats = context.stats
    self.stat_key = "consumer-%s" % identity
    self.stats[self.stat_key] = 0
def zthread_fork(ctx, func, *args, **kwargs):
    """
    Create an attached thread connected to the caller by a PAIR pipe.

    Two PAIR sockets are created on ``ctx`` and joined over a randomly
    named inproc endpoint. ``func`` is started in a new non-daemon thread
    as ``func(ctx, pipe, *args, **kwargs)``, where ``pipe`` is the child's
    end. The parent's end of the pipe is returned. Errors are raised as
    exceptions; no error value is returned.
    """
    def new_pipe_end():
        # Both ends share one configuration: no linger, bounded queues and
        # 5 second send/receive timeouts.
        sock = ctx.socket(zmq.PAIR)
        sock.setsockopt(zmq.LINGER, 0)
        sock.setsockopt(zmq.RCVHWM, 100)
        sock.setsockopt(zmq.SNDHWM, 100)
        sock.setsockopt(zmq.SNDTIMEO, 5000)
        sock.setsockopt(zmq.RCVTIMEO, 5000)
        return sock

    a = new_pipe_end()
    b = new_pipe_end()
    # hexlify() returns bytes on Python 3; decode it so the endpoint name
    # does not contain a bytes repr.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode("ascii")
    a.bind(iface)
    b.connect(iface)
    thread = threading.Thread(target=func, args=((ctx, b) + args), kwargs=kwargs)
    thread.daemon = False
    thread.start()
    return a
def _init_zmq(self):
    """Start the worker processes and connect one PAIR socket to each.

    Each of the ``self._ncpu`` workers binds a PAIR socket on
    ``ipc:///tmp/<self._ID + worker index>``, pulls batches of job indices
    from ``self._tasks`` until it receives ``None``, and sends every
    non-None result back with ``send_pyobj``. The parent stores its context
    and sockets on ``self._ctx`` and ``self._sockets``, along with the
    non-blocking flag and ``Again`` exception class.
    """
    # this is ugly but works well
    import zmq

    # the task queue only returns indices; jobs are looked up in self._jobs
    def wrapped_map(pID, tasks, remain_jobs):
        # ====== create ZMQ socket ====== #
        ctx = zmq.Context()
        sk = ctx.socket(zmq.PAIR)
        sk.set(zmq.SNDHWM, self._hwm)
        sk.set(zmq.LINGER, -1)
        sk.bind("ipc:///tmp/%d" % (self._ID + pID))
        # ====== Doing the jobs ====== #
        t = tasks.get()
        while t is not None:
            # `t` is just a list of indices
            t = [self._jobs[i] for i in t]
            # track the number of remaining jobs
            remain_jobs.add(-len(t))
            if self._batch == 1:  # batch=1, no need for a list of inputs
                ret = self._func(t[0])
            else:  # the function takes the whole list of inputs
                ret = self._func(t)
            # if a generator is returned, iterate over it and send each
            # result; otherwise treat the value as a single result
            if not isinstance(ret, types.GeneratorType):
                ret = (ret,)
            for r in ret:
                # ignore None values
                if r is not None:
                    sk.send_pyobj(r)
            # drop the reference to the old data
            del ret
            # get the next tasks
            t = tasks.get()
        # ending signal
        sk.send_pyobj(None)
        # wait for the ending message
        sk.recv()
        sk.close()
        ctx.term()
        sys.exit(0)

    # ====== start the processes ====== #
    self._processes = [Process(target=wrapped_map,
                               args=(i, self._tasks, self._remain_jobs))
                       for i in range(self._ncpu)]
    for p in self._processes:
        p.start()
    # ====== parent-side PAIR sockets, one per worker ====== #
    ctx = zmq.Context()
    sockets = []
    for i in range(self._ncpu):
        sk = ctx.socket(zmq.PAIR)
        sk.set(zmq.RCVHWM, 0)  # no limit receiving
        sk.connect("ipc:///tmp/%d" % (self._ID + i))
        sockets.append(sk)
    self._ctx = ctx
    self._sockets = sockets
    self._zmq_noblock = zmq.NOBLOCK
    self._zmq_again = zmq.error.Again