一尘不染

用Python(和Java)最快打包数据

java

有时我们的主机是错误的;纳秒级很重要;)

我有一个Python
Twisted服务器,该服务器与一些Java服务器进行通信,分析显示将其运行时的大约30%用于JSON编码器/解码器;它的工作是每秒处理数千条消息。

youtube的讲话提出了有趣的适用要点:

  • 序列化格式-无论您使用哪种格式,它们都非常昂贵。测量。不要用泡菜 不是一个好选择。找到的协议缓冲区很慢。他们编写了自己的BSON实现,比您可以下载的实现快10-15倍。

  • 你必须测量。Vitess换出了一个协议来进行HTTP实现。即使在C语言中,它的速度也很慢。因此,他们剥离了HTTP并使用python进行了直接套接字调用,这在全局CPU上便宜了8%。HTTP的封装确实非常昂贵。

  • 测量。在Python中,测量就像阅读茶叶一样。Python中有很多与直观相反的东西,例如抓斗成本。他们的大多数应用程序都花时间进行序列化。对序列化进行概要分析非常取决于您要插入的内容。对int进行序列化与对大blob进行序列化有很大不同。

无论如何,我可以控制消息传递API的Python和Java端,并且可以选择与JSON不同的序列化。

我的讯息如下:

  • 可变数量的多头;其中1到10K之间的任何位置
  • 和两个已经是UTF8的文本字符串;介于1和3KB之间

因为我正在从套接字读取它们,所以我希望库可以优雅地处理流-例如,如果它不告诉我它消耗了多少缓冲区,将很烦人。

当然,该流的另一端是Java服务器。我不想选择对Python端有用的东西,但将问题移到Java端,例如性能,折磨或不稳定的API。

我显然会进行自己的分析。我在这里问,希望您能描述我不会想到的方法,例如使用struct什么以及最快的字符串/缓冲区类型。

一些简单的测试代码会给出令人惊讶的结果:

import time, random, struct, json, sys, pickle, cPickle, marshal, array

def encode_json_1(*args):
    return json.dumps(args)

def encode_json_2(longs,str1,str2):
    return json.dumps({"longs":longs,"str1":str1,"str2":str2})

def encode_pickle(*args):
    return pickle.dumps(args)

def encode_cPickle(*args):
    return cPickle.dumps(args)

def encode_marshal(*args):
    return marshal.dumps(args)

def encode_struct_1(longs,str1,str2):
    return struct.pack(">iii%dq"%len(longs),len(longs),len(str1),len(str2),*longs)+str1+str2

def decode_struct_1(s):
    i, j, k = struct.unpack(">iii",s[:12])
    assert len(s) == 3*4 + 8*i + j + k, (len(s),3*4 + 8*i + j + k)
    longs = struct.unpack(">%dq"%i,s[12:12+i*8])
    str1 = s[12+i*8:12+i*8+j]
    str2 = s[12+i*8+j:]
    return (longs,str1,str2)

struct_header_2 = struct.Struct(">iii")

def encode_struct_2(longs,str1,str2):
    return "".join((
        struct_header_2.pack(len(longs),len(str1),len(str2)),
        array.array("L",longs).tostring(),
        str1,
        str2))

def decode_struct_2(s):
    i, j, k = struct_header_2.unpack(s[:12])
    assert len(s) == 3*4 + 8*i + j + k, (len(s),3*4 + 8*i + j + k)
    longs = array.array("L")
    longs.fromstring(s[12:12+i*8])
    str1 = s[12+i*8:12+i*8+j]
    str2 = s[12+i*8+j:]
    return (longs,str1,str2)

def encode_ujson(*args):
    return ujson.dumps(args)

def encode_msgpack(*args):
    return msgpacker.pack(args)

def decode_msgpack(s):
    msgunpacker.feed(s)
    return msgunpacker.unpack()

def encode_bson(longs,str1,str2):
    return bson.dumps({"longs":longs,"str1":str1,"str2":str2})

def from_dict(d):
    return [d["longs"],d["str1"],d["str2"]]

tests = [ #(encode,decode,massage_for_check)
    (encode_struct_1,decode_struct_1,None),
    (encode_struct_2,decode_struct_2,None),
    (encode_json_1,json.loads,None),
    (encode_json_2,json.loads,from_dict),
    (encode_pickle,pickle.loads,None),
    (encode_cPickle,cPickle.loads,None),
    (encode_marshal,marshal.loads,None)]

try:
    import ujson
    tests.append((encode_ujson,ujson.loads,None))
except ImportError:
    print "no ujson support installed"

try:
    import msgpack
    msgpacker = msgpack.Packer()
    msgunpacker = msgpack.Unpacker()
    tests.append((encode_msgpack,decode_msgpack,None))
except ImportError:
    print "no msgpack support installed"

try:
    import bson
    tests.append((encode_bson,bson.loads,from_dict))
except ImportError:
    print "no BSON support installed"

longs = [i for i in xrange(10000)]
str1 = "1"*5000
str2 = "2"*5000

random.seed(1)
encode_data = [[
        longs[:random.randint(2,len(longs))],
        str1[:random.randint(2,len(str1))],
        str2[:random.randint(2,len(str2))]] for i in xrange(1000)]

for encoder,decoder,massage_before_check in tests:
    # do the encoding
    start = time.time()
    encoded = [encoder(i,j,k) for i,j,k in encode_data]
    encoding = time.time()
    print encoder.__name__, "encoding took %0.4f,"%(encoding-start),
    sys.stdout.flush()
    # do the decoding
    decoded = [decoder(e) for e in encoded]
    decoding = time.time()
    print "decoding %0.4f"%(decoding-encoding)
    sys.stdout.flush()
    # check it
    if massage_before_check:
        decoded = [massage_before_check(d) for d in decoded]
    for i,((longs_a,str1_a,str2_a),(longs_b,str1_b,str2_b)) in enumerate(zip(encode_data,decoded)):
        assert longs_a == list(longs_b), (i,longs_a,longs_b)
        assert str1_a == str1_b, (i,str1_a,str1_b)
        assert str2_a == str2_b, (i,str2_a,str2_b)

给出:

encode_struct_1 encoding took 0.4486, decoding 0.3313
encode_struct_2 encoding took 0.3202, decoding 0.1082
encode_json_1 encoding took 0.6333, decoding 0.6718
encode_json_2 encoding took 0.5740, decoding 0.8362
encode_pickle encoding took 8.1587, decoding 9.5980
encode_cPickle encoding took 1.1246, decoding 1.4436
encode_marshal encoding took 0.1144, decoding 0.3541
encode_ujson encoding took 0.2768, decoding 0.4773
encode_msgpack encoding took 0.1386, decoding 0.2374
encode_bson encoding took 55.5861, decoding 29.3953

bsonmsgpackujson都通过easy_install安装

我会 要显示我错了,这样做; 我应该使用cStringIO接口,否则您可以加快速度!

是否必须有一种方法可以肯定地将数据序列化一个数量级?


阅读 280

收藏
2020-12-03

共1个答案

一尘不染

最后,我们选择使用msgpack。

如果使用JSON,那么选择Python和Java库对于性能至关重要:

在Java上,http//blog.juicehub.com/2012/11/20/benchmarking-web-frameworks-for-
games/说:

在我们将JSON Lib(简单的json)换成Jackon的ObjectMapper之前,性能绝对糟糕。这使RPS达到35到300+ –增长了10倍

2020-12-03