我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用django.utils.six.next()。
def previous_current_next(items):
    """
    From http://www.wordaligned.org/articles/zippy-triples-served-with-python

    Build ``(previous, current, next)`` triples over ``items``. Where an
    element has no predecessor or successor, ``None`` takes that slot.

    Returns whatever ``zip`` produces for the three staggered streams.
    """
    # Pad both ends with None so the first and last real items still get a
    # complete triple.
    padded = itertools.chain([None], items, [None])
    before, here, after = itertools.tee(padded, 3)

    # Stagger the streams: `here` runs one step ahead of `before`, and
    # `after` two steps ahead. The padding supplies at least two elements,
    # so these advances cannot exhaust the stream unless `items` itself
    # misbehaves -- and in that case the error should surface, not be hidden.
    next(here)
    for _ in range(2):
        next(after)

    return zip(before, here, after)
def ac_on(request):
    """Issue the AC command matching the currently configured state.

    Takes the first result of ``ACConfig.query`` for
    ``settings.AC_LOCATION``, maps its ``state`` to a button name, logs the
    choice and returns the result of ``ac_command`` for that button.

    ``request`` is accepted but not read by this function.

    Raises:
        StopIteration: if the query yields no result.
        KeyError: if the configured state has no entry in the button map.
    """
    STATE_BTN_MAP = {
        'verylow': 'BTN_3',
        'low': 'BTN_5',
        'medium': 'BTN_8',
        'high': 'BTN_10',
    }
    config = next(ACConfig.query(hash_key=settings.AC_LOCATION))
    btn_name = STATE_BTN_MAP[config.state]
    logger.info('ac_on state {} btn_name {}'.format(config.state, btn_name))
    return ac_command(btn_name)
def test_password_reset_form(self):
    """Check which users ``PasswordResetForm.get_users`` yields.

    The first user yielded for ``user_1``'s email must be ``user_1``; for
    the emails of ``user_2`` and ``user_3`` nothing may be yielded at all.
    """
    self.assertEqual(
        next(admin_forms.PasswordResetForm().get_users(self.user_1.email)),
        self.user_1
    )
    # An exhausted result shows up as StopIteration from next().
    with self.assertRaises(StopIteration):
        next(admin_forms.PasswordResetForm().get_users(self.user_2.email))
    with self.assertRaises(StopIteration):
        next(admin_forms.PasswordResetForm().get_users(self.user_3.email))
def __next__(self):
    """Return the next row from ``self.reader`` after validating it.

    Side effects: ``self.line_num`` is updated from the underlying reader
    before validation runs.

    Raises:
        StopIteration: when ``self.reader`` is exhausted.
        Whatever ``self.validate_row`` raises for an invalid row.
    """
    row = next(self.reader)
    # although the DictReader doesn't return blank lines, we want
    # to count them so we can pinpoint errors exactly within the
    # source file.
    self.line_num = self.reader.reader.line_num
    self.validate_row(row)
    return row
def __next__(self):
    """Return the next source line with ``COMMENT_PATTERN`` matches removed.

    Raises:
        StopIteration: when ``self.source_iterator`` is exhausted.
    """
    line = next(self.source_iterator)
    return self.COMMENT_PATTERN.sub('', line)
def __next__(self):
    """Parse the next row and return ``(line_num, objects)``.

    The row is taken from ``self.parser``, passed through
    ``self._decode_as_utf8`` and then ``self._create_objects_from_row``.

    Returns:
        A tuple of the parser's current ``line_num`` and either the
        created objects or, if any step raised ``BulkParseError``, that
        exception instance (returned, not raised, so the caller can report
        it per line).

    Exceptions other than ``BulkParseError`` propagate to the caller.
    """
    try:
        row = next(self.parser)
        row = self._decode_as_utf8(row)
        objects = self._create_objects_from_row(row)
    except BulkParseError as error:
        objects = error
    return self.parser.line_num, objects
def test_simple_import_yields_netbox_and_device_model(self):
    """The first imported item is a one-element list holding a Netbox."""
    data = 'myroom:10.0.90.252:myorg:SW:1:public::'
    parser = bulkparse.NetboxBulkParser(data)
    importer = bulkimport.NetboxImporter(parser)
    _line_num, objects = next(importer)

    self.assertIsInstance(objects, list, repr(objects))
    self.assertEqual(len(objects), 1, repr(objects))
    self.assertIsInstance(objects[0], manage.Netbox, objects[0])
