Python operator 模块，methodcaller() 实例源码
我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 operator.methodcaller()。
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def test_responder_resource_child(self):
    """
    When a GET request is made to the ACME challenge path, and the
    responder resource has a child resource at the correct path, the value
    of the resource should be returned.
    """
    self.responder_resource.putChild(b'foo', Data(b'bar', 'text/plain'))

    response = self.client.get(
        'http://localhost/.well-known/acme-challenge/foo')
    assert_that(response, succeeded(MatchesAll(
        MatchesStructure(
            code=Equals(200),
            headers=HasHeader('Content-Type', ['text/plain'])),
        # The body is fetched by calling response.content()
        After(methodcaller('content'), succeeded(Equals(b'bar')))
    )))

    # Sanity check that a request to a different subpath does not succeed
    response = self.client.get(
        'http://localhost/.well-known/acme-challenge/baz')
    assert_that(response, succeeded(MatchesStructure(code=Equals(404))))
def test_signal_hup(self):
    """
    When a client calls the ``/_mlb_signal/hup`` endpoint, the correct
    response should be returned and the ``signalled_hup`` flag set True.
    """
    assert_that(self.marathon_lb.check_signalled_hup(), Equals(False))

    response = self.client.get('http://localhost/_mlb_signal/hup')
    assert_that(response, succeeded(MatchesAll(
        MatchesStructure(
            code=Equals(200),
            headers=HasHeader('content-type', ['text/plain'])),
        After(methodcaller('text'), succeeded(
            Equals('Sent SIGHUP signal to marathon-lb')))
    )))

    assert_that(self.marathon_lb.check_signalled_hup(), Equals(True))

    # Signalled flag should be reset to false after it is checked
    assert_that(self.marathon_lb.check_signalled_hup(), Equals(False))
def test_signal_usr1(self):
    """
    When a client calls the ``/_mlb_signal/usr1`` endpoint, the correct
    response should be returned and the ``signalled_usr1`` flag set True.
    """
    assert_that(self.marathon_lb.check_signalled_usr1(), Equals(False))

    response = self.client.get('http://localhost/_mlb_signal/usr1')
    assert_that(response, succeeded(MatchesAll(
        MatchesStructure(
            code=Equals(200),
            headers=HasHeader('content-type', ['text/plain'])),
        After(methodcaller('text'), succeeded(
            Equals('Sent SIGUSR1 signal to marathon-lb')))
    )))

    assert_that(self.marathon_lb.check_signalled_usr1(), Equals(True))

    # Signalled flag should be reset to false after it is checked
    assert_that(self.marathon_lb.check_signalled_usr1(), Equals(False))
def run_bot(user_input, start):
    """
    :param(str) user_input: Message sent to the bot
    :param(bool) start: If the user is new to the bot
    :return(str) return: Response from the bot
    """
    if start:
        bot.response = "Hello there! Have a chat with me or say 'help' to see available commands :)"
        return bot.response

    bot.user_input = user_input

    # Each name is a method on `bot`, tried in this order; the first call
    # that returns a truthy value stops the chain and `bot.response` is
    # returned.
    # fix_typos: Fix any user typos and slang
    # help_check: NOTE(review): presumably answers 'help' requests - confirm
    # check_phrase_similarity: Check user's input similarity to all key phrases
    # create_response: If all key phrases fail, we gotta actually make a new sentence
    for method in ['fix_typos', 'help_check', 'check_phrase_similarity', 'create_response']:
        if operator.methodcaller(method)(bot):
            return bot.response

    return random.choice(responses.UNSURE)  # If no response can be created
