我们从 Python 开源项目中提取了以下 24 个代码示例,用于说明如何使用 pickle.NEWOBJ。
def test_newobj_proxies(self):
    # Pickling a weak proxy must record the referent's __class__, not the
    # proxy type, so the round-tripped object has the original class.
    import weakref

    # Work on a copy so the shared class list is left untouched.
    candidates = myclasses[:]
    # These classes cannot be weak-proxied; skip them.
    for unproxyable in (MyInt, MyLong, MyStr, MyTuple):
        candidates.remove(unproxyable)

    for proto in protocols:
        for cls in candidates:
            base = cls.__base__
            original = cls(cls.sample)
            original.foo = 42
            proxy = weakref.proxy(original)

            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)

            # The restored type must be the referent's, not type(proxy).
            self.assertEqual(type(restored), type(original))
            detail = (proto, cls, base, original, restored, type(restored))
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)

# Register a type with copy_reg, with extension code extcode.  Pickle
# an object of that type.  Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_newobj_proxies(self):
    # Pickling a weak proxy must record the referent's __class__, not the
    # proxy type, so the round-tripped object has the original class.

    # Work on a copy so the shared class list is left untouched.
    candidates = myclasses[:]
    # These classes cannot be weak-proxied; skip them.
    for unproxyable in (MyInt, MyTuple):
        candidates.remove(unproxyable)

    for proto in protocols:
        for cls in candidates:
            base = cls.__base__
            original = cls(cls.sample)
            original.foo = 42
            proxy = weakref.proxy(original)

            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)

            # The restored type must be the referent's, not type(proxy).
            self.assertEqual(type(restored), type(original))
            detail = (proto, cls, base, original, restored, type(restored))
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)

# Register a type with copyreg, with extension code extcode.  Pickle
# an object of that type.  Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_newobj_proxies(self):
    # Pickling a weak proxy must record the referent's __class__, not the
    # proxy type, so the round-tripped object has the original class.

    # Work on a copy so the shared class list is left untouched.
    candidates = myclasses[:]
    # These classes cannot be weak-proxied; skip them.
    for unproxyable in (MyInt, MyTuple):
        candidates.remove(unproxyable)

    for proto in protocols:
        for cls in candidates:
            base = cls.__base__
            original = cls(cls.sample)
            original.foo = 42
            proxy = weakref.proxy(original)

            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)

            # The restored type must be the referent's, not type(proxy).
            self.assertEqual(type(restored), type(original))
            detail = (proto, cls, base, original, restored, type(restored))
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)
def test_simple_newobj(self):
    # Round-trip a SimpleNewObj built without running __init__ and check
    # that the NEWOBJ opcode appears exactly for protocols >= 2.
    original = object.__new__(SimpleNewObj)  # bypass __init__
    original.abc = 666

    for proto in protocols:
        pickled = self.dumps(original, proto)
        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)

        # Loading raises TypeError if __init__ gets called.
        restored = self.loads(pickled)
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_bad_stack(self):
    # Feed each ill-formed pickle below to check_unpickling_error together
    # with self.bad_stack_errors.  Trailing comments name the opcode that
    # the entry (and the unlabelled entries after it) exercises.
    malformed = (
        '.',                        # STOP
        '0',                        # POP
        '1',                        # POP_MARK
        '2',                        # DUP
        # '(2' is left out: PyUnpickler doesn't raise
        'R',                        # REDUCE
        ')R',
        'a',                        # APPEND
        'Na',
        'b',                        # BUILD
        'Nb',
        'd',                        # DICT
        'e',                        # APPENDS
        # '(e' is left out: PyUnpickler raises AttributeError
        'i__builtin__\nlist\n',     # INST
        'l',                        # LIST
        'o',                        # OBJ
        '(o',
        'p1\n',                     # PUT
        'q\x00',                    # BINPUT
        'r\x00\x00\x00\x00',        # LONG_BINPUT
        's',                        # SETITEM
        'Ns',
        'NNs',
        't',                        # TUPLE
        'u',                        # SETITEMS
        # '(u' is left out: PyUnpickler doesn't raise
        '}(Nu',
        '\x81',                     # NEWOBJ
        ')\x81',
        '\x85',                     # TUPLE1
        '\x86',                     # TUPLE2
        'N\x86',
        '\x87',                     # TUPLE3
        'N\x87',
        'NN\x87',
    )
    for payload in malformed:
        self.check_unpickling_error(self.bad_stack_errors, payload)
