Python django.utils.six 模块,next() 实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用django.utils.six.next()

项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def previous_current_next(items):
    """
    From http://www.wordaligned.org/articles/zippy-triples-served-with-python

    Creates an iterator which returns (previous, current, next) triples,
    with ``None`` filling in when there is no previous or next
    available.
    """
    extend = itertools.chain([None], items, [None])
    prev, cur, nex = itertools.tee(extend, 3)
    # Advancing an iterator twice when we know there are two items (the
    # two Nones at the start and at the end) will never fail except if
    # `items` is some funny StopIteration-raising generator. There's no point
    # in swallowing this exception.
    next(cur)
    next(nex)
    next(nex)
    return zip(prev, cur, nex)
项目:hardware-lab    作者:Buzzvil    | 项目源码 | 文件源码
def ac_on(request):
    STATE_BTN_MAP = {
        'verylow': 'BTN_3',
        'low': 'BTN_5',
        'medium': 'BTN_8',
        'high': 'BTN_10',
    }

    config = six.next(ACConfig.query(hash_key=settings.AC_LOCATION))
    btn_name = STATE_BTN_MAP[config.state]
    logger.info('ac_on state {} btn_name {}'.format(config.state, btn_name))
    return ac_command(btn_name)
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def test_password_reset_form(self):
        self.assertEqual(
            six.next(admin_forms.PasswordResetForm().get_users(self.user_1.email)),
            self.user_1
        )
        with self.assertRaises(StopIteration):
            six.next(admin_forms.PasswordResetForm().get_users(self.user_2.email))

        with self.assertRaises(StopIteration):
            six.next(admin_forms.PasswordResetForm().get_users(self.user_3.email))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __next__(self):
        """Generate next parsed row"""
        row = six.next(self.reader)
        # although the DictReader doesn't return blank lines, we want
        # to count them so we can pinpoint errors exactly within the
        # source file.
        self.line_num = self.reader.reader.line_num

        self.validate_row(row)
        return row
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __next__(self):
        """Returns next line"""
        line = six.next(self.source_iterator)
        return self.COMMENT_PATTERN.sub('', line)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __next__(self):
        """Parses and returns next line"""
        try:
            row = six.next(self.parser)
            row = self._decode_as_utf8(row)
            objects = self._create_objects_from_row(row)
        except BulkParseError as error:
            objects = error
        return self.parser.line_num, objects
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_simple_import_yields_netbox_and_device_model(self):
        data = 'myroom:10.0.90.252:myorg:SW:1:public::'
        parser = bulkparse.NetboxBulkParser(data)
        importer = bulkimport.NetboxImporter(parser)
        _line_num, objects = six.next(importer)

        self.assertTrue(isinstance(objects, list), repr(objects))
        self.assertTrue(len(objects) == 1, repr(objects))
        self.assertTrue(isinstance(objects[0], manage.Netbox), objects[0])
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_simple_import_yields_objects_with_proper_values(self):
        data = 'myroom:10.0.90.252:myorg:SW:1:public::'
        parser = bulkparse.NetboxBulkParser(data)
        importer = bulkimport.NetboxImporter(parser)
        _line_num, objects = six.next(importer)

        (netbox, ) = objects
        self.assertEquals(netbox.ip, '10.0.90.252')
        self.assertEquals(netbox.room_id, 'myroom')
        self.assertEquals(netbox.organization_id, 'myorg')
        self.assertEquals(netbox.category_id, 'SW')
        self.assertEquals(netbox.snmp_version, '1')
        self.assertEquals(netbox.read_only, 'public')
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_netbox_function_is_set(self):
        data = 'myroom:10.0.90.252:myorg:SW:1:public:::does things:'
        parser = bulkparse.NetboxBulkParser(data)
        importer = bulkimport.NetboxImporter(parser)
        _line_num, objects = six.next(importer)

        types = dict((type(c), c) for c in objects)
        self.assertTrue(manage.NetboxInfo in types, types)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_netbox_groups_are_set(self):
        data = 'myroom:10.0.90.10:myorg:SRV:::::fileserver::WEB:UNIX:MAIL'
        parser = bulkparse.NetboxBulkParser(data)
        importer = bulkimport.NetboxImporter(parser)
        _line_num, objects = six.next(importer)

        netboxgroups = [o for o in objects
                        if isinstance(o, manage.NetboxCategory)]
        self.assertTrue(len(netboxgroups) > 0, objects)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def parse_to_objects(data):
        parser = bulkparse.NetboxBulkParser(data)
        importer = bulkimport.NetboxImporter(parser)
        _line_num, objects = six.next(importer)
        return objects
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def parse_to_objects(data):
        parser = bulkparse.LocationBulkParser(data)
        importer = bulkimport.LocationImporter(parser)
        _line_num, objects = six.next(importer)
        return objects
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_import(self):
        data = "10.0.1.0/24:lan:uninett:here-there:employee:Employee LAN:20"
        parser = bulkparse.PrefixBulkParser(data)
        importer = bulkimport.PrefixImporter(parser)
        _line_num, objects = six.next(importer)

        if isinstance(objects, Exception):
            raise objects
        self.assertEquals(len(objects), 2)
        self.assertTrue(isinstance(objects[0], manage.Vlan))
        self.assertTrue(isinstance(objects[1], manage.Prefix))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_parse_single_line_yields_columns(self):
        data = (b"room1:10.0.0.186:myorg:SW:1:public:secret:amaster:doesthings:"
                b"key=value:blah1:blah2")
        b = bulkparse.NetboxBulkParser(data)
        out_data = six.next(b)
        self.assertTrue(isinstance(out_data, dict), out_data)
        self.assertEquals(out_data['roomid'], 'room1')
        self.assertEquals(out_data['ip'], '10.0.0.186')
        self.assertEquals(out_data['orgid'], 'myorg')
        self.assertEquals(out_data['catid'], 'SW')
        self.assertEquals(out_data['master'], 'amaster')
        self.assertEquals(out_data['data'], 'key=value')
        self.assertEquals(out_data['netboxgroup'], ['blah1', 'blah2'])
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_invalid_ip_should_raise_error(self):
        data = b"room1:10.0.x.x:myorg:SW:public:parrot::\n"
        b = bulkparse.NetboxBulkParser(data)
        self.assertRaises(bulkparse.InvalidFieldValue, lambda: six.next(b))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_short_line_should_raise_error_with_correct_details(self):
        data = b"room1:10.0.0.8"
        b = bulkparse.NetboxBulkParser(data)
        try:
            six.next(b)
        except bulkparse.RequiredFieldMissing as error:
            self.assertEquals(error.line_num, 1)
            self.assertEquals(error.missing_field, 'orgid')
        else:
            self.fail("No exception raised")
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_leading_comments_should_be_stripped(self):
        data = b"#comment\nsby:student village"
        b = bulkparse.UsageBulkParser(data)
        first_row = six.next(b)
        self.assertEquals(first_row['usageid'], 'sby')
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_invalid_prefix_should_raise_error(self):
        data = b"10.0.0.x/3f:scope"
        b = bulkparse.PrefixBulkParser(data)
        self.assertRaises(bulkparse.InvalidFieldValue, lambda: six.next(b))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_invalid_service_arguments_should_raise_error(self):
        data = b"host.example.org;http;port80"
        b = bulkparse.ServiceBulkParser(data)
        self.assertRaises(bulkparse.InvalidFieldValue, lambda: six.next(b))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_valid_service_arguments_should_not_raise_error(self):
        data = b"host.example.org;http;port=80;uri=/"
        b = bulkparse.ServiceBulkParser(data)
        self.assertTrue(six.next(b))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_leading_comment_should_be_stripped(self):
        data = iter(['#leadingcomment\n', 'something\n'])
        stripper = bulkparse.CommentStripper(data)
        self.assertEquals(six.next(stripper), '\n')
        self.assertEquals(six.next(stripper), 'something\n')
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_suffix_comment_should_be_stripped(self):
        data = iter(['somedata\n', 'otherdata    # ignore this\n'])
        stripper = bulkparse.CommentStripper(data)
        self.assertEquals(six.next(stripper), 'somedata\n')
        self.assertEquals(six.next(stripper), 'otherdata\n')
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def parse_items(itemstring, sort=False):
    """Like taggit.utils.parse_tags, but without sorting"""
    if not itemstring:
        return []

    itemstring = force_text(itemstring)

    words = []
    buf = []
    # Defer splitting of non-quoted sections until we know if there are
    # any unquoted commas.
    to_be_split = []
    i = iter(itemstring)
    try:
        while True:
            c = six.next(i)
            if c == '"':
                if buf:
                    to_be_split.append(''.join(buf))
                    buf = []
                # Find the matching quote
                c = six.next(i)
                while c != '"':
                    buf.append(c)
                    c = six.next(i)
                if buf:
                    word = ''.join(buf).strip()
                    if word:
                        words.append(word)
                    buf = []
            else:
                buf.append(c)
    except StopIteration:
        # If we were parsing an open quote which was never closed treat
        # the buffer as unquoted.
        if buf:
            to_be_split.append(''.join(buf))

    if to_be_split:
        delimiter = ','

        for chunk in to_be_split:
            words.extend(split_strip(chunk, delimiter))

    if sort:
        words = list(set(words))
        words.sort()

    return words