# For testing purposes
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def __init__(self, data):
    """Store the settings structure and derive the billing credentials.

    :param data: mapping of settings; only its optional
        'password-digest' entry is read here.
    """
    self._structure = data
    # HTTP verb name -> `requests` function that performs it
    self._dispatcher = {
        'get': requests.get,
        'post': requests.post,
        'put': requests.put,
        'delete': requests.delete}
    # NOTE(review): presumably this is what populates
    # self.billing_password, which is read below - confirm.
    self._get_system_settings()
    dgst = data.get('password-digest')
    # isinstance() already rejects None, so no separate None check is needed.
    # NOTE(review): any attribute name present on hashlib passes this test
    # (e.g. 'new'), not only hash constructors - consider restricting the
    # accepted names to real algorithm names.
    if isinstance(dgst, basestring) and hasattr(hashlib, dgst.lower()):
        m = operator.methodcaller(dgst.lower(),
                                  self.billing_password)(hashlib)
        self.billing_password = m.hexdigest()
    _url = urlparse(request.base_url)
    self.master_url = '{0}://{1}'.format(_url.scheme, _url.netloc)
def _json_default_exact(obj):
    """Serialization handlers for types unsupported by `simplejson`
    that try to preserve the exact data types.

    Dispatch is on the exact class name of `obj`. A handled object becomes
    ``{"__class__": <class name>, "__value__": <converted value>}``; a tuple
    subclass whose class name is not 'tuple' is tagged "namedtuple".

    :raises TypeError: when no handler applies to `obj`.
    """
    classname = type(obj).__name__
    handlers = {
        'datetime': methodcaller('isoformat'),
        'date': methodcaller('isoformat'),
        'time': methodcaller('isoformat'),
        'timedelta': partial(getattrs, attrs=['days', 'seconds', 'microseconds']),
        'tuple': list,
        'set': list,
        'frozenset': list,
        'complex': partial(getattrs, attrs=['real', 'imag']),
        'Decimal': str,
        'Fraction': partial(getattrs, attrs=['numerator', 'denominator']),
        'UUID': partial(getattrs, attrs=['hex']),
    }
    if classname in handlers:
        return {"__class__": classname,
                "__value__": handlers[classname](obj)}
    elif isinstance(obj, tuple) and classname != 'tuple':
        return {"__class__": "namedtuple",
                "__value__": _dump_namedtuple(classname, obj)}
    raise TypeError(repr(obj) + " is not JSON serializable")
def _json_default_compat(obj):
    """Convert `obj` to a plain value chosen by its exact class name.

    Unlike a tagged representation, the converted value is returned as-is,
    with no record of the original class.

    :raises TypeError: when the class name of `obj` has no handler.
    """
    classname = type(obj).__name__
    handlers = {
        'datetime': methodcaller('isoformat'),
        'date': methodcaller('isoformat'),
        'time': methodcaller('isoformat'),
        'timedelta': _timedelta_total_seconds,
        'set': list,
        'frozenset': list,
        'complex': partial(getattrs, attrs=['real', 'imag']),
        'Fraction': partial(getattrs, attrs=['numerator', 'denominator']),
        'UUID': str
    }
    if classname in handlers:
        return handlers[classname](obj)
    raise TypeError(repr(obj) + " is not JSON serializable")
def remove_zero_copynumbers(gtype):
    '''
    Blank out a single-clone genotype whose first two copy-numbers are
    both zero, or either of which is negative.

    `gtype` is a string of '|'-separated clones, each a ','-separated list
    of numbers. Genotypes with more than one clone are returned unchanged,
    as is any single-clone genotype that does not meet the condition above.
    An empty string is returned as an empty string.
    '''
    if gtype == '':
        return ''
    # list(...) so the result can be measured and indexed on Python 3 as
    # well, where map() returns a lazy iterator.
    clones = list(map(methodcaller('split', ','), gtype.split('|')))
    if len(clones) == 1:
        copies = [float(field) for field in clones[0]]
        if (copies[0] == 0 and copies[1] == 0) or copies[0] < 0 or copies[1] < 0:
            gtype = ''
    return gtype
