Python weakref 模块，WeakSet() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 weakref.WeakSet()。
def __init__(self, context, handle, device):
    """
    You should not instantiate this class directly.
    Call "open" method on an USBDevice instance to get an USBDeviceHandle
    instance.

    The given context, handle and device are stored unchanged on the
    instance, together with empty bookkeeping containers for transfers.
    """
    self.__device = device
    self.__handle = handle
    self.__context = context
    # Transfers about this device are referenced weakly, so they can be
    # cleaned up before the device is closed.
    self.__transfer_set = WeakSet()
    # In-flight transfers are referenced strongly so they are not freed
    # even if the user drops every strong reference to them. If this
    # instance is garbage-collected all transfers get closed, so that is
    # fine.
    pending = set()
    self.__inflight = pending
    # XXX: calling self.__inflight.{add|remove} from getTransfer was
    # observed (with objgraph) to create extra intermediate python objects
    # for each allocated transfer; keeping the bound methods as attributes
    # avoids that.
    self.__inflight_add = pending.add
    self.__inflight_remove = pending.remove
def addDataSource(self, source=SampleDataSource, name='default',
                  sourceModule=None, sourceArgs=(), sourceKwargs=None):
    """Register a data source under ``name``.

    Arguments:
        source: the data source itself, a class to instantiate with
            ``sourceArgs``/``sourceKwargs``, or a string naming an
            attribute to look up in ``sourceModule``.
        name: key under which the source is stored in ``self._sources``.
        sourceModule: a module object, or a module name to import, used
            only when ``source`` is a string.
        sourceArgs: positional arguments for instantiating a class source.
        sourceKwargs: keyword arguments for instantiating a class source.

    Raises:
        Exception: if a source named ``name`` is already registered.
    """
    if name in self._sources:
        raise Exception('Data source "%s" already exists!' % name)
    # Avoid a shared mutable default for the keyword arguments.
    if sourceKwargs is None:
        sourceKwargs = {}
    if utils.isstr(source):
        if utils.isstr(sourceModule):
            sourceModule = __import__(sourceModule)
        if sourceModule is not None:
            source = sourceModule.__dict__[source]
    elif type(source) in [types.ClassType, types.TypeType]:
        source = source(*sourceArgs, **sourceKwargs)
    cds = weakref.WeakSet()
    self._sources[name] = (source, cds)
    self._lastSamples[name] = source.initialSamples()
    # Iterate over a snapshot because entries are dropped while looping.
    for cd in list(self._lostCurveDatas):
        if self._tryAddToDataSource(cd, name):
            # The curve data found a source, so it is no longer "lost".
            # (The original called the non-existent ``remote`` with the
            # new WeakSet instead of removing ``cd``.)
            self._lostCurveDatas.discard(cd)
def test_sub_and_super(self):
    # The lists hold the strong references; the WeakSets built from them
    # only reference the ustr items weakly.
    small_items, big_items, other_items = [
        [ustr(ch) for ch in word] for word in ('ab', 'abcde', 'def')
    ]
    small = WeakSet(small_items)
    big = WeakSet(big_items)
    other = WeakSet(other_items)
    # 'ab' versus 'abcde': strict and non-strict ordering both ways.
    self.assertTrue(small < big)
    self.assertTrue(small <= big)
    self.assertTrue(big <= big)
    self.assertTrue(big > small)
    self.assertTrue(big >= small)
    # 'abcde' versus 'def': neither ordering holds.
    self.assertFalse(big < other)
    self.assertFalse(big <= other)
    self.assertFalse(big > other)
    self.assertFalse(big >= other)
    # Same relations checked on the builtin set type.
    self.assertTrue(set('a').issubset('abc'))
    self.assertTrue(set('abc').issuperset('a'))
    self.assertFalse(set('a').issubset('cbs'))
    self.assertFalse(set('cbs').issuperset('a'))