def test_bad_mark(self):
    # Feed each ill-formed pickle below to check_unpickling_error together
    # with self.bad_mark_errors.  Comments name the opcode that the entry
    # (and the unlabelled entries after it) exercises.
    malformed = (
        # 'N(.' (STOP) is left out
        'N(2',                              # DUP
        'c__builtin__\nlist\n)(R',          # REDUCE
        'c__builtin__\nlist\n()R',
        ']N(a',                             # APPEND
        # BUILD
        'c__builtin__\nValueError\n)R}(b',
        'c__builtin__\nValueError\n)R(}b',
        '(Nd',                              # DICT
        'N(p1\n',                           # PUT
        'N(q\x00',                          # BINPUT
        'N(r\x00\x00\x00\x00',              # LONG_BINPUT
        '}NN(s',                            # SETITEM
        '}N(Ns',
        '}(NNs',
        '}((u',                             # SETITEMS
        # NEWOBJ
        'c__builtin__\nlist\n)(\x81',
        'c__builtin__\nlist\n()\x81',
        'N(\x85',                           # TUPLE1
        'NN(\x86',                          # TUPLE2
        'N(N\x86',
        'NNN(\x87',                         # TUPLE3
        'NN(N\x87',
        'N(NN\x87',
    )
    for payload in malformed:
        self.check_unpickling_error(self.bad_mark_errors, payload)
def test_simple_newobj(self):
    # Round-trip a SimpleNewObj created via __new__ (skipping __init__) and
    # check both the integer encoding and the use of the NEWOBJ opcode.
    original = SimpleNewObj.__new__(SimpleNewObj, 0xface)  # bypass __init__
    original.abc = 666

    for proto in protocols:
        pickled = self.dumps(original, proto)

        # 0xface is written as a text INT under proto 0, BININT2 otherwise.
        if proto < 1:
            expected_fragment = '\nI64206'      # INT
        else:
            expected_fragment = 'M\xce\xfa'     # BININT2
        self.assertIn(expected_fragment, pickled)

        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)

        # Loading raises TypeError if __init__ gets called.
        restored = self.loads(pickled)
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_complex_newobj(self):
    # Round-trip a ComplexNewObj created via __new__ (skipping __init__) and
    # check the argument encoding per protocol plus the use of NEWOBJ.
    original = ComplexNewObj.__new__(ComplexNewObj, 0xface)  # bypass __init__
    original.abc = 666

    for proto in protocols:
        pickled = self.dumps(original, proto)

        # Pick the byte sequence expected in the pickle for this protocol.
        if proto < 1:
            expected_fragment = '\nI64206'      # INT
        elif proto < 2:
            expected_fragment = 'M\xce\xfa'     # BININT2
        else:
            expected_fragment = 'U\x04FACE'     # SHORT_BINSTRING
        self.assertIn(expected_fragment, pickled)

        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)

        # Loading raises TypeError if __init__ gets called.
        restored = self.loads(pickled)
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_bad_mark(self):
    # Feed each ill-formed pickle below to check_unpickling_error together
    # with self.bad_mark_errors.  Comments name the opcode that the entry
    # (and the unlabelled entries after it) exercises.
    malformed = (
        # b'N(.' (STOP) is left out
        b'N(2',                             # DUP
        b'cbuiltins\nlist\n)(R',            # REDUCE
        b'cbuiltins\nlist\n()R',
        b']N(a',                            # APPEND
        # BUILD
        b'cbuiltins\nValueError\n)R}(b',
        b'cbuiltins\nValueError\n)R(}b',
        b'(Nd',                             # DICT
        b'N(p1\n',                          # PUT
        b'N(q\x00',                         # BINPUT
        b'N(r\x00\x00\x00\x00',             # LONG_BINPUT
        b'}NN(s',                           # SETITEM
        b'}N(Ns',
        b'}(NNs',
        b'}((u',                            # SETITEMS
        b'cbuiltins\nlist\n)(\x81',         # NEWOBJ
        b'cbuiltins\nlist\n()\x81',
        b'N(\x85',                          # TUPLE1
        b'NN(\x86',                         # TUPLE2
        b'N(N\x86',
        b'NNN(\x87',                        # TUPLE3
        b'NN(N\x87',
        b'N(NN\x87',
        b']((\x90',                         # ADDITEMS
        # NEWOBJ_EX
        b'cbuiltins\nlist\n)}(\x92',
        b'cbuiltins\nlist\n)(}\x92',
        b'cbuiltins\nlist\n()}\x92',
        # STACK_GLOBAL
        b'Vbuiltins\n(Vlist\n\x93',
        b'Vbuiltins\nVlist\n(\x93',
        b'N(\x94',                          # MEMOIZE
    )
    for payload in malformed:
        self.check_unpickling_error(self.bad_mark_errors, payload)
