我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 xmlrpc.client.dumps()。
def test_datetime_before_1900(self):
    # Round-trip a datetime earlier than 1900 through dumps()/loads().
    original = datetime.datetime(1, 2, 10, 11, 41, 23)
    self.assertEqual(original, xmlrpclib.DateTime('00010210T11:41:23'))
    payload = xmlrpclib.dumps((original,))

    # The decoded value always compares equal to the original; only its
    # concrete type depends on the use_builtin_types flag.
    expectations = (
        (True, datetime.datetime),
        (False, xmlrpclib.DateTime),
    )
    for builtin, expected_type in expectations:
        values, method = xmlrpclib.loads(payload, use_builtin_types=builtin)
        (decoded,) = values
        self.assertEqual(decoded, original)
        self.assertIs(type(decoded), expected_type)
        self.assertIsNone(method)
def test_dump_big_int(self):
    # Integers outside [MININT, MAXINT] must be rejected with
    # OverflowError, both by dumps() and by Marshaller.dump_int().
    too_big = xmlrpclib.MAXINT + 1
    too_small = xmlrpclib.MININT - 1

    # Only checked when the platform's sys.maxsize exceeds 2**31 - 1.
    if sys.maxsize > 2**31 - 1:
        with self.assertRaises(OverflowError):
            xmlrpclib.dumps((int(2**34),))

    # The boundary values themselves are accepted.
    xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
    for out_of_range in (too_big, too_small):
        with self.assertRaises(OverflowError):
            xmlrpclib.dumps((out_of_range,))

    def discard(chunk):
        pass

    marshaller = xmlrpclib.Marshaller()
    for in_range in (xmlrpclib.MAXINT, xmlrpclib.MININT):
        marshaller.dump_int(in_range, discard)
    for out_of_range in (too_big, too_small):
        with self.assertRaises(OverflowError):
            marshaller.dump_int(out_of_range, discard)
def test_dump_bytes(self):
    # bytes, bytearray and Binary values must all round-trip to the same
    # payload; the decoded type follows the use_builtin_types flag.
    sample = b"my dog has fleas"
    self.assertEqual(sample, xmlrpclib.Binary(sample))

    decoded_types = (
        (True, bytes),
        (False, xmlrpclib.Binary),
    )
    for factory in (bytes, bytearray, xmlrpclib.Binary):
        payload = xmlrpclib.dumps((factory(sample),))
        for builtin, expected_type in decoded_types:
            values, method = xmlrpclib.loads(payload, use_builtin_types=builtin)
            (decoded,) = values
            self.assertEqual(decoded, sample)
            self.assertIs(type(decoded), expected_type)
            self.assertIsNone(method)
def test_use_builtin_types(self):
    # A dispatcher built with use_builtin_types=True should hand binary
    # arguments to the registered function as bytes and date arguments as
    # datetime.datetime instances.
    expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
    expected_bytes = b"my dog has fleas"
    request = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
    self.log = []

    def foobar(*args):
        # Record whatever arguments the dispatcher passes in.
        self.log.extend(args)

    dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
        allow_none=True, encoding=None, use_builtin_types=True)
    dispatcher.register_function(foobar)
    dispatcher._marshaled_dispatch(request)

    self.assertEqual(len(self.log), 2)
    received_bytes, received_date = self.log
    self.assertEqual(self.log, [expected_bytes, expected_date])
    self.assertIs(type(received_date), datetime.datetime)
    self.assertIs(type(received_bytes), bytes)
def prepare(self):
    """
    Marshal the query into its binary XML-RPC payload and record its size.

    Stores the encoded payload on ``self.packet`` and its byte length on
    ``self.length``.

    :raise TransportException: when the payload length plus 8 exceeds the client's ``MAX_REQUEST_SIZE``.
    """
    encoded = dumps(self.args, methodname=self.method, allow_none=True).encode()
    size = len(encoded)
    self.packet = encoded
    self.length = size

    # NOTE(review): the extra 8 bytes presumably cover a fixed-size frame
    # header sent in front of the payload -- confirm against the sender.
    if size + 8 <= self._client.MAX_REQUEST_SIZE:
        return
    raise TransportException('The prepared query is larger than the maximum request size, we will not send this query!')