def __init__(self, *, log_level='ERROR'):
    """Set up search bookkeeping, the broadcaster and the command thread.

    log_level is stored and applied to the broadcaster's logger. The
    command thread (a daemon running self.command_loop) is started last,
    after every other attribute has been assigned.
    """
    self.log_level = log_level
    self.udp_sock = None
    self._search_lock = threading.RLock()
    # map name to (time, address)
    self.search_results = {}
    # map search id (cid) to name
    self.unanswered_searches = {}
    self.listeners = weakref.WeakSet()

    self.broadcaster = ca.Broadcaster(our_role=ca.CLIENT)
    self.broadcaster.log.setLevel(self.log_level)

    self.command_bundle_queue = queue.Queue()
    self.command_cond = threading.Condition()
    self.selector = SelectorThread()

    worker = threading.Thread(target=self.command_loop, daemon=True)
    self.command_thread = worker
    worker.start()
def __new__(cls, value, variable=None, keywords=None):
    """Return an already existing :class:`Node` instance or, if such
    an instance does not exist yet, return a newly created one.

    The registry key is ``str(value)``. Whether the node is new or not,
    it is also entered into ``cls._selection`` under the same key.
    """
    name = str(value)
    if name not in cls._registry:
        node = object.__new__(Node)
        node._check_name(name)
        node._name = name
        # Fall back to the predefined variable when none is given.
        node._variable = (
            node._predefinedvariable if variable is None else variable)
        node._keywords = Keywords()
        node._keywords.device = node
        node.entries = connectiontools.Connections(node)
        node.exits = connectiontools.Connections(node)
        node.sequences = sequencetools.NodeSequences(node)
        node.deploy_mode = 'newsim'
        node._blackhole = None
        node._handlers = weakref.WeakSet()
        cls._registry[name] = node
    registered = cls._registry[name]
    cls._selection[name] = registered
    return cls._registry[name]
def __new__(cls, value, inlets=None, outlets=None,
            receivers=None, senders=None, keywords=None):
    """Return an already existing :class:`Element` instance or, if such
    an instance does not exist yet, a newly created one.

    The registry key is ``str(value)``. Whether the element is new or
    not, it is also entered into ``cls._selection`` under the same key.
    """
    name = str(value)
    if name not in cls._registry:
        element = object.__new__(Element)
        element._check_name(name)
        element._name = name
        element.inlets = connectiontools.Connections(element)
        element.outlets = connectiontools.Connections(element)
        element.receivers = connectiontools.Connections(element)
        element.senders = connectiontools.Connections(element)
        element._keywords = Keywords()
        element._keywords.device = element
        element.model = None
        element._handlers = weakref.WeakSet()
        cls._registry[name] = element
    registered = cls._registry[name]
    cls._selection[name] = registered
    return cls._registry[name]
def test_union(self):
    # union() must return a new WeakSet with the members of both
    # operands and leave self.s unchanged.
    merged = self.s.union(self.items2)
    for letter in self.letters:
        expected = letter in self.d or letter in self.items2
        self.assertEqual(letter in merged, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(merged), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.union, [[]])
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        reference = WeakSet(self.items + self.items2)
        other = factory(self.items2)
        self.assertEqual(self.s.union(other), reference)
        # Drop the container before the liveness check below; presumably
        # so it cannot keep members of items2 alive.
        del other
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
    # Losing one strong reference must shrink the union after collection.
    self.items2.pop()
    gc.collect()
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    gc.collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_len_race(self):
    # Extended sanity checks for len() in the face of cyclic collection
    # Restore the current collector thresholds when the test finishes.
    self.addCleanup(gc.set_threshold, *gc.get_threshold())
    count = 20
    for threshold in range(1, 100):
        gc.collect(0)
        gc.set_threshold(threshold, threshold, threshold)
        members = [RefCycle() for _ in range(count)]
        weak = WeakSet(members)
        del members
        # All items will be collected at next garbage collection pass
        walker = iter(weak)
        next(walker, None)
        len_with_iterator = len(weak)
        del walker
        len_without_iterator = len(weak)
        # Lengths must stay within bounds and never grow.
        self.assertGreaterEqual(len_with_iterator, 0)
        self.assertLessEqual(len_with_iterator, count)
        self.assertGreaterEqual(len_without_iterator, 0)
        self.assertLessEqual(len_without_iterator, len_with_iterator)
