我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 operator.methodcaller()。
def _AnyMessageToJsonObject(self, message):
    """Build the Proto3 JSON object for an ``Any`` message.

    An ``Any`` without populated fields becomes an empty dict.  Otherwise
    ``message.value`` is parsed into the message created from
    ``message.type_url`` and rendered into a mapping whose first key is
    ``'@type'``.
    """
    if not message.ListFields():
        return {}

    # '@type' must be printed first, so use OrderedDict instead of {}.
    result = OrderedDict()
    url = message.type_url
    result['@type'] = url

    payload = _CreateMessageFromTypeUrl(url)
    payload.ParseFromString(message.value)
    descriptor = payload.DESCRIPTOR
    name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        result['value'] = self._WrapperMessageToJsonObject(payload)
    elif name in _WKTJSONMETHODS:
        # Element [0] of the table entry names the method that is invoked
        # on ``self`` with the unpacked payload.
        to_json = methodcaller(_WKTJSONMETHODS[name][0], payload)
        result['value'] = to_json(self)
    else:
        return self._RegularMessageToJsonObject(payload, result)
    return result
def ConvertMessage(self, value, message):
    """Fill ``message`` from the JSON object ``value``.

    Wrapper messages, messages listed in ``_WKTJSONMETHODS`` and all other
    messages are each handed to their own converter on ``self``.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems (raised by the delegated
        converters; not raised directly here).
    """
    descriptor = message.DESCRIPTOR
    name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        self._ConvertWrapperMessage(value, message)
        return
    if name not in _WKTJSONMETHODS:
        self._ConvertFieldValuePair(value, message)
        return

    # Element [1] of the table entry names the method that is invoked on
    # ``self`` with (value, message).
    convert = methodcaller(_WKTJSONMETHODS[name][1], value, message)
    convert(self)