def test_simple_import_yields_objects_with_proper_values(self):
    """Each field of the input line lands on the matching Netbox attribute."""
    data = 'myroom:10.0.90.252:myorg:SW:1:public::'
    parser = bulkparse.NetboxBulkParser(data)
    importer = bulkimport.NetboxImporter(parser)
    _line_num, objects = next(importer)

    # Unpacking doubles as a check that exactly one object was produced.
    (netbox, ) = objects
    self.assertEqual(netbox.ip, '10.0.90.252')
    self.assertEqual(netbox.room_id, 'myroom')
    self.assertEqual(netbox.organization_id, 'myorg')
    self.assertEqual(netbox.category_id, 'SW')
    self.assertEqual(netbox.snmp_version, '1')
    self.assertEqual(netbox.read_only, 'public')
def test_netbox_function_is_set(self):
    """A line with a function field yields a NetboxInfo among the objects."""
    data = 'myroom:10.0.90.252:myorg:SW:1:public:::does things:'
    parser = bulkparse.NetboxBulkParser(data)
    importer = bulkimport.NetboxImporter(parser)
    _line_num, objects = next(importer)

    # Index the produced objects by their class for a simple membership test.
    by_type = {type(obj): obj for obj in objects}
    self.assertIn(manage.NetboxInfo, by_type, by_type)
def test_netbox_groups_are_set(self):
    """A line with trailing group fields yields at least one NetboxCategory."""
    data = 'myroom:10.0.90.10:myorg:SRV:::::fileserver::WEB:UNIX:MAIL'
    parser = bulkparse.NetboxBulkParser(data)
    importer = bulkimport.NetboxImporter(parser)
    _line_num, objects = next(importer)

    netboxgroups = [o for o in objects
                    if isinstance(o, manage.NetboxCategory)]
    # A non-empty list is truthy; `objects` is shown on failure for context.
    self.assertTrue(netboxgroups, objects)
def parse_to_objects(data):
    """Return the objects of the first item a NetboxImporter yields for ``data``.

    The line number that accompanies the objects is discarded.

    Raises:
        StopIteration: if the importer yields nothing.
    """
    parser = bulkparse.NetboxBulkParser(data)
    importer = bulkimport.NetboxImporter(parser)
    _line_num, objects = next(importer)
    return objects
def parse_to_objects(data):
    """Return the objects of the first item a LocationImporter yields for ``data``.

    The line number that accompanies the objects is discarded.

    Raises:
        StopIteration: if the importer yields nothing.
    """
    parser = bulkparse.LocationBulkParser(data)
    importer = bulkimport.LocationImporter(parser)
    _line_num, objects = next(importer)
    return objects
def test_import(self):
    """A prefix line imports as a Vlan followed by a Prefix."""
    data = "10.0.1.0/24:lan:uninett:here-there:employee:Employee LAN:20"
    parser = bulkparse.PrefixBulkParser(data)
    importer = bulkimport.PrefixImporter(parser)
    _line_num, objects = next(importer)

    # If the importer handed back an exception instead of objects, raise
    # it so the test output shows the real failure.
    if isinstance(objects, Exception):
        raise objects

    self.assertEqual(len(objects), 2)
    self.assertIsInstance(objects[0], manage.Vlan)
    self.assertIsInstance(objects[1], manage.Prefix)
def test_parse_single_line_yields_columns(self):
    """A full netbox line parses into a dict with the expected columns."""
    data = (b"room1:10.0.0.186:myorg:SW:1:public:secret:amaster:doesthings:"
            b"key=value:blah1:blah2")
    b = bulkparse.NetboxBulkParser(data)
    out_data = next(b)

    self.assertIsInstance(out_data, dict, out_data)
    self.assertEqual(out_data['roomid'], 'room1')
    self.assertEqual(out_data['ip'], '10.0.0.186')
    self.assertEqual(out_data['orgid'], 'myorg')
    self.assertEqual(out_data['catid'], 'SW')
    self.assertEqual(out_data['master'], 'amaster')
    self.assertEqual(out_data['data'], 'key=value')
    # Trailing fields are expected to be collected into a list.
    self.assertEqual(out_data['netboxgroup'], ['blah1', 'blah2'])
def test_invalid_ip_should_raise_error(self):
    """Fetching a row with a malformed IP raises InvalidFieldValue."""
    data = b"room1:10.0.x.x:myorg:SW:public:parrot::\n"
    b = bulkparse.NetboxBulkParser(data)
    # assertRaises calls next(b) itself, so no lambda wrapper is needed.
    self.assertRaises(bulkparse.InvalidFieldValue, next, b)