def test_union(self):
    # union() must return a new WeakSet with the members of both
    # operands and leave self.s unchanged.
    merged = self.s.union(self.items2)
    for letter in self.letters:
        expected = letter in self.d or letter in self.items2
        self.assertEqual(letter in merged, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(merged), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.union, [[]])
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        reference = WeakSet(self.items + self.items2)
        other = factory(self.items2)
        self.assertEqual(self.s.union(other), reference)
        # Drop the container before the liveness check below; presumably
        # so it cannot keep members of items2 alive.
        del other
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
    # Losing one strong reference must shrink the union after collection.
    self.items2.pop()
    gc.collect()
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    gc.collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_len_race(self):
    # Extended sanity checks for len() in the face of cyclic collection
    # Restore the current collector thresholds when the test finishes.
    self.addCleanup(gc.set_threshold, *gc.get_threshold())
    count = 20
    for threshold in range(1, 100):
        gc.collect(0)
        gc.set_threshold(threshold, threshold, threshold)
        members = [RefCycle() for _ in range(count)]
        weak = WeakSet(members)
        del members
        # All items will be collected at next garbage collection pass
        walker = iter(weak)
        next(walker, None)
        len_with_iterator = len(weak)
        del walker
        len_without_iterator = len(weak)
        # Lengths must stay within bounds and never grow.
        self.assertGreaterEqual(len_with_iterator, 0)
        self.assertLessEqual(len_with_iterator, count)
        self.assertGreaterEqual(len_without_iterator, 0)
        self.assertLessEqual(len_without_iterator, len_with_iterator)
def test_union(self):
    # union() must return a new WeakSet with the members of both
    # operands and leave self.s unchanged.
    merged = self.s.union(self.items2)
    for letter in self.letters:
        expected = letter in self.d or letter in self.items2
        self.assertEqual(letter in merged, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(merged), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.union, [[]])
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        reference = WeakSet(self.items + self.items2)
        other = factory(self.items2)
        self.assertEqual(self.s.union(other), reference)
        # Drop the container before the liveness check below; presumably
        # so it cannot keep members of items2 alive.
        del other
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
    # Losing one strong reference must shrink the union after collection.
    self.items2.pop()
    gc.collect()
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    gc.collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_len_race(self):
    # Extended sanity checks for len() in the face of cyclic collection
    # Restore the current collector thresholds when the test finishes.
    self.addCleanup(gc.set_threshold, *gc.get_threshold())
    count = 20
    for threshold in range(1, 100):
        gc.collect(0)
        gc.set_threshold(threshold, threshold, threshold)
        members = [RefCycle() for _ in range(count)]
        weak = WeakSet(members)
        del members
        # All items will be collected at next garbage collection pass
        walker = iter(weak)
        next(walker, None)
        len_with_iterator = len(weak)
        del walker
        len_without_iterator = len(weak)
        # Lengths must stay within bounds and never grow.
        self.assertGreaterEqual(len_with_iterator, 0)
        self.assertLessEqual(len_with_iterator, count)
        self.assertGreaterEqual(len_without_iterator, 0)
        self.assertLessEqual(len_without_iterator, len_with_iterator)