def _ConvertAnyMessage(self, value, message):
    """Convert a JSON representation into an ``Any`` message.

    An empty dict leaves ``message`` untouched.  Otherwise the ``'@type'``
    entry selects the payload message type, the payload is populated from
    ``value`` and then stored serialized in ``message.value`` together with
    ``message.type_url``.

    Args:
      value: The JSON object for the ``Any``; it must carry an ``'@type'`` key.
      message: The ``Any`` message to populate.

    Raises:
      ParseError: If ``'@type'`` is missing from ``value``.
    """
    if isinstance(value, dict) and not value:
        return
    try:
        type_url = value['@type']
    except KeyError:
        raise ParseError('@type is missing when parsing any message.')

    sub_message = _CreateMessageFromTypeUrl(type_url)
    message_descriptor = sub_message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value['value'], sub_message)
    elif full_name in _WKTJSONMETHODS:
        methodcaller(
            _WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
    else:
        # '@type' is hidden while the remaining entries are converted, and
        # is always put back afterwards so the caller's JSON object is not
        # left permanently modified (even when conversion raises).
        del value['@type']
        try:
            self._ConvertFieldValuePair(value, sub_message)
        finally:
            value['@type'] = type_url
    # Sets Any message
    message.value = sub_message.SerializeToString()
    message.type_url = type_url
def test_responder_resource_child(self):
    """
    A GET on the ACME challenge path returns the content of a child
    resource registered on the responder resource at that path, while a
    request for a different subpath gets a 404.
    """
    self.responder_resource.putChild(b'foo', Data(b'bar', 'text/plain'))

    found = self.client.get(
        'http://localhost/.well-known/acme-challenge/foo')
    ok_plain_text = MatchesStructure(
        code=Equals(200),
        headers=HasHeader('Content-Type', ['text/plain']))
    body_is_bar = After(methodcaller('content'), succeeded(Equals(b'bar')))
    assert_that(found, succeeded(MatchesAll(ok_plain_text, body_is_bar)))

    # Sanity check that a request to a different subpath does not succeed
    missing = self.client.get(
        'http://localhost/.well-known/acme-challenge/baz')
    not_found = MatchesStructure(code=Equals(404))
    assert_that(missing, succeeded(not_found))
def test_signal_hup(self):
    """
    A GET on ``/_mlb_signal/hup`` answers 200 with the SIGHUP confirmation
    text, and ``check_signalled_hup()`` reports True exactly once
    afterwards.
    """
    was_signalled = self.marathon_lb.check_signalled_hup
    assert_that(was_signalled(), Equals(False))

    response = self.client.get('http://localhost/_mlb_signal/hup')
    ok_plain_text = MatchesStructure(
        code=Equals(200),
        headers=HasHeader('content-type', ['text/plain']))
    expected_text = After(
        methodcaller('text'),
        succeeded(Equals('Sent SIGHUP signal to marathon-lb')))
    assert_that(response, succeeded(MatchesAll(ok_plain_text, expected_text)))

    assert_that(was_signalled(), Equals(True))
    # Signalled flag should be reset to false after it is checked
    assert_that(was_signalled(), Equals(False))
def test_signal_usr1(self):
    """
    A GET on ``/_mlb_signal/usr1`` answers 200 with the SIGUSR1
    confirmation text, and ``check_signalled_usr1()`` reports True exactly
    once afterwards.
    """
    was_signalled = self.marathon_lb.check_signalled_usr1
    assert_that(was_signalled(), Equals(False))

    response = self.client.get('http://localhost/_mlb_signal/usr1')
    ok_plain_text = MatchesStructure(
        code=Equals(200),
        headers=HasHeader('content-type', ['text/plain']))
    expected_text = After(
        methodcaller('text'),
        succeeded(Equals('Sent SIGUSR1 signal to marathon-lb')))
    assert_that(response, succeeded(MatchesAll(ok_plain_text, expected_text)))

    assert_that(was_signalled(), Equals(True))
    # Signalled flag should be reset to false after it is checked
    assert_that(was_signalled(), Equals(False))
def run_bot(user_input, start):
    """
    Produce the bot's reply to one message.

    :param str user_input: Message sent to the bot
    :param bool start: If the user is new to the bot
    :return: Response from the bot
    :rtype: str
    """
    if start:
        bot.response = "Hello there! Have a chat with me or say 'help' to see available commands :)"
        return bot.response

    bot.user_input = user_input

    # The stages run in this order; the first one that returns a truthy
    # value ends the search and ``bot.response`` is returned.
    #   fix_typos: Fix any user typos and slang
    #   check_phrase_similarity: Check user's input similarity to all key phrases
    #   create_response: If all key phrases fail, we gotta actually make a new sentence
    stages = ('fix_typos', 'help_check', 'check_phrase_similarity', 'create_response')
    for stage in stages:
        handled = operator.methodcaller(stage)(bot)
        if handled:
            return bot.response

    # If no response can be created
    return random.choice(responses.UNSURE)

# For testing purposes
def __init__(self, data):
    """Store the settings structure and derive password and master URL.

    If ``data['password-digest']`` is a string naming an attribute of
    ``hashlib``, ``self.billing_password`` is replaced by the hex digest
    obtained from calling that attribute with the password.
    ``self.master_url`` is built from the scheme and host of
    ``request.base_url``.
    """
    self._structure = data
    # HTTP verb -> matching ``requests`` function.
    self._dispatcher = {
        verb: getattr(requests, verb)
        for verb in ('get', 'post', 'put', 'delete')
    }
    self._get_system_settings()

    digest_name = data.get('password-digest')
    if digest_name is not None and isinstance(digest_name, basestring):
        algorithm = digest_name.lower()
        # NOTE(review): any attribute of hashlib passes this check, not only
        # hash constructors -- confirm the configured values are trusted.
        if hasattr(hashlib, algorithm):
            hasher = operator.methodcaller(algorithm, self.billing_password)(hashlib)
            self.billing_password = hasher.hexdigest()

    base = urlparse(request.base_url)
    self.master_url = '{0}://{1}'.format(base.scheme, base.netloc)
def _json_default_exact(obj):
    """Serialization fallback that tries to preserve exact data types.

    Objects whose class name has a handler are returned as
    ``{"__class__": <class name>, "__value__": <converted value>}``.
    Tuple subclasses (other than ``tuple`` itself) are tagged as
    ``"namedtuple"``.  Anything else raises ``TypeError``.
    """
    isoformat = methodcaller('isoformat')
    handlers = {
        'datetime': isoformat,
        'date': isoformat,
        'time': isoformat,
        'timedelta': partial(getattrs, attrs=['days', 'seconds', 'microseconds']),
        'tuple': list,
        'set': list,
        'frozenset': list,
        'complex': partial(getattrs, attrs=['real', 'imag']),
        'Decimal': str,
        'Fraction': partial(getattrs, attrs=['numerator', 'denominator']),
        'UUID': partial(getattrs, attrs=['hex']),
    }

    classname = type(obj).__name__
    handler = handlers.get(classname)
    if handler is not None:
        return {"__class__": classname, "__value__": handler(obj)}

    if isinstance(obj, tuple) and classname != 'tuple':
        return {
            "__class__": "namedtuple",
            "__value__": _dump_namedtuple(classname, obj),
        }

    raise TypeError(repr(obj) + " is not JSON serializable")
def _json_default_compat(obj):
    """Serialization fallback that returns plain converted values.

    The handler is looked up by the object's class name and its result is
    returned directly (no ``__class__`` tagging).  Objects without a
    handler raise ``TypeError``.
    """
    isoformat = methodcaller('isoformat')
    handlers = {
        'datetime': isoformat,
        'date': isoformat,
        'time': isoformat,
        'timedelta': _timedelta_total_seconds,
        'set': list,
        'frozenset': list,
        'complex': partial(getattrs, attrs=['real', 'imag']),
        'Fraction': partial(getattrs, attrs=['numerator', 'denominator']),
        'UUID': str,
    }

    handler = handlers.get(type(obj).__name__)
    if handler is None:
        raise TypeError(repr(obj) + " is not JSON serializable")
    return handler(obj)
def remove_zero_copynumbers(gtype):
    '''
    Blank out a single-clone genotype whose copy-numbers are unusable.

    ``gtype`` is a string of ``|``-separated clones, each a ``,``-separated
    list of numbers.  When there is exactly one clone and its first two
    values are both zero, or either of them is negative, ``''`` is
    returned.  In every other case (including several clones) ``gtype``
    is returned unchanged.

    :param str gtype: genotype string, may be empty
    :return: ``gtype`` or ``''``
    :raises ValueError: if a value of a single-clone genotype is not numeric
    :raises IndexError: if a single-clone genotype has fewer than two values
    '''
    if gtype == '':
        return ''

    # Build real lists: on Python 3 ``map`` returns an iterator, which
    # supports neither ``len()`` nor indexing.
    clones = [clone.split(',') for clone in gtype.split('|')]
    if len(clones) == 1:
        gt = [float(value) for value in clones[0]]
        if (gt[0] == 0 and gt[1] == 0) or gt[0] < 0 or gt[1] < 0:
            gtype = ''
    # NOTE: genotypes with several clones are deliberately left untouched;
    # per-clone filtering of zero copy-numbers is not implemented.
    return gtype
def test_methodcaller(self):
    """Check construction and invocation of ``operator.methodcaller``."""
    # A method name is mandatory.
    with self.assertRaises(TypeError):
        operator.methodcaller()

    class A:
        def foo(self, *args, **kwds):
            return args[0] + args[1]

        def bar(self, f=42):
            return f

    target = A()

    # No stored arguments: foo() indexes into an empty ``args``.
    call_foo_bare = operator.methodcaller('foo')
    with self.assertRaises(IndexError):
        call_foo_bare(target)

    # Stored positional arguments are forwarded.
    call_foo_sum = operator.methodcaller('foo', 1, 2)
    self.assertEqual(call_foo_sum(target), 3)

    # The caller accepts exactly one argument, the target object.
    call_bar_default = operator.methodcaller('bar')
    self.assertEqual(call_bar_default(target), 42)
    with self.assertRaises(TypeError):
        call_bar_default(target, target)

    # Stored keyword arguments are forwarded.
    call_bar_keyword = operator.methodcaller('bar', f=5)
    self.assertEqual(call_bar_keyword(target), 5)
def knots(self, direction=None, with_multiplicities=False):
    """ Return knots vector

    Without `direction`, the result is a tuple holding one entry per basis
    in ``self.bases``.  With `direction`, only the entry of the basis in
    that direction is returned.

    Each entry is the basis' ``knots`` attribute when `with_multiplicities`
    is true, and the result of its ``knot_spans()`` method otherwise.

    :param int direction: Direction in which to get the knots.
    :param bool with_multiplicities: If true, return knots with
        multiplicities (i.e. repeated).
    :raises ValueError: For invalid direction
    """
    if with_multiplicities:
        fetch = attrgetter('knots')
    else:
        fetch = methodcaller('knot_spans')

    if direction is not None:
        index = check_direction(direction, self.pardim)
        return fetch(self.bases[index])
    return tuple(fetch(basis) for basis in self.bases)
def test_methodcaller(self):
    """Check construction and invocation of ``operator.methodcaller``."""
    # A method name is mandatory.
    with self.assertRaises(TypeError):
        operator.methodcaller()

    class A:
        def foo(self, *args, **kwds):
            return args[0] + args[1]

        def bar(self, f=42):
            return f

    target = A()

    # No stored arguments: foo() indexes into an empty ``args``.
    call_foo_bare = operator.methodcaller('foo')
    with self.assertRaises(IndexError):
        call_foo_bare(target)

    # Stored positional arguments are forwarded.
    call_foo_sum = operator.methodcaller('foo', 1, 2)
    self.assertEqual(call_foo_sum(target), 3)
    # The caller itself takes exactly one positional argument and no
    # keyword arguments.
    with self.assertRaises(TypeError):
        call_foo_sum()
    with self.assertRaises(TypeError):
        call_foo_sum(target, 3)
    with self.assertRaises(TypeError):
        call_foo_sum(target, spam=3)

    call_bar_default = operator.methodcaller('bar')
    self.assertEqual(call_bar_default(target), 42)
    with self.assertRaises(TypeError):
        call_bar_default(target, target)

    # Stored keyword arguments are forwarded.
    call_bar_keyword = operator.methodcaller('bar', f=5)
    self.assertEqual(call_bar_keyword(target), 5)
def ignore_site_packages_paths():
    """
    Temporarily narrow ``sys.path`` to entries that look like interpreter
    library directories.

    While suspended at the ``yield``, ``sys.path`` holds only the original
    entries that are non-empty, are not the current working directory, do
    not contain ``site-packages`` and do contain ``python`` or ``pypy``
    (all compared case-insensitively).  Entries keep their original
    spelling and order, and duplicates are dropped.  The previous
    ``sys.path`` contents are put back afterwards, also when an exception
    is thrown into the generator.

    NOTE(review): this is a single-yield generator, presumably wrapped with
    ``contextlib.contextmanager`` where it is defined -- the decorator is
    not visible here.

    :return: generator yielding once with no value
    """
    saved = sys.path[:]
    cwd = os.getcwd()

    kept = []
    seen = set()
    for entry in sys.path:
        # Drop '' and the working directory so that local imports fail.
        if not entry or entry == cwd or entry in seen:
            continue
        lowered = entry.lower()
        # Drop third-party paths so that only stdlib imports succeed.
        if 'site-packages' in lowered:
            continue
        if 'python' not in lowered and 'pypy' not in lowered:
            continue
        seen.add(entry)
        # Keep the original spelling: a lowercased path need not exist on
        # a case-sensitive filesystem.
        kept.append(entry)

    sys.path = kept
    try:
        yield
    finally:
        # Always restore, even if the managed body raised.
        sys.path = saved

# -----------------------------------------------------------------
def test_set_axis_name_mi(self):
    """Axis-name setters on a frame whose index and columns are MultiIndexes.

    For both ``_set_axis_name`` and ``rename_axis``: naming the default
    axis sets the index level names only, and naming ``axis=1`` sets the
    column level names only.
    """
    row_index = MultiIndex.from_tuples([("A", x) for x in list('aBc')])
    col_index = MultiIndex.from_tuples([('C', x) for x in list('xyz')])
    df = DataFrame(np.empty((3, 3)), index=row_index, columns=col_index)

    level_names = ['L1', 'L2']
    for func in ('_set_axis_name', 'rename_axis'):
        name_rows = methodcaller(func, level_names)
        result = name_rows(df)
        self.assertEqual(result.index.names, level_names)
        self.assertEqual(result.columns.names, [None, None])

        name_columns = methodcaller(func, level_names, axis=1)
        result = name_columns(df)
        self.assertEqual(result.columns.names, ["L1", "L2"])
        self.assertEqual(result.index.names, [None, None])
def after_post(self, response, code):
    """Append the search results in ``response`` to ``postcodes.csv``.

    Nothing is written when the response body contains the "no hits"
    marker.  Otherwise every ``tr`` of ``table.result`` is read as either
    three cells (street, post code, city) or four cells (street, PO box,
    post code, city), title-cased, and appended as one line prefixed with
    ``code``.  A row that cannot be processed appends ``code`` to
    ``errors.csv`` instead.

    :param response: object exposing ``body`` and ``css()``
    :param code: value written as the first column of every line
    """
    if "Your search produced no hits." in response.body:
        return

    titler = methodcaller('title')
    with codecs.open('postcodes.csv', "ab", "utf-8") as csv_file, \
            codecs.open('errors.csv', "ab", "utf-8") as errors_file:
        for result in response.css('table.result').css('tr'):
            try:
                # Extract once and reuse for both the count and the unpack.
                cells = result.css('td::text').extract()
                if len(cells) == 3:
                    street_name, post_code, city = cells
                    po_box_no = ''
                else:
                    street_name, po_box_no, post_code, city = cells
                txt = u','.join(
                    map(titler, (street_name, po_box_no, post_code, city))
                )
                csv_file.write(u'%s,%s\n' % (code, txt))
            except Exception:
                # Best effort: record the failing code and carry on, but
                # let KeyboardInterrupt / SystemExit propagate.
                errors_file.write('%s\n' % code)
def test_check_closed(self):
    """Every access to a closed dumbdbm database raises ``dumbdbm.error``."""
    closed_message = "DBM object has already been closed"

    f = dumbdbm.open(_fname, 'c')
    f.close()

    # Item-style operations, each completed with the key/value 'test'.
    item_operations = (
        partial(operator.delitem, f),
        partial(operator.setitem, f, 'b'),
        partial(operator.getitem, f),
        partial(operator.contains, f),
    )
    for operation in item_operations:
        with self.assertRaises(dumbdbm.error) as cm:
            operation('test')
        self.assertEqual(str(cm.exception), closed_message)

    # Whole-database operations, each applied to the closed object.
    database_operations = (
        operator.methodcaller('keys'),
        operator.methodcaller('iterkeys'),
        operator.methodcaller('items'),
        len,
    )
    for operation in database_operations:
        with self.assertRaises(dumbdbm.error) as cm:
            operation(f)
        self.assertEqual(str(cm.exception), closed_message)
def get_vlan_graph_url(vlanid, family=4, timeframe="day"):
    """Returns a Graphite graph render URL for a VLAN

    The VLAN's prefixes of the given address family are sorted by
    ``get_prefix_size()``, largest first, turned into metrics and handed
    to ``get_simple_graph_url``.

    :param vlanid: primary key of the VLAN (a missing VLAN is handled by
        ``get_object_or_404``)
    :param family: address family; anything ``int()`` cannot convert,
        including ``None``, falls back to 4
    :param timeframe: appended to ``"1"`` to form the graph time frame
    :return: the graph URL, or ``None`` if the VLAN has no matching prefixes
    """
    vlan = get_object_or_404(Vlan, pk=vlanid)
    try:
        family = int(family)
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric input such as None.
        family = 4

    # Let the database driver bind the value instead of formatting it
    # into the SQL fragment.
    matching = vlan.prefix_set.all().extra(
        where=['family(netaddr) = %s'], params=[family])
    prefixes = sorted(matching,
                      key=methodcaller('get_prefix_size'),
                      reverse=True)
    if not prefixes:
        return None

    metrics = _vlan_metrics_from_prefixes(prefixes, family)
    return get_simple_graph_url(
        metrics, "1" + timeframe,
        title="Total IPv{0} addresses on VLAN {1}".format(family, vlan),
        width=597, height=251)
def consensus(self):
    """
    Select consensus MEI.

    Selection criteria (ordered by preference order):
        1) Lowest CIPOS
        2) Highest total number of supporting reads, as reported by nbPE()

    Ties on both criteria are resolved in favour of the element that
    comes first in ``self.MEIlist``.

    chr1 --------*---------------*-------------*------------
       beg    cluster1        cluster2      cluster3      end
          (MEI1,MEI2,MEI3)     (MEI4)     (MEI5,MEI6)
    consensus   ****             ****         ****
    """
    # Secondary criterion first: most supporting paired-ends at the front.
    by_support = sorted(self.MEIlist, key=methodcaller('nbPE'), reverse=True)

    # Primary criterion second: the sort is stable, so elements with equal
    # CIPOS keep their support ordering.
    by_cipos = sorted(by_support, key=lambda mei: int(mei.infoDict["CIPOS"]))

    # Lowest CIPOS, then highest support.
    return by_cipos[0]
def __init__(self, **kwargs):
    """Instantiate the requested fixes and make sure a generator exists.

    With ``all=True`` every fix in ``all_fixes`` is enabled unless its name
    is passed as a keyword; otherwise a fix is enabled only when its name
    is passed with a truthy value.  Keywords matching a fix's ``options``
    are handed to that fix; everything left over goes to the parent
    constructor.  If no generator was provided, one is assembled from the
    generators of the enabled fixes.
    """
    # Only the literal True switches on every fix.
    do_all = kwargs.pop('all', False) is True

    self.fixes = []
    for fix, cls in all_fixes.items():
        named = fix in kwargs
        value = kwargs.pop(fix, False)
        wanted = (not named) if do_all else bool(value)
        if not wanted:
            continue
        options = {
            opt: kwargs.pop(opt)
            for opt in cls.options.keys()
            if opt in kwargs
        }
        self.fixes.append(cls(**options))

    self.fixes.sort(key=lambda fix: fix.order)

    super(WikitextFixingBot, self).__init__(**kwargs)
    for fix in self.fixes:
        fix.site = self.site

    if not self.generator:
        pywikibot.output('No generator provided, making own generator...')
        per_fix_generators = map(methodcaller('generator'), self.fixes)
        self.generator = pagegenerators.PreloadingGenerator(
            chain.from_iterable(per_fix_generators))
def formatter(self, textvalue):
    """Add each formatter URL found in ``textvalue`` as a P1630 claim.

    Nothing is added for properties whose datatype is not one of
    ``commonsMedia``, ``external-id`` or ``string``.  Matches that an
    existing P1630 claim already targets, and matches that are only a bare
    ``http://`` / ``https://``, are skipped.  Always returns True.
    """
    prop = self.current_page
    if prop.type not in ('commonsMedia', 'external-id', 'string'):
        pywikibot.output('"%s" datatype doesn\'t make use of formatter'
                         % prop.type)
        return True

    for match in self.get_formatter_regex().findall(textvalue):
        targets_match = methodcaller('target_equals', match)
        existing = prop.claims.get('P1630', [])
        if any(map(targets_match, existing)):
            pywikibot.output('"%s" already has "%s" as the formatter URL'
                             % (prop.title(), match))
            continue

        if match.strip() in ['http://', 'https://']:
            continue  # ???

        claim = pywikibot.Claim(self.repo, 'P1630')
        claim.setTarget(match)
        self.user_edit_entity(
            prop,
            {'claims': [claim.toJSON()]},
            summary=self.make_summary('P1630', match),
            asynchronous=True)

    return True
def from_mapping(self, *mapping, **kwargs) -> bool:
    """
    Copy every item whose key is upper-case into ``self.data``.

    Items come from at most one positional dict followed by the keyword
    arguments, so a keyword overrides a positional entry with the same key.

    :raises TypeError: for more than one positional argument, or a
        positional argument that is not a dict
    :return: always ``True``
    """
    count = len(mapping)
    if count > 1:
        raise TypeError(
            'expected at most 1 positional argument, got %d' % count
        )

    positional: dict = {}
    if count == 1:
        positional = mapping[0]
        if not isinstance(positional, dict):
            raise TypeError(
                'expected dict type argument, got %s' % positional.__class__
            )

    for key, value in chain(positional.items(), kwargs.items()):
        if not key.isupper():
            continue
        self.data[key] = value
    return True
def _MessageToJsonObject(self, message):
    """Convert ``message`` to an object according to Proto3 JSON Specification.

    Wrapper messages and messages listed in ``_WKTJSONMETHODS`` are handed
    to their dedicated converter on ``self``; every other message is
    rendered into a fresh dict by ``_RegularMessageToJsonObject``.
    """
    descriptor = message.DESCRIPTOR
    name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        return self._WrapperMessageToJsonObject(message)
    if name not in _WKTJSONMETHODS:
        return self._RegularMessageToJsonObject(message, {})

    # Element [0] of the table entry names the method that is invoked on
    # ``self`` with the message.
    to_json = methodcaller(_WKTJSONMETHODS[name][0], message)
    return to_json(self)
def WithErrorTypeAndMessage(error_type, message):
    """
    Build a matcher for a failure object whose ``value`` is an instance of
    ``error_type`` and whose ``getErrorMessage()`` equals ``message``.
    """
    type_matches = MatchesStructure(value=IsInstance(error_type))
    message_matches = After(methodcaller('getErrorMessage'), Equals(message))
    return MatchesAll(type_matches, message_matches)
def request(content, endpoint, api, language=None, uri=False, **kwargs):
    """Call the ``endpoint`` method of ``api`` for the given content.

    A ``DocumentParameters`` object is filled in and passed, together with
    ``kwargs``, to the method of ``api`` named by ``endpoint``; whatever
    that method returns is returned unchanged.

    content: the document content, or its URI when ``uri`` is true
    endpoint: name of the method to call on ``api`` (e.g., 'entities')
        (see https://developer.rosette.com/features-and-functions)
    api: a rosette.api.API instance
        (e.g., API(user_key=<key>, service_url=<url>))
    language: optional language code stored under 'language'; it is stored
        even when left as None
    uri: store ``content`` under 'contentUri' instead of 'content'
    kwargs: additional keyword arguments forwarded to the endpoint method
        (e.g., facet='lemmas' for 'morphology'; see
        https://developer.rosette.com/features-and-functions)
    """
    parameters = DocumentParameters()
    content_key = 'contentUri' if uri else 'content'
    parameters[content_key] = content
    parameters['language'] = language

    call_endpoint = methodcaller(endpoint, parameters, **kwargs)
    return call_endpoint(api)