我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用xmlrpc.client.Transport()。
def __init__(self, use_datetime=0): if not below_python27(): xmlrpcclient.Transport.__init__(self, use_datetime) self._use_datetime = use_datetime self._connection = (None, None) self._extra_headers = []
def parse_response(self, response): self.response_length=int(response.getheader("content-length", 0)) return xmlrpclib.Transport.parse_response(self, response)
def send_content(self, connection, body): if self.fake_gzip: #add a lone gzip header to induce decode error remotely connection.putheader("Content-Encoding", "gzip") return xmlrpclib.Transport.send_content(self, connection, body)
def test_gzip_request(self): t = self.Transport() t.encode_threshold = None p = xmlrpclib.ServerProxy(URL, transport=t) self.assertEqual(p.pow(6,8), 6**8) a = self.RequestHandler.content_length t.encode_threshold = 0 #turn on request encoding self.assertEqual(p.pow(6,8), 6**8) b = self.RequestHandler.content_length self.assertTrue(a>b) p("close")()
def test_bad_gzip_request(self): t = self.Transport() t.encode_threshold = None t.fake_gzip = True p = xmlrpclib.ServerProxy(URL, transport=t) cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, re.compile(r"\b400\b")) with cm: p.pow(6, 8) p("close")()
def test_transport(self): t = xmlrpclib.Transport() p = xmlrpclib.ServerProxy(self.url, transport=t) self.assertEqual(p('transport'), t) # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server
def parse_response(self, response): """ Cookie handling on Python 3! """ if sys.version_info >= (3,): for header in response.msg.get_all("Set-Cookie", []): cookie = header.split(";", 1)[0] self._cookies.append(cookie) return super(CookieTransport, self).parse_response(response) # Cribbed from xmlrpclib.Transport.send_user_agent
def single_request_with_cookies(self, host, handler, request_body, verbose=0): """ NOTE: Only used on Python 2.7! This is just python 2.7's xmlrpclib.Transport.single_request, with send additions noted below to send cookies along with the request """ if sys.version_info >= (3,): return h = self.make_connection(host) if verbose: h.set_debuglevel(1) # ADDED: construct the URL and Request object for proper cookie handling request_url = "%s://%s%s" % (self.scheme, host, handler) # log.debug("request_url is %s" % request_url) cookie_request = urllib2.Request(request_url) try: self.send_request(h, handler, request_body) self.send_host(h, host) # ADDED. creates cookiejar if None. self.send_cookies(h, cookie_request) self.send_user_agent(h) self.send_content(h, request_body) response = h.getresponse(buffering=True) # ADDED: parse headers and get cookies here cookie_response = CookieResponse(response.msg) # Okay, extract the cookies from the headers self.cookiejar.extract_cookies(cookie_response, cookie_request) # log.debug("cookiejar now contains: %s" % self.cookiejar._cookies) # And write back any changes if hasattr(self.cookiejar, 'save'): try: self.cookiejar.save(self.cookiejar.filename) except Exception as e: raise # log.error("Couldn't write cookiefile %s: %s" % \ # (self.cookiejar.filename,str(e))) if response.status == 200: self.verbose = verbose return self.parse_response(response) if (response.getheader("content-length", 0)): response.read() raise xmlrpclib.ProtocolError( host + handler, response.status, response.reason, response.msg, ) except xmlrpclib.Fault: raise finally: h.close() # Override the appropriate request method