Python bson 模块,loads() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用bson.loads()

项目:henet    作者:AcrDijon    | 项目源码 | 文件源码
def parse_article(path, cache_dir, root_path):
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    # XXX two reads...
    with open(path) as f:
        file_md5 = md5(f.read())

    cache = os.path.join(cache_dir, file_md5)

    if os.path.exists(cache):
        with open(cache) as f:
            article = Article(bson.loads(f.read()))
    else:
        article = parse(path)
        article['filename'] = path[len(root_path):].lstrip('/')
        with open(cache, 'w') as f:
            f.write(bson.dumps(article))

    return article
项目:baka    作者:baka-framework    | 项目源码 | 文件源码
def json_loads(s):
    return json.loads(s, parse_float=Decimal)
项目:baka    作者:baka-framework    | 项目源码 | 文件源码
def bson_loads(s):
    return bson.loads(s)
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def get_details(self):
        url = self.interpolate_url()

        resp = self.api_client.get(url, op='details')

        if resp.status_code == 200:
            detail_config = bson.loads(resp.text)
            return detail_config
项目:serialize    作者:hgrecco    | 项目源码 | 文件源码
def loads(content):
    obj = all.traverse_and_decode(bson.loads(content))
    return obj.get('__bson_follow__', obj)
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def __init__(self, custom_codec_implementation=None):
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            # Use implementation from pymongo or from pybson
            import bson
            if hasattr(bson, 'BSON'):
                # pymongo
                self._loads = lambda raw: bson.BSON.decode(bson.BSON(raw))
                self._dumps = lambda msg: bytes(bson.BSON.encode(msg))
            else:
                # pybson
                self._loads = bson.loads
                self._dumps = bson.dumps
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def loads(self, b_msg):
        try:
            return self._loads(b_msg)
        except Exception as e:
            raise DecodingError(e)
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def __init__(self, extractor, framer, custom_codec_implementation=None):
        self._extractor = extractor
        self._framer = framer
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            import json
            self._loads = json.loads
            self._dumps = json.dumps
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def loads(self, b_msg):
        try:
            return self._loads(b_msg.decode('utf-8'))
        except Exception as e:
            raise DecodingError(e)
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def _to_queue(self, bbuffer):
        b_msg, bbuffer = self.codec.extract_message(bbuffer)
        while b_msg is not None:
            self._queue.put(self.codec.loads(b_msg))
            b_msg, bbuffer = self.codec.extract_message(bbuffer)
        return bbuffer
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def loads(self, b_msg):
        try:
            return self._loads(b_msg)
        except Exception as e:
            raise DecodingError(e)
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def __init__(self, extractor, framer, custom_codec_implementation=None):
        self._extractor = extractor
        self._framer = framer
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            import json
            self._loads = json.loads
            self._dumps = json.dumps
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def loads(self, b_msg):
        try:
            return self._loads(b_msg.decode('utf-8'))
        except Exception as e:
            raise DecodingError(e)
项目:py-bson-rpc    作者:seprich    | 项目源码 | 文件源码
def _to_queue(self, bbuffer):
        b_msg, bbuffer = self.codec.extract_message(bbuffer)
        while b_msg is not None:
            self._queue.put(self.codec.loads(b_msg))
            b_msg, bbuffer = self.codec.extract_message(bbuffer)
        return bbuffer
项目:pcc    作者:Mondego    | 项目源码 | 文件源码
def ParseFromString(self, str_value):
        self.ParseFromDict(bson.loads(str_value))