def test_methodcaller(self):
self.assertRaises(TypeError, operator.methodcaller)
class A:
def foo(self, *args, **kwds):
return args[0] + args[1]
def bar(self, f=42):
return f
a = A()
f = operator.methodcaller('foo')
self.assertRaises(IndexError, f, a)
f = operator.methodcaller('foo', 1, 2)
self.assertEqual(f(a), 3)
f = operator.methodcaller('bar')
self.assertEqual(f(a), 42)
self.assertRaises(TypeError, f, a, a)
f = operator.methodcaller('bar', f=5)
self.assertEqual(f(a), 5)
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def knots(self, direction=None, with_multiplicities=False):
    """ Return knots vector

    If `direction` is given, returns the knots in that direction, as a
    list. If it is not given, returns the knots of all directions, as a
    tuple.

    :param int direction: Direction in which to get the knots.
    :param bool with_multiplicities: If true, return knots with
        multiplicities (i.e. repeated).
    :raises ValueError: For invalid direction
    """
    # With multiplicities the basis' `knots` attribute is read as-is;
    # otherwise its `knot_spans()` method is called.
    getter = attrgetter('knots') if with_multiplicities else methodcaller('knot_spans')

    if direction is None:
        return tuple(getter(b) for b in self.bases)
    direction = check_direction(direction, self.pardim)
    return getter(self.bases[direction])
def test_methodcaller(self):
self.assertRaises(TypeError, operator.methodcaller)
class A:
def foo(self, *args, **kwds):
return args[0] + args[1]
def bar(self, f=42):
return f
a = A()
f = operator.methodcaller('foo')
self.assertRaises(IndexError, f, a)
f = operator.methodcaller('foo', 1, 2)
self.assertEqual(f(a), 3)
self.assertRaises(TypeError, f)
self.assertRaises(TypeError, f, a, 3)
self.assertRaises(TypeError, f, a, spam=3)
f = operator.methodcaller('bar')
self.assertEqual(f(a), 42)
self.assertRaises(TypeError, f, a, a)
f = operator.methodcaller('bar', f=5)
self.assertEqual(f(a), 5)
def test_methodcaller(self):
self.assertRaises(TypeError, operator.methodcaller)
class A:
def foo(self, *args, **kwds):
return args[0] + args[1]
def bar(self, f=42):
return f
a = A()
f = operator.methodcaller('foo')
self.assertRaises(IndexError, f, a)
f = operator.methodcaller('foo', 1, 2)
self.assertEqual(f(a), 3)
self.assertRaises(TypeError, f)
self.assertRaises(TypeError, f, a, 3)
self.assertRaises(TypeError, f, a, spam=3)
f = operator.methodcaller('bar')
self.assertEqual(f(a), 42)
self.assertRaises(TypeError, f, a, a)
f = operator.methodcaller('bar', f=5)
self.assertEqual(f(a), 5)
def ignore_site_packages_paths():
    """
    Generator that temporarily narrows ``sys.path``.

    Until the ``yield`` returns, ``sys.path`` holds only the entries whose
    lower-cased text contains 'python' or 'pypy' and does not contain
    'site-packages'. Empty entries, the working directory and repeated
    entries are dropped; the remaining entries keep their original
    spelling and relative order. Afterwards the saved list is put back,
    also when an exception is thrown into the generator.

    NOTE(review): the yield-once shape suggests this is meant to be wrapped
    by ``contextlib.contextmanager``; no decorator is visible here.

    :return: a generator that yields exactly once
    """
    paths = sys.path[:]
    cwd = os.getcwd()

    kept = []
    seen = set()
    for entry in paths:
        # remove working directory (and empty entries) so that all
        # local imports fail
        if not entry or entry == cwd or entry in seen:
            continue
        # remove all third-party paths
        # so that only stdlib imports will succeed
        lowered = entry.lower()  # only the comparison is case-insensitive
        if 'site-packages' in lowered:
            continue
        if 'python' not in lowered and 'pypy' not in lowered:
            continue
        seen.add(entry)
        kept.append(entry)

    sys.path = kept
    try:
        yield
    finally:
        sys.path = paths