def test_union(self):
    # union() must return a new WeakSet with the members of both
    # operands and leave self.s unchanged.
    merged = self.s.union(self.items2)
    for letter in self.letters:
        expected = letter in self.d or letter in self.items2
        self.assertEqual(letter in merged, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(merged), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.union, [[]])
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        reference = WeakSet(self.items + self.items2)
        other = factory(self.items2)
        self.assertEqual(self.s.union(other), reference)
        # Drop the container before the liveness check below; presumably
        # so it cannot keep members of items2 alive.
        del other
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
    # Losing one strong reference must shrink the union after collection.
    self.items2.pop()
    gc.collect()
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
def test_weak_destroy_while_iterating(self):
    # Issue #7105: iterators shouldn't crash when a key is implicitly removed
    # Create new items to be sure no-one else holds a reference
    members = [SomeClass(label) for label in ('a', 'b', 'c')]
    weak = WeakSet(members)
    walker = iter(weak)
    # Trigger internal iteration
    next(walker)
    # Destroy an item
    del members[-1]
    test_support.gc_collect()
    # We have removed either the first consumed items, or another one
    alive = len(members)
    self.assertIn(len(list(walker)), [alive, alive - 1])
    del walker
    test_support.gc_collect()
    # The removal has been committed
    self.assertEqual(len(weak), len(members))
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    # NOTE(review): the first collection uses gc.collect() while the
    # second uses test_support.gc_collect() - confirm this is intended.
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    test_support.gc_collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_len_race(self):
    # Extended sanity checks for len() in the face of cyclic collection
    # NOTE: this variant does not tune gc.set_threshold() (nor register a
    # cleanup to restore it), so the loop index is unused and all 99
    # rounds run the same steps.
    count = 20
    for _ in range(1, 100):
        test_support.gc_collect()
        members = [RefCycle() for _ in range(count)]
        weak = WeakSet(members)
        del members
        # All items will be collected at next garbage collection pass
        walker = iter(weak)
        next(walker, None)
        len_with_iterator = len(weak)
        del walker
        len_without_iterator = len(weak)
        # Lengths must stay within bounds and never grow.
        self.assertGreaterEqual(len_with_iterator, 0)
        self.assertLessEqual(len_with_iterator, count)
        self.assertGreaterEqual(len_without_iterator, 0)
        self.assertLessEqual(len_without_iterator, len_with_iterator)