def test_simple_newobj(self):
    # Round-trip a SimpleNewObj built without running __init__.  NEWOBJ is
    # expected for protocols 2 and 3, NEWOBJ_EX for protocol 4 and up.
    original = object.__new__(SimpleNewObj)  # bypass __init__
    original.abc = 666

    for proto in protocols:
        pickled = self.dumps(original, proto)

        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, 2 <= proto < 4)
        uses_newobj_ex = opcode_in_pickle(pickle.NEWOBJ_EX, pickled)
        self.assertEqual(uses_newobj_ex, proto >= 4)

        # Loading raises TypeError if __init__ gets called.
        restored = self.loads(pickled)
        self.assert_is_copy(original, restored)
def save_reduce(self, func, args, state=None,
                listitems=None, dictitems=None, obj=None):
    """Write the opcodes for a reduce tuple, honouring ``__transient__``.

    Modified to support ``__transient__`` on new objects: when the NEWOBJ
    path is taken and ``obj`` defines ``__transient__``, every key of a
    dict ``state`` that is contained in ``obj.__transient__`` is dropped
    from a copy of the state before it is saved.  The change only affects
    protocol level 2 (which is always used by PiCloud).

    Args:
        func: Callable that rebuilds the object.  If it is named
            ``__newobj__`` and ``self.proto >= 2``, the NEWOBJ opcode is
            written instead of REDUCE.
        args: Tuple of arguments for ``func``; for ``__newobj__`` the
            first item is the class.
        state: Optional state, saved and followed by a BUILD opcode.
        listitems: Optional items handed to ``self._batch_appends``.
        dictitems: Optional items handed to ``self._batch_setitems``.
        obj: The object being pickled; memoized when not ``None``.

    Raises:
        pickle.PicklingError: If ``args`` is not a tuple, ``func`` is not
            callable, or the ``__newobj__`` class argument is invalid.
    """
    # args must be a tuple (None is rejected as well).
    if not isinstance(args, tuple):
        raise pickle.PicklingError("args from reduce() should be a tuple")

    if not callable(func):
        raise pickle.PicklingError("func from reduce should be callable")

    save = self.save
    write = self.write

    # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
    if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
        cls = args[0]
        if not hasattr(cls, "__new__"):
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has no __new__")
        if obj is not None and cls is not obj.__class__:
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has the wrong class")
        args = args[1:]
        save(cls)

        # Don't pickle transient entries.  Only a dict state can be
        # filtered; a missing (None) or non-dict state is passed through
        # untouched instead of failing on ``state.copy()``.
        transient = getattr(obj, '__transient__', None)
        if transient is not None and isinstance(state, dict):
            state = state.copy()  # never mutate the caller's state
            for key in list(state.keys()):
                if key in transient:
                    del state[key]

        save(args)
        write(pickle.NEWOBJ)
    else:
        save(func)
        save(args)
        write(pickle.REDUCE)

    if obj is not None:
        self.memoize(obj)

    # More new special cases (that work with older protocols as
    # well): when __reduce__ returns a tuple with 4 or 5 items,
    # the 4th and 5th item should be iterators that provide list
    # items and dict items (as (key, value) tuples), or None.
    if listitems is not None:
        self._batch_appends(listitems)

    if dictitems is not None:
        self._batch_setitems(dictitems)

    if state is not None:
        save(state)
        write(pickle.BUILD)
