我们从 Python 开源项目中提取了以下 45 个代码示例,用于说明如何使用 xmlrpc.client.loads()。
def test_datetime_before_1900(self): # same as before but with a date before 1900 dt = datetime.datetime(1, 2, 10, 11, 41, 23) self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) s = xmlrpclib.dumps((dt,)) result, m = xmlrpclib.loads(s, use_builtin_types=True) (newdt,) = result self.assertEqual(newdt, dt) self.assertIs(type(newdt), datetime.datetime) self.assertIsNone(m) result, m = xmlrpclib.loads(s, use_builtin_types=False) (newdt,) = result self.assertEqual(newdt, dt) self.assertIs(type(newdt), xmlrpclib.DateTime) self.assertIsNone(m)
def test_dump_bytes(self): sample = b"my dog has fleas" self.assertEqual(sample, xmlrpclib.Binary(sample)) for type_ in bytes, bytearray, xmlrpclib.Binary: value = type_(sample) s = xmlrpclib.dumps((value,)) result, m = xmlrpclib.loads(s, use_builtin_types=True) (newvalue,) = result self.assertEqual(newvalue, sample) self.assertIs(type(newvalue), bytes) self.assertIsNone(m) result, m = xmlrpclib.loads(s, use_builtin_types=False) (newvalue,) = result self.assertEqual(newvalue, sample) self.assertIs(type(newvalue), xmlrpclib.Binary) self.assertIsNone(m)
async def listen(self):
    """
    Listen to socket.

    Reads frames in an endless loop. Each frame is an 8-byte header (two
    little-endian unsigned 32-bit integers: body size and handle) followed by
    ``size`` bytes of XML-RPC body. Every decoded frame is scheduled on
    ``self.event_loop`` as a ``self.handle_payload`` task.

    This must be a coroutine: the body awaits ``self.reader.readexactly``.

    A ``ConnectionResetError`` terminates the process with exit code 10; any
    other exception is reported through ``handle_exception`` and re-raised.
    """
    try:
        while True:
            head = await self.reader.readexactly(8)
            size, handle = struct.unpack_from('<LL', head)
            body = await self.reader.readexactly(size)
            data = method = fault = None

            try:
                data, method = loads(body, use_builtin_types=True)
            except Fault as e:
                # A fault response is forwarded to the payload handler, not raised here.
                fault = e
            except ExpatError as e:
                # See #121 for this solution.
                # The body was not parseable XML: report it and skip this frame.
                handle_exception(exception=e, module_name=__name__, func_name='listen', extra_data={'body': body})
                continue

            # Unwrap single-element parameter tuples.
            if data and len(data) == 1:
                data = data[0]

            self.event_loop.create_task(self.handle_payload(handle, method, data, fault))
    except ConnectionResetError as e:
        logger.critical(
            'Connection with the dedicated server has been closed, we will now close down the subprocess! {}'.format(str(e))
        )
        # When the connection has been reset, we will close the controller process so it can be restarted by the god
        # process. Exit code 10 gives the information to the god process.
        exit(10)
    except Exception as e:
        handle_exception(exception=e, module_name=__name__, func_name='listen')
        raise
def __init__(self, conn, dumps, loads): self._conn = conn self._dumps = dumps self._loads = loads for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): obj = getattr(conn, attr) setattr(self, attr, obj)
def _xml_loads(s): (obj,), method = xmlrpclib.loads(s.decode('utf8')) return obj
def test_dump_load(self):
    """Dump ``alist`` as the only parameter and check it loads back unchanged."""
    marshalled = xmlrpclib.dumps((alist,))
    params, _method = xmlrpclib.loads(marshalled)
    self.assertEqual(alist, params[0])