def test_short_line_should_raise_error_with_correct_details(self):
    """A too-short line raises RequiredFieldMissing naming line 1 and 'orgid'."""
    data = b"room1:10.0.0.8"
    b = bulkparse.NetboxBulkParser(data)

    # The context manager fails the test if nothing is raised and keeps
    # the exception available for the detail checks below.
    with self.assertRaises(bulkparse.RequiredFieldMissing) as caught:
        next(b)

    error = caught.exception
    self.assertEqual(error.line_num, 1)
    self.assertEqual(error.missing_field, 'orgid')
def test_leading_comments_should_be_stripped(self):
    """The first row returned is the data line after a leading comment line."""
    data = b"#comment\nsby:student village"
    b = bulkparse.UsageBulkParser(data)
    first_row = next(b)
    self.assertEqual(first_row['usageid'], 'sby')
def test_invalid_prefix_should_raise_error(self):
    """Fetching a row with a malformed prefix raises InvalidFieldValue."""
    data = b"10.0.0.x/3f:scope"
    b = bulkparse.PrefixBulkParser(data)
    # assertRaises calls next(b) itself, so no lambda wrapper is needed.
    self.assertRaises(bulkparse.InvalidFieldValue, next, b)
def test_invalid_service_arguments_should_raise_error(self):
    """A service argument without ``key=value`` form raises InvalidFieldValue."""
    data = b"host.example.org;http;port80"
    b = bulkparse.ServiceBulkParser(data)
    # assertRaises calls next(b) itself, so no lambda wrapper is needed.
    self.assertRaises(bulkparse.InvalidFieldValue, next, b)
def test_valid_service_arguments_should_not_raise_error(self):
    """Well-formed ``key=value`` service arguments parse to a truthy row."""
    data = b"host.example.org;http;port=80;uri=/"
    b = bulkparse.ServiceBulkParser(data)
    self.assertTrue(next(b))
def test_leading_comment_should_be_stripped(self):
    """A whole-line comment is reduced to its newline; later lines pass through."""
    data = iter(['#leadingcomment\n', 'something\n'])
    stripper = bulkparse.CommentStripper(data)
    self.assertEqual(next(stripper), '\n')
    self.assertEqual(next(stripper), 'something\n')
def test_suffix_comment_should_be_stripped(self):
    """A trailing comment and the whitespace before it are removed from a line."""
    data = iter(['somedata\n', 'otherdata # ignore this\n'])
    stripper = bulkparse.CommentStripper(data)
    self.assertEqual(next(stripper), 'somedata\n')
    self.assertEqual(next(stripper), 'otherdata\n')
def parse_items(itemstring, sort=False):
    """Like taggit.utils.parse_tags, but without sorting by default.

    Text inside a pair of double quotes becomes one item (stripped of
    surrounding whitespace, dropped if empty). Everything outside quotes is
    handed to ``split_strip`` with ``','`` as delimiter.

    Args:
        itemstring: The raw string to parse. Any falsy value gives ``[]``.
        sort: When true, duplicates are removed and the items are returned
            in sorted order. When false, quoted items come first, followed
            by the items from unquoted text, duplicates included.

    Returns:
        A list of item strings.
    """
    if not itemstring:
        return []

    itemstring = force_text(itemstring)

    words = []
    buf = []
    # Defer splitting of non-quoted sections until we know if there are
    # any unquoted commas.
    to_be_split = []
    chars = iter(itemstring)
    try:
        # The loop has no exit of its own: running out of characters raises
        # StopIteration, which is handled below.
        while True:
            c = next(chars)
            if c == '"':
                # Text collected before the quote is unquoted; set it aside.
                if buf:
                    to_be_split.append(''.join(buf))
                    buf = []
                # Find the matching quote
                c = next(chars)
                while c != '"':
                    buf.append(c)
                    c = next(chars)
                if buf:
                    word = ''.join(buf).strip()
                    if word:
                        words.append(word)
                    buf = []
            else:
                buf.append(c)
    except StopIteration:
        # If we were parsing an open quote which was never closed treat
        # the buffer as unquoted.
        if buf:
            to_be_split.append(''.join(buf))

    if to_be_split:
        delimiter = ','
        for chunk in to_be_split:
            words.extend(split_strip(chunk, delimiter))

    if sort:
        words = sorted(set(words))
    return words