def deserializeToObject(cls, serializedObject, parentObject):
"""see Score.deserializeToObject

Build an instance of cls from the dict serializedObject without running
__init__ (cls.__new__ is used), call _secondInit on it and return it.
serializedObject is read for the keys "class", "data",
"contentLinkGroup", "name" and "minimumInTicks".
"""
# NOTE(review): indentation was lost in this listing, so which of the
# statements after the "else" line belong to that branch cannot be
# determined from here - confirm against the original source.
# NOTE: assert is stripped under "python -O", so this check is not a
# reliable validation of the serialized class name.
assert cls.__name__ == serializedObject["class"]
self = cls.__new__(cls)
if serializedObject["data"] is None: #Found a content linked block which already has one member of its group in the score
firstBlock = Block.firstBlockWithNewContentDuringDeserializeToObject[serializedObject["contentLinkGroup"]] #block with the same contentGroup. This is the one with the real data.
# The same data, _minimumInTicks and linkedContentBlocks objects are
# reused (not copied) from firstBlock.
self.data = firstBlock.data
self._minimumInTicks = firstBlock._minimumInTicks
self.linkedContentBlocks = firstBlock.linkedContentBlocks #add self to this is in _secondInit
else: #found a stand-alone block or the first one of a content link group
self.linkedContentBlocks = WeakSet()
# NOTE(review): eval() resolves the class named in item["class"]; this
# executes arbitrary expressions if the serialized data is not trusted.
self.data = [eval(item["class"]).deserializeToObject(item, parentObject = self) for item in serializedObject["data"]]
# Register this block so later blocks of the same contentLinkGroup can
# find it.
Block.firstBlockWithNewContentDuringDeserializeToObject[serializedObject["contentLinkGroup"]] = self
for item in self.data:
item.parentBlocks.add(self)
self.name = serializedObject["name"]
self._minimumInTicks = [int(serializedObject["minimumInTicks"])] #saved as int, used as list
self._secondInit(parentTrack = parentObject)
return self
def deserializeToObject(cls, serializedObject, parentObject):
"""see Score.deserializeToObject

Build an instance of cls from the dict serializedObject without running
__init__ (cls.__new__ is used), call _secondInit(parentObject) on it
and return it. serializedObject is read for the keys "class", "data",
"contentLinkGroup", "duration" and "name".
"""
# NOTE(review): indentation was lost in this listing, so which of the
# statements after the "else" line belong to that branch cannot be
# determined from here - confirm against the original source.
# NOTE: assert is stripped under "python -O", so this check is not a
# reliable validation of the serialized class name.
assert cls.__name__ == serializedObject["class"]
self = cls.__new__(cls)
self.parentGraphTrack = parentObject
if serializedObject["data"] is None:
firstBlock = GraphBlock.firstBlockWithNewContentDuringDeserializeToObject[serializedObject["contentLinkGroup"]] #first block with the same contentGroup. This is the one with the real data.
# The same data, _duration and linkedContentBlocks objects are reused
# (not copied) from firstBlock.
self.data = firstBlock.data
self._duration = firstBlock._duration
self.linkedContentBlocks = firstBlock.linkedContentBlocks #add self to this is in _secondInit
else: #Standalone or First occurence of a content linked block
# Keys of the serialized mapping are converted to int positions.
self.data = {int(position):GraphItem.deserializeToObject(item, parentObject = self) for position, item in serializedObject["data"].items()}
# Register this block so later blocks of the same contentLinkGroup can
# find it.
GraphBlock.firstBlockWithNewContentDuringDeserializeToObject[serializedObject["contentLinkGroup"]] = self
self.linkedContentBlocks = WeakSet()
self._duration = [int(serializedObject["duration"])] #this is just an int, minimumBlockInTicks or so. Not Item.Duration(). For save and load we drop the list as mutable value.
self.name = serializedObject["name"]
self._secondInit(parentObject)
return self
def deserializeToObject(cls, serializedObject, parentObject):
"""see Score.deserializeToObject.

Build an instance of cls from the dict serializedObject without running
__init__ (cls.__new__ is used), call _secondInit(parentObject) on it
and return it. serializedObject is read for the keys "class", "data",
"contentLinkGroup", "duration" and "name".
"""
#TODO: this is nearly the same as the GraphBlock method with the same name. During development it made sense to write to different functions. But this can be simplified by calling super().deserializeToObject
# NOTE(review): indentation was lost in this listing, so which of the
# statements after the "else" line belong to that branch cannot be
# determined from here - confirm against the original source.
# NOTE: assert is stripped under "python -O", so this check is not a
# reliable validation of the serialized class name.
assert cls.__name__ == serializedObject["class"]
self = cls.__new__(cls)
if serializedObject["data"] is None:
firstBlock = TempoBlock.firstBlockWithNewContentDuringDeserializeToObject[serializedObject["contentLinkGroup"]] #first block with the same contentGroup. This is the one with the real data.
# The same data, _duration and linkedContentBlocks objects are reused
# (not copied) from firstBlock.
self.data = firstBlock.data
self._duration = firstBlock._duration
self.linkedContentBlocks = firstBlock.linkedContentBlocks #add self to this is in _secondInit
else: #Standalone or First occurence of a content linked block
# NOTE(review): eval() resolves the class named in item["class"]; this
# executes arbitrary expressions if the serialized data is not trusted.
self.data = {int(position):eval(item["class"]).deserializeToObject(item, parentObject = self) for position, item in serializedObject["data"].items()}
# Register this block so later blocks of the same contentLinkGroup can
# find it.
TempoBlock.firstBlockWithNewContentDuringDeserializeToObject[serializedObject["contentLinkGroup"]] = self
self._duration = [int(serializedObject["duration"])] #this is just an int, minimumBlockInTicks or so. Not Item.Duration(). For save and load we drop the list as mutable value.
self.linkedContentBlocks = WeakSet()
#No super() deserialize call here so we need to create all objects directly:
self.name = serializedObject["name"]
self._secondInit(parentObject)
return self
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    gc.collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_union(self):
    # union() must return a new WeakSet with the members of both
    # operands and leave self.s unchanged.
    merged = self.s.union(self.items2)
    for letter in self.letters:
        expected = letter in self.d or letter in self.items2
        self.assertEqual(letter in merged, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(merged), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.union, [[]])
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        reference = WeakSet(self.items + self.items2)
        other = factory(self.items2)
        self.assertEqual(self.s.union(other), reference)
        # Drop the container before the liveness check below; presumably
        # so it cannot keep members of items2 alive.
        del other
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
    # Losing one strong reference must shrink the union after collection.
    self.items2.pop()
    gc.collect()
    self.assertEqual(len(merged), len(self.items) + len(self.items2))
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    gc.collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_len_race(self):
    # Extended sanity checks for len() in the face of cyclic collection
    # Restore the current collector thresholds when the test finishes.
    self.addCleanup(gc.set_threshold, *gc.get_threshold())
    count = 20
    for threshold in range(1, 100):
        gc.collect(0)
        gc.set_threshold(threshold, threshold, threshold)
        members = [RefCycle() for _ in range(count)]
        weak = WeakSet(members)
        del members
        # All items will be collected at next garbage collection pass
        walker = iter(weak)
        next(walker, None)
        len_with_iterator = len(weak)
        del walker
        len_without_iterator = len(weak)
        # Lengths must stay within bounds and never grow.
        self.assertGreaterEqual(len_with_iterator, 0)
        self.assertLessEqual(len_with_iterator, count)
        self.assertGreaterEqual(len_without_iterator, 0)
        self.assertLessEqual(len_without_iterator, len_with_iterator)
