Python zmq 模块,utils() 实例源码
我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用zmq.utils()。
def recv_json(self, flags=0, **kwargs):
"""receive a Python object as a message using json to serialize
Keyword arguments are passed on to json.loads
Parameters
----------
flags : int
Any valid recv flag.
Returns
-------
obj : Python object
The Python object that arrives as a message.
"""
from zmq.utils import jsonapi
msg = self.recv(flags)
return jsonapi.loads(msg, **kwargs)
def test_utils(self):
"""test util imports"""
import zmq.utils
from zmq.utils import strtypes
from zmq.utils import jsonapi
def shadow(cls, address):
"""Shadow an existing libzmq socket
address is the integer address of the libzmq socket
or an FFI pointer to it.
.. versionadded:: 14.1
"""
from zmq.utils.interop import cast_int_addr
address = cast_int_addr(address)
return cls(shadow=address)
#-------------------------------------------------------------------------
# Deprecated aliases
#-------------------------------------------------------------------------
def send_json(self, obj, flags=0, **kwargs):
"""send a Python object as a message using json to serialize
Keyword arguments are passed on to json.dumps
Parameters
----------
obj : Python object
The Python object to send
flags : int
Any valid send flag
"""
from zmq.utils import jsonapi
msg = jsonapi.dumps(obj, **kwargs)
return self.send(msg, flags)
def test_utils(self):
"""test util imports"""
import zmq.utils
from zmq.utils import strtypes
from zmq.utils import jsonapi
def test_utils(self):
"""test util imports"""
import zmq.utils
from zmq.utils import strtypes
from zmq.utils import jsonapi
def test_utils(self):
"""test util imports"""
import zmq.utils
from zmq.utils import strtypes
from zmq.utils import jsonapi
def test_utils(self):
"""test util imports"""
import zmq.utils
from zmq.utils import strtypes
from zmq.utils import jsonapi
def test_utils(self):
"""test util imports"""
import zmq.utils
from zmq.utils import strtypes
from zmq.utils import jsonapi
def main(argv=sys.argv):
'''Main method called by the eggsecutable.'''
utils.default_main(AFDDAgent,
description='VOLTTRON platform™ AFDD agent',
argv=argv)
def test():
from volttron.platform.agent import periodic
def TestAgent(config_path, **kwargs):
config = utils.load_config(config_path)
agent_id = config['agentid']
rtu_path = dict((key, config[key])
for key in ['campus', 'building', 'unit'])
class Agent(PublishMixin, BaseAgent):
def setup(self):
super(Agent, self).setup()
self.damper = 0
@matching.match_regex(topics.ACTUATOR_LOCK_ACQUIRE() + '(/.*)')
def on_lock_result(self, topic, headers, message, match):
_log.debug("Topic: {topic}, {headers}, Message: {message}".format(
topic=topic, headers=headers, message=message))
self.publish(topics.ACTUATOR_LOCK_RESULT() + match.group(0),
headers, jsonapi.dumps('SUCCESS'))
@matching.match_regex(topics.ACTUATOR_SET() + '(/.*/([^/]+))')
def on_new_data(self, topic, headers, message, match):
_log.debug("Topic: {topic}, {headers}, Message: {message}".format(
topic=topic, headers=headers, message=message))
if match.group(2) == 'Damper':
self.damper = int(message[0])
self.publish(topics.ACTUATOR_VALUE() + match.group(0),
headers, message[0])
@periodic(5)
def send_data(self):
data = {
'ReturnAirTemperature': 55,
'OutsideAirTemperature': 50,
'MixedAirTemperature': 45,
'Damper': self.damper
}
self.publish_ex(topics.DEVICES_VALUE(point='all', **rtu_path),
{}, ('application/json', jsonapi.dumps(data)))
Agent.__name__ = 'TestAgent'
return Agent(**kwargs)
settings.afdd2_seconds_to_steady_state = 3
settings.sync_trial_time = 10
t = threading.Thread(target=utils.default_main, args=(TestAgent, 'test'))
t.daemon = True
t.start()
time.sleep(2)
main()