# -----------------------------------------------------------------
def ignore_site_packages_paths():
    """
    Generator that temporarily narrows ``sys.path``.

    Until the ``yield`` returns, ``sys.path`` holds only the entries whose
    lower-cased text contains 'python' or 'pypy' and does not contain
    'site-packages'. Empty entries, the working directory and repeated
    entries are dropped; the remaining entries keep their original
    spelling and relative order. Afterwards the saved list is put back,
    also when an exception is thrown into the generator.

    NOTE(review): the yield-once shape suggests this is meant to be wrapped
    by ``contextlib.contextmanager``; no decorator is visible here.

    :return: a generator that yields exactly once
    """
    paths = sys.path[:]
    cwd = os.getcwd()

    kept = []
    seen = set()
    for entry in paths:
        # remove working directory (and empty entries) so that all
        # local imports fail
        if not entry or entry == cwd or entry in seen:
            continue
        # remove all third-party paths
        # so that only stdlib imports will succeed
        lowered = entry.lower()  # only the comparison is case-insensitive
        if 'site-packages' in lowered:
            continue
        if 'python' not in lowered and 'pypy' not in lowered:
            continue
        seen.add(entry)
        kept.append(entry)

    sys.path = kept
    try:
        yield
    finally:
        sys.path = paths
# -----------------------------------------------------------------
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def test_methodcaller(self):
self.assertRaises(TypeError, operator.methodcaller)
class A:
def foo(self, *args, **kwds):
return args[0] + args[1]
def bar(self, f=42):
return f
a = A()
f = operator.methodcaller('foo')
self.assertRaises(IndexError, f, a)
f = operator.methodcaller('foo', 1, 2)
self.assertEqual(f(a), 3)
f = operator.methodcaller('bar')
self.assertEqual(f(a), 42)
self.assertRaises(TypeError, f, a, a)
f = operator.methodcaller('bar', f=5)
self.assertEqual(f(a), 5)
def test_set_axis_name_mi(self):
df = DataFrame(
np.empty((3, 3)),
index=MultiIndex.from_tuples([("A", x) for x in list('aBc')]),
columns=MultiIndex.from_tuples([('C', x) for x in list('xyz')])
)
level_names = ['L1', 'L2']
funcs = ['_set_axis_name', 'rename_axis']
for func in funcs:
result = methodcaller(func, level_names)(df)
self.assertEqual(result.index.names, level_names)
self.assertEqual(result.columns.names, [None, None])
result = methodcaller(func, level_names, axis=1)(df)
self.assertEqual(result.columns.names, ["L1", "L2"])
self.assertEqual(result.index.names, [None, None])
def after_post(self, response, code):
    """Append the result rows found in `response` to ``postcodes.csv``.

    Nothing is written when the response body contains the "no hits"
    message. Each ``tr`` of ``table.result`` is expected to hold three text
    cells (street, post code, city) or four (street, PO box, post code,
    city). The cells are title-cased and written as one line prefixed with
    `code`. A row that cannot be processed gets `code` appended to
    ``errors.csv`` instead.

    :param response: response object providing ``body`` and ``css()``
    :param code: value written as the first column of every line
    """
    if "Your search produced no hits." not in response.body:
        titler = methodcaller('title')
        with codecs.open('postcodes.csv', "ab", "utf-8") as csv_file:
            with codecs.open('errors.csv', "ab", "utf-8") as errors_file:
                for result in response.css('table.result').css('tr'):
                    try:
                        cells = result.css('td::text').extract()
                        if len(cells) == 3:
                            street_name, post_code, city = cells
                            po_box_no = ''
                        else:
                            street_name, po_box_no, post_code, city = cells
                        txt = u','.join(
                            map(titler,
                                (street_name, po_box_no, post_code, city))
                        )
                        csv_file.write(u'%s,%s\n' % (code, txt))
                    # Best-effort per row, but let KeyboardInterrupt and
                    # SystemExit propagate.
                    except Exception:
                        errors_file.write('%s\n' % code)