def __init__(self, client, method, *args, timeout=15, encode_json=True, response_id=True):
    """
    Initiate a Scripted Query.

    :param client: Client instance
    :param method: Method name
    :param args: Arguments
    :param timeout: Timeout to wait for future result.
    :param encode_json: Is body json? True by default.
    :param response_id: Is request requiring response_id?
    :type client: pyplanet.core.gbx.client.GbxClient
    """
    # Scripted calls are always routed through TriggerModeScriptEventArray.
    gbx_method = 'TriggerModeScriptEventArray'

    # A fresh response id is generated only when explicitly requested with True.
    self.response_id = uuid.uuid4().hex if response_id is True else None

    # Arguments travel either as a single JSON document (the default) or as-is.
    gbx_args = []
    if args:
        if encode_json:
            gbx_args.append(json.dumps(args))
        else:
            gbx_args.extend(args)

    # The response id, when present, is always the last argument.
    if self.response_id:
        gbx_args.append(str(self.response_id))

    super().__init__(client, gbx_method, method, gbx_args, timeout=timeout)
async def execute(self, method, *args, timeout=45.0):
    """
    Query the dedicated server and return the results. This method is a coroutine and should be awaited on.
    The result you get will be a tuple with data inside (the response payload).

    The request is framed as a 4-byte little-endian payload length, a 4-byte
    little-endian handler number, then the XML-RPC payload itself.

    :param method: Server method.
    :param args: Arguments.
    :param timeout: Wait for x seconds until future is returned. Default is 45 seconds.
    :type method: str
    :type args: any
    :return: Tuple with response data (after awaiting).
    :rtype: Future<tuple>
    :raise asyncio.TimeoutError: when the future is not resolved within ``timeout`` seconds.
    """
    request_bytes = dumps(args, methodname=method, allow_none=True).encode()
    length_bytes = len(request_bytes).to_bytes(4, byteorder='little')

    handler = self.get_next_handler()
    handler_bytes = handler.to_bytes(4, byteorder='little')

    # Register the future before writing, so it already exists when the
    # request goes out.
    # NOTE(review): presumably the response reader resolves it via
    # self.handlers -- confirm against the listener code.
    self.handlers[handler] = future = asyncio.Future()

    # Send to server.
    self.writer.write(length_bytes + handler_bytes + request_bytes)

    return await asyncio.wait_for(future, timeout)
def serialize(self):
    """Marshal the call as an XML-RPC request body and set its headers.

    Marks the request as serialized, makes sure both ``uri`` and ``path``
    end with a slash, then stores the content type and content length.

    Returns the UTF-8 encoded XML-RPC document as bytes.
    """
    self.__xmlrpc_serialized = True

    # uri and path get the trailing slash together.
    if self.uri[-1] != "/":
        self.uri += "/"
        self.path += "/"

    document = xmlrpclib.dumps(self.params, self.method, allow_none=1)
    body = document.encode("utf8")
    size = len(body)

    self.headers["Content-Type"] = "text/xml; charset=utf-8"
    self.headers["Content-Length"] = size
    self.set_content_length(size)
    return body
def serialize(self):
    """Turn ``self.params`` into a query string or a request body.

    * No params: returns ``b""``; for POST/PUT/PATCH a zero Content-Length
      header is set as well.
    * GET/DELETE: the params are appended to ``uri`` and ``path`` as a
      query string, ``self.params`` is cleared and ``b""`` is returned.
    * Otherwise: mapping params are encoded according to the request's
      content type (JSON, form-urlencoded or name/value); other params are
      passed through unchanged. The Content-Type header is stored and the
      body is returned via ``self.to_bytes``.

    Any ``dict`` instance, including subclasses such as ``OrderedDict``,
    is treated as a mapping.
    """
    method = self.get_method()

    if not self.params:
        if method in ("POST", "PUT", "PATCH"):
            self.headers["Content-Length"] = 0
        return b""

    # formdata type can be string, dict, boolean
    params_is_mapping = isinstance(self.params, dict)

    if method in ("GET", "DELETE"):
        if params_is_mapping:
            query = self.urlencode(to_bytes=False)
        else:
            query = self.params
        self.uri += "?" + query
        self.path += "?" + query
        self.params = None
        return b""

    data = self.params
    _, content_type = self.get_header("content-type", True)
    if not content_type:
        content_type = "application/x-www-form-urlencoded"

    if params_is_mapping:
        if content_type.startswith("application/json"):
            data = json.dumps(self.params).encode("utf8")
            content_type = "application/json; charset=utf-8"
        elif content_type.startswith("application/x-www-form-urlencoded"):
            data = self.urlencode()
            content_type = "application/x-www-form-urlencoded; charset=utf-8"
        elif content_type.startswith("text/namevalue"):
            data = self.nvpencode()
            content_type = "text/namevalue; charset=utf-8"

    self.headers["Content-Type"] = content_type
    return self.to_bytes(data)