def on(event, callback):
    """Call `callback` on `event`

    Register `callback` to be run when `event` occurs. Callbacks are kept
    in a `weakref.WeakSet` per event, so a callback stays registered only
    while something else still holds a reference to it.

    Example:
        >>> def on_init():
        ...     print("Init happened")
        ...
        >>> on("init", on_init)
        >>> del on_init

    Arguments:
        event (str): Name of event
        callback (callable): Any callable

    """
    # Create the per-event set on first use, then register the callback.
    handlers = _registered_event_handlers.setdefault(event, weakref.WeakSet())
    handlers.add(callback)
def test_len_cycles(self):
    # len() must follow garbage collection of the RefCycle members, both
    # while an iterator over the set is alive and after it is gone.
    count = 20
    members = [RefCycle() for _ in range(count)]
    weak = WeakSet(members)
    del members
    walker = iter(weak)
    # Advance the iterator once; an already empty set is tolerated.
    next(walker, None)
    gc.collect()
    len_with_iterator = len(weak)
    del walker
    gc.collect()
    len_without_iterator = len(weak)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def __init__(self, worker_factory=WorkerThread):
    """Create the listener registry and one worker per event thread name.

    worker_factory is called with each name in
    event_ids.EVENT_THREAD_NAMES except event_ids.EVENT_THREAD__NOW, and
    the result is stored under that name.
    """
    # Listener sets are created on first access and hold weak references.
    self.__listeners = defaultdict(weakref.WeakSet)
    self.__listener_lock = RWLock()
    workers = {}
    self.__workers = workers
    for thread_name in event_ids.EVENT_THREAD_NAMES:
        if thread_name == event_ids.EVENT_THREAD__NOW:
            continue
        workers[thread_name] = worker_factory(thread_name)
def __init__(self, name, *args, **kwargs):
    # type: (str, *Any, **Any) -> None
    """Initialise the event and build its model filter.

    Arguments:
        name: event name, passed to the parent initialiser.
        *args: positional filter arguments, kept in ``_filterargs`` and
            passed to ``Q``.
        **kwargs: passed to the parent initialiser and to
            ``_init_attrs``; every key containing ``'__'`` also becomes
            a filter field. An ``'app'`` key is removed before the
            remaining keywords are stored.
    """
    super(ModelEvent, self).__init__(name, **kwargs)
    self._kwargs = kwargs
    self._kwargs.pop('app', None)  # don't use app in __reduce__
    self._filterargs = args
    self.models = WeakSet()

    # initialize the filter fields: {field}__{op}
    self.filter_fields = {
        k: v for k, v in items(kwargs) if '__' in k
    }
    # optimization: Django: Transition operators require the unchanged
    # database fields before saving, a pre_save signal
    # handles this, but we want to avoid the extra database hit
    # when they are not in use.
    # NOTE: no trailing comma after the generator expression - Python
    # 3.7+ rejects ``any(x for x in y,)`` with a SyntaxError.
    self.use_transitions = any(
        '__now_' in k for k in keys(self.filter_fields)
    )
    # _filterargs is set by __reduce__ to restore *args
    restored_args = kwargs.get('_filterargs') or ()
    self._init_attrs(**kwargs)
    # Without any filter arguments the predicate falls back to _true.
    self._filter_predicate = (
        Q(*args + restored_args, **self.filter_fields)
        if args or self.filter_fields else _true)
def setUp(self):
    # WeakSet is Python 2.7+
    weakset_missing = not hasattr(weakref, 'WeakSet')
    if weakset_missing:
        self.skipTest('No weakref.WeakSet available')
    # Delegate the rest of the fixture setup to the base test case.
    BaseTestCase.setUp(self)
