Python xmlrpc.client 模块,ServerProxy() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用xmlrpc.client.ServerProxy()。
def __init__(self, options, provider_options=None):
"""Initialize Gandi DNS provider."""
super(Provider, self).__init__(options)
if provider_options is None:
provider_options = {}
api_endpoint = provider_options.get('api_endpoint') or 'https://rpc.gandi.net/xmlrpc/'
self.apikey = self.options['auth_token']
self.api = xmlrpclib.ServerProxy(api_endpoint, allow_none=True)
self.default_ttl = 3600
# self.domain_id is required by test suite
self.domain_id = None
self.zone_id = None
self.domain = self.options['domain'].lower()
# Authenicate against provider,
# Make any requests required to get the domain's id for this provider,
# so it can be used in subsequent calls. Should throw an error if
# authentication fails for any reason, or if the domain does not exist.
def __init__(self, algorithm_host, algorithm, algorithm_port):
self.algorithm = algorithm
self.algorithm_port = algorithm_port
if algorithm == "rethinkdb":
self.rdb_connection = r.connect('localhost', self.algorithm_port)
logging.info("Connection with RethinkDB successful")
rethinkdb_setup(self.rdb_connection)
self.appendFunction = partial(rethinkdb_append_entry, self.rdb_connection)
elif algorithm == "paxos" or algorithm == "pso":
if algorithm_host == '\'\'' or algorithm_host == '':
algorithm_host = '127.0.0.1'
self.cluster_rpc_client = ServerProxy("http://{}:{}".format(algorithm_host, CLUSTER_APPEND_PORT),
allow_none=True)
self.appendFunction = partial(cluster_append_entry, self.cluster_rpc_client)
elif algorithm == "datastore":
self.appendFunction = partial(datastore_append_entry)
# wrapper used to execute multiple operations and register times
def configure_rethinkdb_gce(cluster, configure_daemons):
# RethinkDB requires multiple nodes to join the master
# RethinkDB requires every node to provide a set of ports
# One port is used to interact with the node running the algorithm (driver_port), the others are framework's ones
# On GCE we can always use the same set of ports, because they will be surely available (VM just spun up)
print("Trying to contact remote master configure daemon...")
# configure_daemon = rpcClient('http://{}:{}'.format(cluster[0]['address'], CONFIGURE_DAEMON_PORT))
print(configure_daemons[0].configure_rethinkdb_master(GCE_RETHINKDB_PORTS['cluster_port'],
GCE_RETHINKDB_PORTS['driver_port'],
GCE_RETHINKDB_PORTS['http_port'], cluster[0]['address']))
print("Trying to contact remote followers configure daemons...")
for (i, node) in enumerate(cluster[1:]):
print(configure_daemons[i + 1].configure_rethinkdb_follower(node['id'], cluster[0]['address'],
GCE_RETHINKDB_PORTS['cluster_port'],
GCE_RETHINKDB_PORTS['cluster_port'],
GCE_RETHINKDB_PORTS['driver_port'],
GCE_RETHINKDB_PORTS['http_port']))
# main
def test_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.add(2,3)
multicall.pow(6,8)
multicall.div(127,42)
add_result, pow_result, div_result = multicall()
self.assertEqual(add_result, 2+3)
self.assertEqual(pow_result, 6**8)
self.assertEqual(div_result, 127//42)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_non_existing_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.this_is_not_exists()
result = multicall()
# result.results contains;
# [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
# 'method "this_is_not_exists" is not supported'>}]
self.assertEqual(result.results[0]['faultCode'], 1)
self.assertEqual(result.results[0]['faultString'],
'<class \'Exception\'>:method "this_is_not_exists" '
'is not supported')
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_two(self):
p = xmlrpclib.ServerProxy(URL)
#do three requests.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been handled by a single request handler
self.assertEqual(len(self.RequestHandler.myRequests), 1)
#check that we did at least two (the third may be pending append
#due to thread scheduling)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
#test special attribute access on the serverproxy, through the __call__
#function.
def test_close(self):
p = xmlrpclib.ServerProxy(URL)
#do some requests with close.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")() #this should trigger a new keep-alive request
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been two request handlers, each having logged at least
#two complete requests
self.assertEqual(len(self.RequestHandler.myRequests), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
def test_basic(self):
# check that flag is false by default
flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
self.assertEqual(flagval, False)
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
# test a call that shouldn't fail just as a smoke test
try:
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.pow(6,8), 6**8)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_fail_with_info(self):
# use the broken message class
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
# Check that errors in the server send back exception/traceback
# info when flag is set
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
try:
p = xmlrpclib.ServerProxy(URL)
p.pow(6,8)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e) and hasattr(e, "headers"):
# We should get error info in the response
expected_err = "invalid literal for int() with base 10: 'I am broken'"
self.assertEqual(e.headers.get("X-exception"), expected_err)
self.assertTrue(e.headers.get("X-traceback") is not None)
else:
self.fail('ProtocolError not raised')
def test_current_time(self):
# Get the current time from xmlrpc.com. This code exercises
# the minimal HTTP functionality in xmlrpclib.
self.skipTest("time.xmlrpc.com is unreliable")
server = xmlrpclib.ServerProxy("http://time.xmlrpc.com/RPC2")
try:
t0 = server.currentTime.getCurrentTime()
except socket.error as e:
self.skipTest("network error: %s" % e)
return
# Perform a minimal sanity check on the result, just to be sure
# the request means what we think it means.
t1 = xmlrpclib.DateTime()
dt0 = xmlrpclib._datetime_type(t0.value)
dt1 = xmlrpclib._datetime_type(t1.value)
if dt0 > dt1:
delta = dt0 - dt1
else:
delta = dt1 - dt0
# The difference between the system time here and the system
# time on the server should not be too big.
self.assertTrue(delta.days <= 1)
def delete_from_chunk_servers(self, file):
if file.type == NodeType.directory:
for c in list(file.children):
self.delete_from_chunk_servers(c)
else:
print('Start delete file', file.get_full_path())
for f_path, servers in file.chunks.items():
for cs in servers:
try:
cl = ServerProxy(cs)
print('Send delete', f_path, 'to', cs)
cl.delete_chunk(f_path)
except:
print('Failed to delete', f_path, 'from', cs)
# get file\directory info by given path
# path format: /my_dir/index/some.file
# response format:
# { 'status': Status.ok
# 'type': NodeType.type
# 'path': '/my_dir/index/some.file' - full path for directory
# 'size': 2014 - size in bytes
# 'chunks': {
# '/my_dir/index/some.file_0': cs-2
# }
def get_file_content(self, path):
info = self.ns.get_file_info(path)
if info['status'] != Status.ok:
return info['status'], None
chunks = info['chunks']
content = ""
data = {}
for chunk, addr in chunks.items():
cs = ServerProxy(addr)
chunk_data = cs.get_chunk(chunk)
index = int(chunk.split("_")[-1])
data[index] = chunk_data
i = 0
while i < len(data):
content += data[i]
i += 1
return Status.ok, content
def test_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.add(2,3)
multicall.pow(6,8)
multicall.div(127,42)
add_result, pow_result, div_result = multicall()
self.assertEqual(add_result, 2+3)
self.assertEqual(pow_result, 6**8)
self.assertEqual(div_result, 127//42)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_non_existing_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.this_is_not_exists()
result = multicall()
# result.results contains;
# [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
# 'method "this_is_not_exists" is not supported'>}]
self.assertEqual(result.results[0]['faultCode'], 1)
self.assertEqual(result.results[0]['faultString'],
'<class \'Exception\'>:method "this_is_not_exists" '
'is not supported')
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_two(self):
p = xmlrpclib.ServerProxy(URL)
#do three requests.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been handled by a single request handler
self.assertEqual(len(self.RequestHandler.myRequests), 1)
#check that we did at least two (the third may be pending append
#due to thread scheduling)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
#test special attribute access on the serverproxy, through the __call__
#function.
def test_close(self):
p = xmlrpclib.ServerProxy(URL)
#do some requests with close.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")() #this should trigger a new keep-alive request
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been two request handlers, each having logged at least
#two complete requests
self.assertEqual(len(self.RequestHandler.myRequests), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
def test_basic(self):
# check that flag is false by default
flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
self.assertEqual(flagval, False)
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
# test a call that shouldn't fail just as a smoke test
try:
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.pow(6,8), 6**8)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_fail_with_info(self):
# use the broken message class
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
# Check that errors in the server send back exception/traceback
# info when flag is set
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
try:
p = xmlrpclib.ServerProxy(URL)
p.pow(6,8)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e) and hasattr(e, "headers"):
# We should get error info in the response
expected_err = "invalid literal for int() with base 10: 'I am broken'"
self.assertEqual(e.headers.get("X-exception"), expected_err)
self.assertTrue(e.headers.get("X-traceback") is not None)
else:
self.fail('ProtocolError not raised')
def test_current_time(self):
# Get the current time from xmlrpc.com. This code exercises
# the minimal HTTP functionality in xmlrpclib.
self.skipTest("time.xmlrpc.com is unreliable")
server = xmlrpclib.ServerProxy("http://time.xmlrpc.com/RPC2")
try:
t0 = server.currentTime.getCurrentTime()
except socket.error as e:
self.skipTest("network error: %s" % e)
return
# Perform a minimal sanity check on the result, just to be sure
# the request means what we think it means.
t1 = xmlrpclib.DateTime()
dt0 = xmlrpclib._datetime_type(t0.value)
dt1 = xmlrpclib._datetime_type(t1.value)
if dt0 > dt1:
delta = dt0 - dt1
else:
delta = dt1 - dt0
# The difference between the system time here and the system
# time on the server should not be too big.
self.assertTrue(delta.days <= 1)
def get_pypi_info(pkg_name):
"""get version information from pypi. If <pkg_name> is not found seach
pypi. if <pkg_name> matches search results case; use the new value of
pkg_name"""
client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
ver_list = client.package_releases(pkg_name)
if len(ver_list) == 0:
search_list = client.search({'name': pkg_name})
for info in search_list:
if pkg_name.lower() == info['name'].lower():
pkg_name = info['name']
break
ver_list = client.package_releases(pkg_name)
if len(ver_list) == 0:
return pkg_name, 'not found', {}
version = ver_list[0]
xml_info = client.release_data(pkg_name, version)
return pkg_name, version, xml_info
def test_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.add(2,3)
multicall.pow(6,8)
multicall.div(127,42)
add_result, pow_result, div_result = multicall()
self.assertEqual(add_result, 2+3)
self.assertEqual(pow_result, 6**8)
self.assertEqual(div_result, 127//42)
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_non_existing_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.this_is_not_exists()
result = multicall()
# result.results contains;
# [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
# 'method "this_is_not_exists" is not supported'>}]
self.assertEqual(result.results[0]['faultCode'], 1)
self.assertEqual(result.results[0]['faultString'],
'<class \'Exception\'>:method "this_is_not_exists" '
'is not supported')
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_two(self):
p = xmlrpclib.ServerProxy(URL)
#do three requests.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been handled by a single request handler
self.assertEqual(len(self.RequestHandler.myRequests), 1)
#check that we did at least two (the third may be pending append
#due to thread scheduling)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
#test special attribute access on the serverproxy, through the __call__
#function.
def test_close(self):
p = xmlrpclib.ServerProxy(URL)
#do some requests with close.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")() #this should trigger a new keep-alive request
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been two request handlers, each having logged at least
#two complete requests
self.assertEqual(len(self.RequestHandler.myRequests), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
def test_gzip_decode_limit(self):
max_gzip_decode = 20 * 1024 * 1024
data = b'\0' * max_gzip_decode
encoded = xmlrpclib.gzip_encode(data)
decoded = xmlrpclib.gzip_decode(encoded)
self.assertEqual(len(decoded), max_gzip_decode)
data = b'\0' * (max_gzip_decode + 1)
encoded = xmlrpclib.gzip_encode(data)
with self.assertRaisesRegex(ValueError,
"max gzipped payload length exceeded"):
xmlrpclib.gzip_decode(encoded)
xmlrpclib.gzip_decode(encoded, max_decode=-1)
#Test special attributes of the ServerProxy object
def test_basic(self):
# check that flag is false by default
flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
self.assertEqual(flagval, False)
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
# test a call that shouldn't fail just as a smoke test
try:
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.pow(6,8), 6**8)
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_fail_with_info(self):
# use the broken message class
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
# Check that errors in the server send back exception/traceback
# info when flag is set
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
try:
p = xmlrpclib.ServerProxy(URL)
p.pow(6,8)
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e) and hasattr(e, "headers"):
# We should get error info in the response
expected_err = "invalid literal for int() with base 10: 'I am broken'"
self.assertEqual(e.headers.get("X-exception"), expected_err)
self.assertTrue(e.headers.get("X-traceback") is not None)
else:
self.fail('ProtocolError not raised')
def find_packages(self, name, constraint=None):
packages = []
if constraint is not None:
version_parser = VersionParser()
constraint = version_parser.parse_constraints(constraint)
with ServerProxy(self._url) as client:
versions = client.package_releases(name, True)
if constraint:
versions = constraint.select([Version.coerce(v) for v in versions])
for version in versions:
try:
packages.append(Package(name, version))
except ValueError:
continue
return packages
def search(self, query, mode=0):
results = []
search = {
'name': query
}
if mode == self.SEARCH_FULLTEXT:
search['summary'] = query
client = ServerProxy(self._url)
hits = client.search(search, 'or')
for hit in hits:
results.append({
'name': hit['name'],
'description': hit['summary'],
'version': hit['version']
})
return results
def test_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.add(2,3)
multicall.pow(6,8)
multicall.div(127,42)
add_result, pow_result, div_result = multicall()
self.assertEqual(add_result, 2+3)
self.assertEqual(pow_result, 6**8)
self.assertEqual(div_result, 127//42)
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_non_existing_multicall(self):
try:
p = xmlrpclib.ServerProxy(URL)
multicall = xmlrpclib.MultiCall(p)
multicall.this_is_not_exists()
result = multicall()
# result.results contains;
# [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
# 'method "this_is_not_exists" is not supported'>}]
self.assertEqual(result.results[0]['faultCode'], 1)
self.assertEqual(result.results[0]['faultString'],
'<class \'Exception\'>:method "this_is_not_exists" '
'is not supported')
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_two(self):
p = xmlrpclib.ServerProxy(URL)
#do three requests.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been handled by a single request handler
self.assertEqual(len(self.RequestHandler.myRequests), 1)
#check that we did at least two (the third may be pending append
#due to thread scheduling)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
#test special attribute access on the serverproxy, through the __call__
#function.
def test_close(self):
p = xmlrpclib.ServerProxy(URL)
#do some requests with close.
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")() #this should trigger a new keep-alive request
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
p("close")()
#they should have all been two request handlers, each having logged at least
#two complete requests
self.assertEqual(len(self.RequestHandler.myRequests), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
def test_fail_with_info(self):
# use the broken message class
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
# Check that errors in the server send back exception/traceback
# info when flag is set
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
try:
p = xmlrpclib.ServerProxy(URL)
p.pow(6,8)
except (xmlrpclib.ProtocolError, OSError) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e) and hasattr(e, "headers"):
# We should get error info in the response
expected_err = "invalid literal for int() with base 10: 'I am broken'"
self.assertEqual(e.headers.get("X-exception"), expected_err)
self.assertTrue(e.headers.get("X-traceback") is not None)
else:
self.fail('ProtocolError not raised')
def __init__(self, username, password, url, use_mod_auth_kerb=False):
if url.startswith('https://'):
self._transport = SafeCookieTransport()
elif url.startswith('http://'):
self._transport = CookieTransport()
else:
raise TCMSError("Unrecognized URL scheme")
self._transport.cookiejar = CookieJar()
# print("COOKIES:", self._transport.cookiejar._cookies)
self.server = xmlrpclib.ServerProxy(
url,
transport=self._transport,
verbose=VERBOSE,
allow_none=1
)
# Login, get a cookie into our cookie jar (login_dict):
self.server.Auth.login(username, password)
# Record the user ID in case the script wants this
# self.user_id = login_dict['id']
# print('Logged in with cookie for user %i' % self.userId)
# print("COOKIES:", self._transport.cookiejar._cookies)
def __init__(self, url):
if url.startswith('https://'):
self._transport = KerbTransport()
elif url.startswith('http://'):
raise TCMSError("Encrypted https communication required for "
"Kerberos authentication.\nURL provided: {0}".format(url))
else:
raise TCMSError("Unrecognized URL scheme: {0}".format(url))
self._transport.cookiejar = CookieJar()
# print("COOKIES:", self._transport.cookiejar._cookies)
self.server = xmlrpclib.ServerProxy(
url,
transport=self._transport,
verbose=VERBOSE,
allow_none=1
)
# Login, get a cookie into our cookie jar (login_dict):
self.server.Auth.login_krbv()
def _use_pypi_xml_rpc(self):
"""Schedule analyses of packages based on PyPI index using XML-RPC.
https://wiki.python.org/moin/PyPIXmlRpc
"""
client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
# get a list of package names
packages = sorted(client.list_packages())
for idx, package in enumerate(packages[self.count.min:self.count.max]):
releases = client.package_releases(package, True) # True for show_hidden arg
self.log.debug("Scheduling #%d. (number versions: %d)",
self.count.min + idx, self.nversions)
for version in releases[:self.nversions]:
self.analyses_selinon_flow(package, version)
def __init__(self, uri, transport=None, encoding=None, verbose=0,
allow_none=0):
xmlrpcclient.ServerProxy.__init__(self, uri, transport=transport,
encoding=encoding, verbose=verbose,
allow_none=allow_none)
self.transport = transport
self._session = None
self.last_login_method = None
self.last_login_params = None
self.API_version = API_VERSION_1_1
def __getattr__(self, name):
if name == 'handle':
return self._session
elif name == 'xenapi':
return _Dispatcher(self.API_version, self.xenapi_request, None)
elif name.startswith('login') or name.startswith('slave_local'):
return lambda *params: self._login(name, params)
elif name == 'logout':
return _Dispatcher(self.API_version, self.xenapi_request, "logout")
else:
return xmlrpcclient.ServerProxy.__getattr__(self, name)
def search_package(name):
"""search package.
:param str name: package name
:rtype: list
:return: package name list
"""
client = xmlrpc_client.ServerProxy(PYPI_URL)
return [pkg for pkg in client.search({'name': name})
if pkg.get('name') == name]
def __init__(self, nodes):
for n in nodes:
self.nodes_rpc[str(n["id"])] = Server("http://{}:{}".format(n["address"], str(n["rpcPort"])))
for n in nodes:
self.nodes_rpc[str(n["id"])].clean_all_qdisc()
self.nodes_rpc[str(n["id"])].create_root_qdisc()
for n in nodes:
if not self.nodes_rpc[str(n["id"])].init_qdisc():
raise Exception("[{}] Error initializing qdiscs".format(NETEM_ERROR))
# modify connection from source to target using netem_command
def create_tasks(session, max_pkgs=MAX_PKGS):
client = ServerProxy(PYPI_URL)
return [get_package_info(session, pkg_name, downloads)
for pkg_name, downloads in client.top_packages(max_pkgs)]
def create_tasks(session, max_pkgs=MAX_PKGS):
client = ServerProxy(PYPI_URL)
return [get_package_info(session, pkg_name, downloads)
for pkg_name, downloads in client.top_packages(max_pkgs)]
def get_pkg_info(pkg_name, downloads=0):
# multiple asyncio jobs can not share a client
client = ServerProxy(PYPI_URL)
try:
release = client.package_releases(pkg_name)[0]
except IndexError: # marionette-transport, ll-orasql, and similar
print(pkg_name, 'has no releases in PyPI!!')
return pkg_info(pkg_name, downloads, False, False, 'PyPI error!!', '')
troves = '\n'.join(client.release_data(pkg_name, release)['classifiers'])
py2only = py2_only_classifier in troves
py3 = py3_classifier in troves
url = client.release_data(pkg_name, release)['package_url']
return pkg_info(pkg_name, downloads, py2only, py3, release, url)
def async_main(max_pkgs=MAX_PKGS): # ~ 32 secs for 200 pkgs on my MacBookPro
loop = asyncio.get_event_loop()
client = ServerProxy(PYPI_URL)
futures = [loop.run_in_executor(None, get_pkg_info, pkg_name, downloads)
for pkg_name, downloads in client.top_packages(max_pkgs)]
return [(yield from fut) for fut in futures]
def VimRpc(address=None):
if address is None:
address = addr
if address is None:
print('ERROR No Valid ADDRESS')
return -1
_serv_add = 'http://%s:%s' % (address)
lorris = ServerProxy(_serv_add, allow_none=True)
return lorris
def test_ssl_presence(self):
try:
import ssl
except ImportError:
has_ssl = False
else:
has_ssl = True
try:
xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
except NotImplementedError:
self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
except socket.error:
self.assertTrue(has_ssl)
def make_request_and_skipIf(condition, reason):
# If we skip the test, we have to make a request because the
# the server created in setUp blocks expecting one to come in.
if not condition:
return lambda func: func
def decorator(func):
def make_request_and_skip(self):
try:
xmlrpclib.ServerProxy(URL).my_function()
except (xmlrpclib.ProtocolError, socket.error) as e:
if not is_unavailable_exception(e):
raise
raise unittest.SkipTest(reason)
return make_request_and_skip
return decorator
def test_simple1(self):
try:
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.pow(6,8), 6**8)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_nonascii(self):
start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
try:
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.add(start_string, end_string),
start_string + end_string)
except (xmlrpclib.ProtocolError, socket.error) as e:
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
# [ch] The test 404 is causing lots of false alarms.