def test_methodcaller(self):
self.assertRaises(TypeError, operator.methodcaller)
class A:
def foo(self, *args, **kwds):
return args[0] + args[1]
def bar(self, f=42):
return f
a = A()
f = operator.methodcaller('foo')
self.assertRaises(IndexError, f, a)
f = operator.methodcaller('foo', 1, 2)
self.assertEqual(f(a), 3)
f = operator.methodcaller('bar')
self.assertEqual(f(a), 42)
self.assertRaises(TypeError, f, a, a)
f = operator.methodcaller('bar', f=5)
self.assertEqual(f(a), 5)
def test_check_closed(self):
    """Every access to a closed database raises dumbdbm.error."""
    f = dumbdbm.open(_fname, 'c')
    f.close()

    # Operations that take a key
    for meth in (partial(operator.delitem, f),
                 partial(operator.setitem, f, 'b'),
                 partial(operator.getitem, f),
                 partial(operator.contains, f)):
        with self.assertRaises(dumbdbm.error) as cm:
            meth('test')
        self.assertEqual(str(cm.exception),
                         "DBM object has already been closed")

    # Operations that take only the database object
    for meth in (operator.methodcaller('keys'),
                 operator.methodcaller('iterkeys'),
                 operator.methodcaller('items'),
                 len):
        with self.assertRaises(dumbdbm.error) as cm:
            meth(f)
        self.assertEqual(str(cm.exception),
                         "DBM object has already been closed")
def test_methodcaller(self):
self.assertRaises(TypeError, operator.methodcaller)
class A:
def foo(self, *args, **kwds):
return args[0] + args[1]
def bar(self, f=42):
return f
a = A()
f = operator.methodcaller('foo')
self.assertRaises(IndexError, f, a)
f = operator.methodcaller('foo', 1, 2)
self.assertEqual(f(a), 3)
f = operator.methodcaller('bar')
self.assertEqual(f(a), 42)
self.assertRaises(TypeError, f, a, a)
f = operator.methodcaller('bar', f=5)
self.assertEqual(f(a), 5)
def get_vlan_graph_url(vlanid, family=4, timeframe="day"):
    """Returns a Graphite graph render URL for a VLAN

    :param vlanid: primary key of the Vlan to look up (404 when absent)
    :param family: IP family number; anything that cannot be converted
        with int() falls back to 4
    :param timeframe: appended to "1" to form the time-frame argument
    :return: the graph URL, or None when the VLAN has no prefixes of the
        requested family
    """
    vlan = get_object_or_404(Vlan, pk=vlanid)
    try:
        family = int(family)
    except (TypeError, ValueError):
        family = 4

    # Let the database driver bind the value rather than formatting it
    # into the SQL text.
    extra = {'where': ['family(netaddr) = %s'], 'params': [family]}
    prefixes = sorted(vlan.prefix_set.all().extra(**extra),
                      key=methodcaller('get_prefix_size'),
                      reverse=True)
    if not prefixes:
        return None

    metrics = _vlan_metrics_from_prefixes(prefixes, family)
    return get_simple_graph_url(
        metrics, "1" + timeframe,
        title="Total IPv{0} addresses on VLAN {1}".format(family, vlan),
        width=597, height=251)