def __init__(self, conn, dumps, loads):
    """Wrap ``conn`` and remember the serializer pair to use with it.

    ``conn``, ``dumps`` and ``loads`` are stored on the instance, and a
    fixed set of attributes is copied from ``conn`` onto the wrapper.
    """
    self._conn, self._dumps, self._loads = conn, dumps, loads

    # Copy these attributes from the connection onto the wrapper itself.
    forwarded = ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes')
    for name in forwarded:
        setattr(self, name, getattr(conn, name))
def _xml_dumps(obj):
    """Serialize ``obj`` as an XML-RPC params document and return it as bytes.

    ``obj`` is wrapped in a one-element tuple, and ``None`` values are
    allowed.
    """
    document = xmlrpclib.dumps(
        (obj,),
        methodname=None,
        methodresponse=None,
        encoding=None,
        allow_none=1,
    )
    return document.encode('utf8')
def test_dump_load(self):
    # The shared sample list must survive a dumps()/loads() round trip.
    encoded = xmlrpclib.dumps((alist,))
    params, _method = xmlrpclib.loads(encoded)
    self.assertEqual(alist, params[0])
def test_dump_bare_datetime(self):
    # A plain datetime.datetime (not wrapped in xmlrpclib.DateTime) must
    # be accepted by dumps().
    original = datetime.datetime(2005, 2, 10, 11, 41, 23)
    payload = xmlrpclib.dumps((original,))

    # With use_datetime enabled the value decodes to an equal datetime.
    values, method = xmlrpclib.loads(payload, use_datetime=1)
    (decoded,) = values
    self.assertEqual(decoded, original)
    self.assertEqual(method, None)

    # With it disabled the value decodes to an equal DateTime wrapper.
    values, method = xmlrpclib.loads(payload, use_datetime=0)
    (decoded,) = values
    self.assertEqual(decoded, xmlrpclib.DateTime('20050210T11:41:23'))
def test_datetime_before_1900(self):
    # Round-trip a datetime whose year is earlier than 1900.
    original = datetime.datetime(1, 2, 10, 11, 41, 23)
    payload = xmlrpclib.dumps((original,))

    # With use_datetime enabled the value decodes to an equal datetime.
    values, method = xmlrpclib.loads(payload, use_datetime=1)
    (decoded,) = values
    self.assertEqual(decoded, original)
    self.assertEqual(method, None)

    # With it disabled the value decodes to an equal DateTime wrapper.
    values, method = xmlrpclib.loads(payload, use_datetime=0)
    (decoded,) = values
    self.assertEqual(decoded, xmlrpclib.DateTime('00010210T11:41:23'))
def test_newstyle_class(self):
    # An instance of a plain new-style class should decode to a value
    # equal to the instance's attribute dictionary.
    class Sample(object):
        pass

    instance = Sample()
    instance.x = 100
    instance.y = "Hello"

    encoded = xmlrpclib.dumps((instance,))
    values, _method = xmlrpclib.loads(encoded)
    (decoded,) = values
    self.assertEqual(decoded, instance.__dict__)
def test_dump_big_long(self):
    # 2**99 must be rejected by dumps() with OverflowError.
    huge = 2**99
    with self.assertRaises(OverflowError):
        xmlrpclib.dumps((huge,))