def getReference(self, obj):
    """Return a new ``weakref.WeakSet`` built from the iterable ``obj``."""
    reference = weakref.WeakSet(obj)
    return reference
def __init__(self, context, device_p, can_load_configuration=True):
    """
    You should not instantiate this class directly.
    Call USBContext methods to receive instances of this class.

    The device descriptor is always fetched; configuration descriptors
    are only fetched when can_load_configuration is true.
    """
    self.__context = context
    self.__close_set = WeakSet()
    libusb1.libusb_ref_device(device_p)
    self.device_p = device_p
    # Fetch device descriptor
    descriptor = libusb1.libusb_device_descriptor()
    mayRaiseUSBError(
        libusb1.libusb_get_device_descriptor(device_p, byref(descriptor)))
    self.device_descriptor = descriptor
    if not can_load_configuration:
        return
    configurations = []
    self.__configuration_descriptor_list = configurations
    add_configuration = configurations.append
    device_p = self.device_p
    configuration_count = self.device_descriptor.bNumConfigurations
    for configuration_id in xrange(configuration_count):
        config = libusb1.libusb_config_descriptor_p()
        status = libusb1.libusb_get_config_descriptor(
            device_p, configuration_id, byref(config))
        # pylint: disable=undefined-variable
        if status == ERROR_NOT_FOUND:
            # pylint: enable=undefined-variable
            # Some devices (ex windows' root hubs) tell they have
            # one configuration, but they have no configuration
            # descriptor.
            continue
        mayRaiseUSBError(status)
        add_configuration(config.contents)
def __init__(self):
    """
    Create a new USB context.

    Only attributes are initialised here; no libusb function other than
    the libusb_context_p constructor is called.
    """
    self.__close_set = WeakSet()
    self.__hotplug_callback_dict = {}
    self.__context_p = libusb1.libusb_context_p()
    # Used to prevent an exit to cause a segfault if a concurrent thread
    # is still in libusb.
    self.__context_cond = threading.Condition()
    self.__context_refcount = 0
def __init__(self, address):
    """Store the address and create the first server pipe handle."""
    # The pipe attributes must exist before _server_pipe_handle() is
    # called: that call can raise, and the destructor calls close().
    self._pipe = None
    self._accept_pipe_future = None
    self._free_instances = weakref.WeakSet()
    self._address = address
    first_pipe = self._server_pipe_handle(True)
    self._pipe = first_pipe