def test_check_closed(self):
    """Every access to a closed database raises dumbdbm.error."""
    f = dumbdbm.open(_fname, 'c')
    f.close()

    # Operations that take a key
    for meth in (partial(operator.delitem, f),
                 partial(operator.setitem, f, 'b'),
                 partial(operator.getitem, f),
                 partial(operator.contains, f)):
        with self.assertRaises(dumbdbm.error) as cm:
            meth('test')
        self.assertEqual(str(cm.exception),
                         "DBM object has already been closed")

    # Operations that take only the database object
    for meth in (operator.methodcaller('keys'),
                 operator.methodcaller('iterkeys'),
                 operator.methodcaller('items'),
                 len):
        with self.assertRaises(dumbdbm.error) as cm:
            meth(f)
        self.assertEqual(str(cm.exception),
                         "DBM object has already been closed")
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def consensus(self):
    """
    Select consensus MEI. Selection criteria (ordered by preference order):
        1) Lowest CIPOS
        2) Highest total number of supporting reads (+ plus - cluster supporting reads)

    chr1 --------*---------------*-------------*------------
       beg    cluster1        cluster2      cluster3      end
          (MEI1,MEI2,MEI3)     (MEI4)     (MEI5,MEI6)
    consensus   ****            ****         ****

    Returns the chosen element of ``self.MEIlist``; raises IndexError when
    the list is empty.
    """
    ## 1. Sort by total number of supporting paired-ends (decreasing order)
    by_support = sorted(self.MEIlist, key=methodcaller('nbPE'), reverse=True)

    ## 2. Sort by CIPOS (ascending order)
    # sorted() is stable, so entries with equal CIPOS stay in the
    # support order established above.
    by_cipos = sorted(by_support,
                      key=lambda MEIobj: int(MEIobj.infoDict["CIPOS"]))

    ## 3. Select consensus MEI (the one with lowest CIPOS and highest total number of supporting paired-ends)
    return by_cipos[0]
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def _AnyMessageToJsonObject(self, message):
"""Converts Any message according to Proto3 JSON Specification."""
if not message.ListFields():
return {}
# Must print @type first, use OrderedDict instead of {}
js = OrderedDict()
type_url = message.type_url
js['@type'] = type_url
sub_message = _CreateMessageFromTypeUrl(type_url)
sub_message.ParseFromString(message.value)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
js['value'] = self._WrapperMessageToJsonObject(sub_message)
return js
if full_name in _WKTJSONMETHODS:
js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
sub_message)(self)
return js
return self._RegularMessageToJsonObject(sub_message, js)
def ConvertMessage(self, value, message):
    """Convert a JSON object into a message.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value, message)
    elif full_name in _WKTJSONMETHODS:
        # Entry 1 of the table row names the method on `self` that is
        # called with (value, message) for this message type.
        methodcaller(_WKTJSONMETHODS[full_name][1], value, message)(self)
    else:
        self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
"""Convert a JSON representation into Any message."""
if isinstance(value, dict) and not value:
return
try:
type_url = value['@type']
except KeyError:
raise ParseError('@type is missing when parsing any message.')
sub_message = _CreateMessageFromTypeUrl(type_url)
message_descriptor = sub_message.DESCRIPTOR
full_name = message_descriptor.full_name
if _IsWrapperMessage(message_descriptor):
self._ConvertWrapperMessage(value['value'], sub_message)
elif full_name in _WKTJSONMETHODS:
methodcaller(
_WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
else:
del value['@type']
self._ConvertFieldValuePair(value, sub_message)
# Sets Any message
message.value = sub_message.SerializeToString()
message.type_url = type_url
def __init__(self, **kwargs):
    """Instantiate the requested fixes, then initialise the parent bot.

    Keyword arguments consumed here: 'all', one flag per key of
    `all_fixes`, and every option name declared by a selected fix class.
    Whatever remains is forwarded to the parent constructor.
    """
    do_all = kwargs.pop('all', False) is True
    self.fixes = []
    for fix, cls in all_fixes.items():
        if do_all:
            # With 'all', every fix is selected except those that are
            # named explicitly in kwargs.
            demand = fix not in kwargs
            kwargs.pop(fix, None)
        else:
            demand = bool(kwargs.pop(fix, False))
        if demand:
            options = {}
            for opt in cls.options:
                if opt in kwargs:
                    options[opt] = kwargs.pop(opt)
            self.fixes.append(cls(**options))

    self.fixes.sort(key=lambda fix: fix.order)

    super(WikitextFixingBot, self).__init__(**kwargs)
    for fix in self.fixes:
        fix.site = self.site
    if not self.generator:
        pywikibot.output('No generator provided, making own generator...')
        # Chain together the generator() output of every selected fix
        self.generator = pagegenerators.PreloadingGenerator(
            chain.from_iterable(map(methodcaller('generator'), self.fixes)))
def formatter(self, textvalue):
    """Add each formatter URL found in `textvalue` as a P1630 claim.

    Matches that already equal an existing P1630 claim target, or that are
    only a bare 'http://' / 'https://' prefix, are skipped. Nothing is
    added when the datatype of the current page is not one of
    'commonsMedia', 'external-id' or 'string'.

    :return: always True
    """
    prop = self.current_page
    if prop.type not in ['commonsMedia', 'external-id', 'string']:
        pywikibot.output('"%s" datatype doesn\'t make use of formatter'
                         '' % prop.type)
        return True

    for match in self.get_formatter_regex().findall(textvalue):
        if any(map(methodcaller('target_equals', match),
                   prop.claims.get('P1630', []))):
            pywikibot.output('"%s" already has "%s" as the formatter URL'
                             '' % (prop.title(), match))
            continue
        if match.strip() in ['http://', 'https://']:
            continue  # ???
        claim = pywikibot.Claim(self.repo, 'P1630')
        claim.setTarget(match)
        self.user_edit_entity(prop, {'claims': [claim.toJSON()]},
                              summary=self.make_summary('P1630', match),
                              asynchronous=True)
    return True
def from_mapping(self, *mapping, **kwargs) -> bool:
    """ Updates the config like :meth:`update` but ignoring items with
    non-upper keys.

    :param mapping: at most one positional dict of settings
    :param kwargs: further settings; applied after the positional dict, so
        they win on duplicate keys
    :return: always True
    :raises TypeError: if more than one positional argument is given, or
        the positional argument is not a dict
    """
    if len(mapping) > 1:
        raise TypeError(
            'expected at most 1 positional argument, got %d' % len(mapping)
        )
    dict_: dict = {}
    if len(mapping) == 1:
        dict_ = mapping[0]
        if not isinstance(dict_, dict):
            raise TypeError(
                'expected dict type argument, got %s' % dict_.__class__
            )
    for key, value in chain(dict_.items(), kwargs.items()):
        if key.isupper():
            self.data[key] = value
    return True
def _MessageToJsonObject(self, message):
    """Converts message to an object according to Proto3 JSON Specification.

    Wrapper messages and types listed in `_WKTJSONMETHODS` are delegated to
    their dedicated methods; every other message is handed to
    `_RegularMessageToJsonObject` together with a fresh dict.
    """
    message_descriptor = message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        return self._WrapperMessageToJsonObject(message)
    if full_name in _WKTJSONMETHODS:
        # Entry 0 of the table row names the method on `self` to call
        return methodcaller(_WKTJSONMETHODS[full_name][0], message)(self)
    return self._RegularMessageToJsonObject(message, {})
def WithErrorTypeAndMessage(error_type, message):
    """
    Check that a Twisted failure was caused by a certain error type with a
    certain message.

    :param error_type: type that the failure's ``value`` must be an
        instance of
    :param message: text that the failure's ``getErrorMessage()`` must equal
    :return: a matcher combining both checks
    """
    return MatchesAll(
        MatchesStructure(value=IsInstance(error_type)),
        After(methodcaller('getErrorMessage'), Equals(message))
    )
def request(content, endpoint, api, language=None, uri=False, **kwargs):
    """Request Rosette API results for the given content and endpoint.

    This method gets the requested results from the Rosette API as JSON.  If
    api's output parameter has been set to "rosette" then the JSON will consist
    of an A(nnotated) D(ata) M(odel) or ADM.  An ADM is a Python dict
    representing document content, annotations of the document content,
    and document metadata.

    content:  path or URI of a document for the Rosette API to process
    endpoint: a Rosette API endpoint string (e.g., 'entities')
              (see https://developer.rosette.com/features-and-functions)
    api:      a rosette.api.API instance
              (e.g., API(user_key=<key>, service_url=<url>))
    language: an optional ISO 639-2 T language code
              (the Rosette API will automatically detect the language of the
              content by default)
    uri:      specify that the content is to be treated as a URI and the
              the document content is to be extracted from the URI
    kwargs:   additional keyword arguments
              (e.g., if endpoint is 'morphology' you can specify facet='lemmas';
              see https://developer.rosette.com/features-and-functions for
              complete documentation)
    """
    parameters = DocumentParameters()
    if uri:
        parameters['contentUri'] = content
    else:
        parameters['content'] = content
    parameters['language'] = language
    # `endpoint` is the name of the method on `api` that is called with
    # the parameters and any extra keyword arguments.
    return methodcaller(endpoint, parameters, **kwargs)(api)