def test_dump_bad_dict(self):
    # A dictionary keyed by a tuple must be rejected with TypeError.
    tuple_keyed = {(1, 2, 3): 1}
    with self.assertRaises(TypeError):
        xmlrpclib.dumps((tuple_keyed,))
def test_dump_recursive_seq(self):
    # Two lists that contain each other form a cycle; dumps() must
    # raise TypeError for it.
    outer = [1, 2, 3]
    inner = [3, 4, 5, outer]
    outer.append(inner)
    with self.assertRaises(TypeError):
        xmlrpclib.dumps((outer,))
def test_dump_recursive_dict(self):
    # Two dictionaries that reference each other form a cycle; dumps()
    # must raise TypeError for it.
    first = {'1': 1, '2': 1}
    second = {'3': 3, 'd': first}
    first['t'] = second
    with self.assertRaises(TypeError):
        xmlrpclib.dumps((first,))
def test_dump_none(self):
    # None round-trips when allow_none is set...
    expected = alist + [None]
    params = (alist + [None],)
    encoded = xmlrpclib.dumps(params, allow_none=True)
    decoded_params, _method = xmlrpclib.loads(encoded)
    self.assertEqual(expected, decoded_params[0])

    # ...and is rejected with TypeError when it is not.
    with self.assertRaises(TypeError):
        xmlrpclib.dumps((params,))
def test_dump_bytes(self):
    # This test expects dumps() to reject a raw bytes parameter with
    # TypeError.
    raw = b"my dog has fleas"
    with self.assertRaises(TypeError):
        xmlrpclib.dumps((raw,))
def test_dump_fault(self):
    fault = xmlrpclib.Fault(42, 'Test Fault')

    # Passed as an ordinary parameter, the Fault decodes to a plain dict.
    encoded = xmlrpclib.dumps((fault,))
    values, method = xmlrpclib.loads(encoded)
    (decoded,) = values
    self.assertEqual(decoded, {'faultCode': 42, 'faultString': 'Test Fault'})
    self.assertEqual(method, None)

    # Marshalled directly, loading the document raises the Fault.
    fault_document = xmlrpclib.Marshaller().dumps(fault)
    with self.assertRaises(xmlrpclib.Fault):
        xmlrpclib.loads(fault_document)
def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
    """Dispatches an XML-RPC method from marshalled (XML) data.

    XML-RPC methods are dispatched from the marshalled (XML) data
    using the _dispatch method and the result is returned as
    marshalled data. For backwards compatibility, a dispatch
    function can be provided as an argument (see comment in
    SimpleXMLRPCRequestHandler.do_POST) but overriding the
    existing method through subclassing is the preferred means
    of changing method dispatch behavior.

    Any failure is reported to the caller as a marshalled Fault instead
    of propagating: a raised Fault is returned as-is, and every other
    exception becomes Fault code 1 with a "<type>:<message>" string.

    Returns the response encoded with ``self.encoding``. Characters that
    the encoding cannot represent are written as XML character
    references rather than raising UnicodeEncodeError.
    """
    try:
        params, method = loads(data)

        # generate response
        if dispatch_method is not None:
            response = dispatch_method(method, params)
        else:
            response = self._dispatch(method, params)
        # wrap response in a singleton tuple
        response = (response,)
        response = dumps(response, methodresponse=1,
                         allow_none=self.allow_none, encoding=self.encoding)
    except Fault as fault:
        response = dumps(fault, allow_none=self.allow_none,
                         encoding=self.encoding)
    except BaseException as exc:
        # Report the exception back to the caller. Binding the exception
        # directly (instead of unpacking sys.exc_info()) avoids holding
        # the traceback in a local, which would create a reference cycle.
        response = dumps(
            Fault(1, "%s:%s" % (type(exc), exc)),
            encoding=self.encoding, allow_none=self.allow_none,
        )

    return response.encode(self.encoding, 'xmlcharrefreplace')