def test_bad_stack(self):
    # Feed each ill-formed pickle below to check_unpickling_error together
    # with self.bad_stack_errors.  Trailing comments name the opcode that
    # the entry (and the unlabelled entries after it) exercises.
    malformed = (
        b'.',                       # STOP
        b'0',                       # POP
        b'1',                       # POP_MARK
        b'2',                       # DUP
        # b'(2' is left out: PyUnpickler doesn't raise
        b'R',                       # REDUCE
        b')R',
        b'a',                       # APPEND
        b'Na',
        b'b',                       # BUILD
        b'Nb',
        b'd',                       # DICT
        b'e',                       # APPENDS
        # b'(e' is left out: PyUnpickler raises AttributeError
        b'ibuiltins\nlist\n',       # INST
        b'l',                       # LIST
        b'o',                       # OBJ
        b'(o',
        b'p1\n',                    # PUT
        b'q\x00',                   # BINPUT
        b'r\x00\x00\x00\x00',       # LONG_BINPUT
        b's',                       # SETITEM
        b'Ns',
        b'NNs',
        b't',                       # TUPLE
        b'u',                       # SETITEMS
        # b'(u' is left out: PyUnpickler doesn't raise
        b'}(Nu',
        b'\x81',                    # NEWOBJ
        b')\x81',
        b'\x85',                    # TUPLE1
        b'\x86',                    # TUPLE2
        b'N\x86',
        b'\x87',                    # TUPLE3
        b'N\x87',
        b'NN\x87',
        b'\x90',                    # ADDITEMS
        # b'(\x90' is left out: PyUnpickler raises AttributeError
        b'\x91',                    # FROZENSET
        b'\x92',                    # NEWOBJ_EX
        b')}\x92',
        b'\x93',                    # STACK_GLOBAL
        b'Vlist\n\x93',
        b'\x94',                    # MEMOIZE
    )
    for payload in malformed:
        self.check_unpickling_error(self.bad_stack_errors, payload)
def save_reduce(self, func, args, state=None,
                listitems=None, dictitems=None, obj=None):
    """Write the opcodes for a reduce tuple.

    Args:
        func: Callable that rebuilds the object.  If it is named
            ``__newobj__`` and ``self.proto >= 2``, the NEWOBJ opcode is
            written instead of REDUCE.
        args: Tuple of arguments for ``func``; for ``__newobj__`` the
            first item is the class.
        state: Optional state, saved and followed by a BUILD opcode.
        listitems: Optional items handed to ``self._batch_appends``.
        dictitems: Optional items handed to ``self._batch_setitems``.
        obj: The object being pickled; memoized when not ``None``.

    Raises:
        pickle.PicklingError: If ``args`` is not a tuple, ``func`` is not
            callable, or the ``__newobj__`` class argument is invalid.
    """
    # args must be a tuple (None is rejected as well).
    if not isinstance(args, tuple):
        raise pickle.PicklingError("args from reduce() should be a tuple")

    # callable() rejects objects that merely carry an attribute named
    # __call__ without actually being callable.
    if not callable(func):
        raise pickle.PicklingError("func from reduce should be callable")

    save = self.save
    write = self.write

    # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
    if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
        cls = args[0]
        if not hasattr(cls, "__new__"):
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has no __new__")
        if obj is not None and cls is not obj.__class__:
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has the wrong class")
        # The class is saved separately; NEWOBJ takes the remaining args.
        args = args[1:]
        save(cls)
        save(args)
        write(pickle.NEWOBJ)
    else:
        save(func)
        save(args)
        write(pickle.REDUCE)

    if obj is not None:
        self.memoize(obj)

    # More new special cases (that work with older protocols as
    # well): when __reduce__ returns a tuple with 4 or 5 items,
    # the 4th and 5th item should be iterators that provide list
    # items and dict items (as (key, value) tuples), or None.
    if listitems is not None:
        self._batch_appends(listitems)

    if dictitems is not None:
        self._batch_setitems(dictitems)

    if state is not None:
        save(state)
        write(pickle.BUILD)