def test_dump_bare_datetime(self): # This checks that an unwrapped datetime.date object can be handled # by the marshalling code. This can't be done via test_dump_load() # since with use_datetime set to 1 the unmarshaller would create # datetime objects for the 'datetime[123]' keys as well dt = datetime.datetime(2005, 2, 10, 11, 41, 23) s = xmlrpclib.dumps((dt,)) (newdt,), m = xmlrpclib.loads(s, use_datetime=1) self.assertEqual(newdt, dt) self.assertEqual(m, None) (newdt,), m = xmlrpclib.loads(s, use_datetime=0) self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
def test_datetime_before_1900(self): # same as before but with a date before 1900 dt = datetime.datetime(1, 2, 10, 11, 41, 23) s = xmlrpclib.dumps((dt,)) (newdt,), m = xmlrpclib.loads(s, use_datetime=1) self.assertEqual(newdt, dt) self.assertEqual(m, None) (newdt,), m = xmlrpclib.loads(s, use_datetime=0) self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
def test_newstyle_class(self): class T(object): pass t = T() t.x = 100 t.y = "Hello" ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) self.assertEqual(t2, t.__dict__)
def test_dump_none(self):
    """None round-trips with allow_none=True; without it dumps() raises TypeError."""
    expected = alist + [None]
    params = (alist + [None],)

    marshalled = xmlrpclib.dumps(params, allow_none=True)
    decoded, _method = xmlrpclib.loads(marshalled)
    self.assertEqual(expected, decoded[0])

    # allow_none is not passed here, so the embedded None must be rejected.
    self.assertRaises(TypeError, xmlrpclib.dumps, (params,))
def test_dump_fault(self): f = xmlrpclib.Fault(42, 'Test Fault') s = xmlrpclib.dumps((f,)) (newf,), m = xmlrpclib.loads(s) self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) self.assertEqual(m, None) s = xmlrpclib.Marshaller().dumps(f) self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
def test_cgi_xmlrpc_response(self):
    """Send a methodCall through ``self.cgi`` and inspect the captured reply.

    The request is written to a captured stdin, the handler's output is
    captured from stdout, and two things are asserted: the XML part of the
    reply makes loads() raise Fault, and the Content-Length header matches
    the length of the XML part.
    """
    data = """<?xml version='1.0'?> <methodCall> <methodName>test_method</methodName> <params> <param> <value><string>foo</string></value> </param> <param> <value><string>bar</string></value> </param> </params> </methodCall> """

    with support.EnvironmentVarGuard() as env, \
         captured_stdout(encoding=self.cgi.encoding) as data_out, \
         support.captured_stdin() as data_in:
        data_in.write(data)
        data_in.seek(0)
        env['CONTENT_LENGTH'] = str(len(data))
        self.cgi.handle_request()
    data_out.seek(0)

    # will respond exception, if so, our goal is achieved ;)
    handle = data_out.read()

    # start with 44th char so as not to get http header, we just
    # need only xml
    self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])

    # Also test the content-length returned by handle_request
    # Using the same test method inorder to avoid all the datapassing
    # boilerplate code.
    # Test for bug: http://bugs.python.org/issue5040

    content = handle[handle.find("<?xml"):]

    # Raw string: "\d" in a normal literal is an invalid escape sequence.
    self.assertEqual(
        int(re.search(r'Content-Length: (\d+)', handle).group(1)),
        len(content))
def _marshaled_dispatch(self, data, dispatch_method = None, path = None): """Dispatches an XML-RPC method from marshalled (XML) data. XML-RPC methods are dispatched from the marshalled (XML) data using the _dispatch method and the result is returned as marshalled data. For backwards compatibility, a dispatch function can be provided as an argument (see comment in SimpleXMLRPCRequestHandler.do_POST) but overriding the existing method through subclassing is the preferred means of changing method dispatch behavior. """ try: params, method = loads(data) # generate response if dispatch_method is not None: response = dispatch_method(method, params) else: response = self._dispatch(method, params) # wrap response in a singleton tuple response = (response,) response = dumps(response, methodresponse=1, allow_none=self.allow_none, encoding=self.encoding) except Fault as fault: response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) except: # report exception back to server exc_type, exc_value, exc_tb = sys.exc_info() response = dumps( Fault(1, "%s:%s" % (exc_type, exc_value)), encoding=self.encoding, allow_none=self.allow_none, ) return response.encode(self.encoding)
def test_dump_bare_datetime(self): # This checks that an unwrapped datetime.date object can be handled # by the marshalling code. This can't be done via test_dump_load() # since with use_builtin_types set to 1 the unmarshaller would create # datetime objects for the 'datetime[123]' keys as well dt = datetime.datetime(2005, 2, 10, 11, 41, 23) self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) s = xmlrpclib.dumps((dt,)) result, m = xmlrpclib.loads(s, use_builtin_types=True) (newdt,) = result self.assertEqual(newdt, dt) self.assertIs(type(newdt), datetime.datetime) self.assertIsNone(m) result, m = xmlrpclib.loads(s, use_builtin_types=False) (newdt,) = result self.assertEqual(newdt, dt) self.assertIs(type(newdt), xmlrpclib.DateTime) self.assertIsNone(m) result, m = xmlrpclib.loads(s, use_datetime=True) (newdt,) = result self.assertEqual(newdt, dt) self.assertIs(type(newdt), datetime.datetime) self.assertIsNone(m) result, m = xmlrpclib.loads(s, use_datetime=False) (newdt,) = result self.assertEqual(newdt, dt) self.assertIs(type(newdt), xmlrpclib.DateTime) self.assertIsNone(m)
def test_bug_1164912 (self): d = xmlrpclib.DateTime() ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), methodresponse=True)) self.assertIsInstance(new_d.value, str) # Check that the output of dumps() is still an 8-bit string s = xmlrpclib.dumps((new_d,), methodresponse=True) self.assertIsInstance(s, str)
def recv(self):
    """Receive a (picklable) object.

    Runs the closed/readable checks, reads one message via
    ``self._recv_bytes()`` and unpickles its buffer with ForkingPickler.
    """
    self._check_closed()
    self._check_readable()
    received = self._recv_bytes()
    payload = received.getbuffer()
    return ForkingPickler.loads(payload)