def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
    """Forward marshalled XML-RPC data to the dispatcher registered for ``path``.

    The response of ``self.dispatchers[path]._marshaled_dispatch`` is
    returned unchanged. If that lookup or call raises, a marshalled Fault
    (code 1, "<type>:<message>") is returned instead, encoded with
    ``self.encoding``. Characters the encoding cannot represent are
    written as XML character references rather than raising
    UnicodeEncodeError.
    """
    try:
        response = self.dispatchers[path]._marshaled_dispatch(
            data, dispatch_method, path)
    except BaseException as exc:
        # report low level exception back to server
        # (each dispatcher should have handled their own
        # exceptions)
        response = dumps(
            Fault(1, "%s:%s" % (type(exc), exc)),
            encoding=self.encoding, allow_none=self.allow_none)
        response = response.encode(self.encoding, 'xmlcharrefreplace')
    return response
def test_dump_bare_datetime(self):
    # A plain datetime.datetime (not wrapped in xmlrpclib.DateTime) must
    # be accepted by dumps().
    original = datetime.datetime(2005, 2, 10, 11, 41, 23)
    self.assertEqual(original, xmlrpclib.DateTime('20050210T11:41:23'))
    payload = xmlrpclib.dumps((original,))

    # Each loads() option decides only the decoded type; the decoded
    # value must always compare equal to the original.
    cases = (
        ({'use_builtin_types': True}, datetime.datetime),
        ({'use_builtin_types': False}, xmlrpclib.DateTime),
        ({'use_datetime': True}, datetime.datetime),
        ({'use_datetime': False}, xmlrpclib.DateTime),
    )
    for options, expected_type in cases:
        values, method = xmlrpclib.loads(payload, **options)
        (decoded,) = values
        self.assertEqual(decoded, original)
        self.assertIs(type(decoded), expected_type)
        self.assertIsNone(method)
def test_bug_1164912(self):
    # A default DateTime sent as a method response must decode to a
    # DateTime whose .value is a str.
    stamp = xmlrpclib.DateTime()
    first_pass = xmlrpclib.dumps((stamp,), methodresponse=True)
    values, _method = xmlrpclib.loads(first_pass)
    (decoded,) = values
    self.assertIsInstance(decoded.value, str)

    # Dumping the decoded value again must still produce a str.
    second_pass = xmlrpclib.dumps((decoded,), methodresponse=True)
    self.assertIsInstance(second_pass, str)
def test_dump_double(self):
    # Floats are not limited to the integer range: values at and beyond
    # MAXINT/MININT must be marshalled without raising.
    upper, lower = xmlrpclib.MAXINT, xmlrpclib.MININT

    float_params = (
        (float(2 ** 34),),
        (float(upper), float(lower)),
        (float(upper + 42), float(lower - 42)),
    )
    for params in float_params:
        xmlrpclib.dumps(params)

    def discard(chunk):
        pass

    # dump_double() is called directly with integer values here.
    marshaller = xmlrpclib.Marshaller()
    for value in (upper, lower, upper + 42, lower - 42):
        marshaller.dump_double(value, discard)
def xmlrpc_search(self):
    """Answer an XML-RPC search request.

    The query is parsed from the request body, run against the package
    index, and the hits are returned as an XML-RPC method response. Any
    exception is logged and reported as an XML-RPC Fault with code 1
    carrying the exception's repr.
    """
    try:
        query = self.query_from_xmlrpc(self.request.body)
        log.debug("xmlrpc_search {0}".format(query))
        matches = self.search_index_packages(query)
        payload = dumps((matches,), methodresponse=1, encoding='utf-8')
    except Exception as error:
        log.exception("Error in xmlrpc_search")
        failure = Fault(1, repr(error))
        payload = dumps(failure, encoding='utf-8')
    return Response(payload)
def send(self, obj):
    """Pickle ``obj`` and send the resulting bytes.

    Both state checks run before the object is pickled.
    """
    self._check_closed()
    self._check_writable()
    pickled = ForkingPickler.dumps(obj)
    self._send_bytes(pickled)
def _xml_dumps(obj):
    """Return ``obj`` marshalled as XML-RPC params, encoded to bytes.

    ``obj`` is passed as the single element of the params tuple, and
    ``None`` values are allowed.
    """
    params = (obj,)
    text = xmlrpclib.dumps(params, None, None, None, allow_none=1)
    return text.encode('utf-8')