def __init__(self, concurrency=0xffffffff):
    """Create the I/O completion port and empty bookkeeping containers.

    concurrency is passed as the last argument to
    _overlapped.CreateIoCompletionPort.
    """
    self._loop = None
    self._results = []
    completion_port = _overlapped.CreateIoCompletionPort(
        _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
    self._iocp = completion_port
    self._cache = {}
    # Both sets below hold their members weakly.
    self._registered = weakref.WeakSet()
    self._unregistered = []
    self._stopped_serving = weakref.WeakSet()
def __init__(self, protocol):
    """Remember the protocol and start with empty call and loop sets."""
    self.protocol = protocol
    # Calls and loops are tracked through weak references only.
    self.calls, self.loops = WeakSet(), WeakSet()
def reset(self):
    """Cancel active calls, stop running loops and forget them all."""
    for pending in self.calls:
        if not pending.active():
            continue
        pending.cancel()
    for looping in self.loops:
        if not looping.running:
            continue
        looping.stop()
    # Start over with fresh, empty weak sets.
    self.calls, self.loops = WeakSet(), WeakSet()
def __init__(self):
    """Start with an empty weak set of functions and weak map of methods."""
    self._methods = WeakKeyDictionary()
    self._functions = WeakSet()
def __init__(self, curveCls, numpoints=10000, **kwargs):
    """Store the curve class and point count and create empty registries.

    Extra keyword arguments are accepted but not used here.
    """
    super(DataManager, self).__init__()
    self._curveCls = curveCls
    self._numpoints = numpoints
    # Ordered mappings, both initially empty.
    self._sources = collections.OrderedDict()
    self._lastSamples = collections.OrderedDict()
    # Held weakly.
    self._lostCurveDatas = weakref.WeakSet()
def __init__(self, parent_hash=ZERO_HASH, unlocked_block_storage=None, did_lock_to_index_f=None):
    """Initialise an empty chain.

    Arguments:
        parent_hash: stored as ``self.parent_hash``.
        unlocked_block_storage: mapping stored as
            ``self.unlocked_block_storage``. When omitted (``None``) a
            new dict is created for this instance, so instances no longer
            share one default dict.
        did_lock_to_index_f: stored as ``self.did_lock_to_index_f``.
    """
    self.parent_hash = parent_hash
    self.hash_to_index_lookup = {}
    self.weight_lookup = {}
    self.chain_finder = ChainFinder()
    # Callbacks are referenced weakly.
    self.change_callbacks = weakref.WeakSet()
    self._longest_chain_cache = None
    self.did_lock_to_index_f = did_lock_to_index_f
    # Test against None (not truthiness) so an empty mapping passed in by
    # the caller is kept as-is.
    if unlocked_block_storage is None:
        unlocked_block_storage = {}
    self.unlocked_block_storage = unlocked_block_storage
    self._locked_chain = []
def setUp(self):
    # need to keep references to them
    first = [ustr(ch) for ch in ('a', 'b', 'c')]
    second = [ustr(ch) for ch in ('x', 'y', 'z')]
    self.items = first
    self.items2 = second
    self.letters = [ustr(ch) for ch in string.ascii_letters]
    # Weak set and plain dict built from the same first group of items.
    self.s = WeakSet(first)
    self.d = dict.fromkeys(first)
    # A single-member weak set around one extra item.
    lone = ustr('F')
    self.obj = lone
    self.fs = WeakSet([lone])
def test_methods(self):
    # Every public attribute of set (except 'test_c_api') must also be
    # available on WeakSet.
    available = dir(WeakSet)
    required = [
        attr for attr in dir(set)
        if attr != 'test_c_api' and not attr.startswith('_')
    ]
    for attr in required:
        self.assertIn(attr, available, "WeakSet missing method " + attr)
def test_new_or_init(self):
    # A second positional argument must be rejected.
    with self.assertRaises(TypeError):
        WeakSet([], 2)
def test_union(self):
    # union() must return a new WeakSet with the members of both
    # operands and leave self.s unchanged.
    merged = self.s.union(self.items2)
    for letter in self.letters:
        expected = letter in self.d or letter in self.items2
        self.assertEqual(letter in merged, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(merged), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.union, [[]])
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        reference = WeakSet(self.items + self.items2)
        other = factory(self.items2)
        self.assertEqual(self.s.union(other), reference)
def test_intersection(self):
    # intersection() must return a new WeakSet with the members common
    # to both operands and leave self.s unchanged.
    common = self.s.intersection(self.items2)
    for letter in self.letters:
        expected = letter in self.d and letter in self.items2
        self.assertEqual(letter in common, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(common), WeakSet)
    # items and items2 share nothing, whatever container holds items2.
    for factory in (set, frozenset, dict.fromkeys, list, tuple):
        empty = WeakSet([])
        self.assertEqual(self.s.intersection(factory(self.items2)), empty)
def test_difference(self):
    # difference() must return a new WeakSet with the members of self.s
    # that are not in the other operand, leaving self.s unchanged.
    remainder = self.s.difference(self.items2)
    for letter in self.letters:
        expected = letter in self.d and letter not in self.items2
        self.assertEqual(letter in remainder, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(remainder), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.difference, [[]])
def test_symmetric_difference(self):
    # symmetric_difference() must return a new WeakSet with the members
    # found in exactly one operand, leaving self.s unchanged.
    exclusive = self.s.symmetric_difference(self.items2)
    for letter in self.letters:
        expected = (letter in self.d) ^ (letter in self.items2)
        self.assertEqual(letter in exclusive, expected)
    self.assertEqual(self.s, WeakSet(self.items))
    self.assertEqual(type(exclusive), WeakSet)
    # An iterable containing a plain list must be rejected.
    self.assertRaises(TypeError, self.s.symmetric_difference, [[]])