def _xml_loads(s): (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj
def _marshaled_dispatch(self, data, dispatch_method = None, path = None): """Dispatches an XML-RPC method from marshalled (XML) data. XML-RPC methods are dispatched from the marshalled (XML) data using the _dispatch method and the result is returned as marshalled data. For backwards compatibility, a dispatch function can be provided as an argument (see comment in SimpleXMLRPCRequestHandler.do_POST) but overriding the existing method through subclassing is the preferred means of changing method dispatch behavior. """ try: params, method = loads(data, use_builtin_types=self.use_builtin_types) # generate response if dispatch_method is not None: response = dispatch_method(method, params) else: response = self._dispatch(method, params) # wrap response in a singleton tuple response = (response,) response = dumps(response, methodresponse=1, allow_none=self.allow_none, encoding=self.encoding) except Fault as fault: response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) except: # report exception back to server exc_type, exc_value, exc_tb = sys.exc_info() response = dumps( Fault(1, "%s:%s" % (exc_type, exc_value)), encoding=self.encoding, allow_none=self.allow_none, ) return response.encode(self.encoding)
def render_POST(self, request):
    """Handle an XML-RPC POST: decode the call, look up the procedure, run it.

    Decoding and lookup failures are rendered immediately as a Fault via
    ``self._cbRender``. Otherwise the procedure is invoked through
    ``defer.maybeDeferred`` with ``self._ebRender`` / ``self._cbRender``
    attached. Every path returns ``server.NOT_DONE_YET``.
    """
    request.content.seek(0, 0)
    request.setHeader(b"content-type", b"text/xml; charset=utf-8")

    try:
        args, functionPath = xmlrpclib.loads(request.content.read(),
                                             use_datetime=self.useDateTime)
    except Exception as e:
        fault = Fault(self.FAILURE, "Can't deserialize input: %s" % (e,))
        self._cbRender(fault, request)
        return server.NOT_DONE_YET

    try:
        function = self.lookupProcedure(functionPath)
    except Fault as f:
        self._cbRender(f, request)
        return server.NOT_DONE_YET

    # Use this list to track whether the response has failed or not.
    # This will be used later on to decide if the result of the
    # Deferred should be written out and Request.finish called.
    responseFailed = []
    request.notifyFinish().addErrback(responseFailed.append)

    # Procedures flagged with ``withRequest`` get the request as first argument.
    leading = (request,) if getattr(function, 'withRequest', False) else ()
    d = defer.maybeDeferred(function, *leading, *args)
    d.addErrback(self._ebRender)
    d.addCallback(self._cbRender, request, responseFailed)
    return server.NOT_DONE_YET
def parseResponse(self, contents):
    """Decode an XML-RPC response and fire ``self.deferred`` with the outcome.

    Does nothing when ``self.deferred`` is falsy. Otherwise the deferred is
    detached from the instance and either called back with the first
    response parameter or errbacked with a ``failure.Failure`` built from
    whatever exception decoding raised.
    """
    if not self.deferred:
        return

    try:
        params, _method = xmlrpclib.loads(contents, use_datetime=self.useDateTime)
        response = params[0]
    except:
        # Catch-all on purpose: any decoding problem goes to the errback.
        pending = self.deferred
        self.deferred = None
        pending.errback(failure.Failure())
    else:
        pending = self.deferred
        self.deferred = None
        pending.callback(response)
async def handle_scripted(self, handle_nr, method, data):
    """Process a scripted payload: resolve a waiting handler or emit a signal.

    ``data`` is unpacked into ``(method, raw)``; the ``method`` argument is
    overwritten by that unpacking. The raw part is parsed as JSON where
    possible. A payload with a non-empty ``responseid`` is delivered to the
    matching entry in ``self.script_handlers``; anything else is sent to the
    ``Script.<method>`` callback signal, if one exists.

    This must be a coroutine: the body awaits ``signal.send_robust``.
    """
    # Unpack first.
    method, raw = data

    # Check if we only have one response array length, mostly this is the case due to the gbx handling.
    # Only when we don't get any response we don't have this!
    if len(raw) == 1:
        raw = raw[0]

    # Show warning when using non-supported modes.
    try:
        if 'LibXmlRpc' in method:
            logger.warning('You are using an older gamemode script that isn\'t supported by PyPlanet (usage of LibXmlRpc_)')
    except Exception:
        # Best effort only: this check must never block payload handling.
        pass

    # Try to parse JSON, mostly the case.
    try:
        if isinstance(raw, list):
            payload = dict()
            for idx, part in enumerate(raw):
                try:
                    payload.update(json.loads(part))
                except Exception:
                    # Keep parts that are not JSON objects under a positional key.
                    payload['raw_{}'.format(idx)] = part
        else:
            payload = json.loads(raw)
    except Exception:
        # Not JSON at all: pass the raw value through unchanged.
        payload = raw

    # Check if payload contains a responseid, when it does, we call the scripted handler future object.
    if isinstance(payload, dict) and 'responseid' in payload and len(payload['responseid']) > 0:
        response_id = payload['responseid']
        if response_id in self.script_handlers:
            logger.debug('GBX: Received scripted response to method: {} and responseid: {}'.format(method, response_id))
            handler = self.script_handlers.pop(response_id)
            handler.set_result(payload)
            handler.done()
            return
        else:
            # We don't have this handler registered, throw warning in console.
            logger.warning('GBX: Received scripted response with responseid, but no hander was registered! Payload: {}'.format(payload))
            return

    # If not, we should just throw it as an ordinary callback.
    logger.debug('GBX: Received scripted callback: {}: {}'.format(method, payload))

    signal = SignalManager.get_callback('Script.{}'.format(method))
    if signal:
        await signal.send_robust(payload)
async def execute(self, method, *args):
    """Call a remote XML-RPC *method* with *args* and return the response data.

    The call is marshalled with ``dumps`` (``allow_none=True``), gzip
    compressed and sent through ``self.__request`` in the loop's default
    executor. On a usable response, ``self.retries`` is reset and the first
    response parameter is returned.

    This must be a coroutine: the body awaits the executor call,
    ``self.authenticate()`` and recursive ``self.execute()`` calls.

    Raises:
        DedimaniaTransportException: on connection problems, an unusable
            response, or when retrying did not help.
    """
    payload = dumps(args, methodname=method, allow_none=True)
    body = gzip.compress(payload.encode('utf8'))
    try:
        res = await self.loop.run_in_executor(None, self.__request, body)
        data, _ = loads(res.text, use_datetime=True)
        if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
            # A fault can arrive embedded in the response as a plain dict.
            if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
                raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
            self.retries = 0
            return data[0]
        # Raised inside the try on purpose: the handler below retries it.
        raise DedimaniaTransportException('Invalid response from dedimania!')
    except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e:
        raise DedimaniaTransportException(e) from e
    except ConnectTimeout as e:
        raise DedimaniaTransportException(e) from e
    except DedimaniaTransportException:
        # Try to setup new session.
        self.retries += 1
        if self.retries > 5:
            raise DedimaniaTransportException('Dedimania didn\'t gave the right answer after few retries!')
        self.client = requests.session()
        try:
            await self.authenticate()
            return await self.execute(method, *args)
        except Exception as e:
            logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
            handle_exception(e, __name__, 'execute')
            raise DedimaniaTransportException('Could not retrieve data from dedimania!')
    except DedimaniaFault as e:
        if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
            try:
                self.retries += 1
                if self.retries > 5:
                    raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
                await self.authenticate()
                return await self.execute(method, *args)
            except:
                # NOTE(review): this swallows every error from the
                # re-authentication attempt, including the max-retries
                # exception raised just above, and returns None. Kept
                # as-is; confirm that this is intended.
                return
        logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
        handle_exception(e, __name__, 'execute', extra_data={
            'dedimania_retries': self.retries,
        })
        raise DedimaniaTransportException('Could not retrieve data from dedimania!')