我们从Python开源项目中，提取了以下8个代码示例，用于说明如何使用multiprocessing.managers模块。
def cleanup():
    """Shut down the process handler and reset the module-level state.

    When multiprocessing is enabled and a closing flag exists, the flag is
    set to ``True``; a lost connection to the manager at that point is only
    logged. If the manager exposes ``shutdown``, the function waits a fixed
    grace period and then terminates the manager's process if it is still
    alive.

    The module globals ``__manager``, ``__closing`` and ``__multiprocessing``
    are reset to ``None`` in every case.
    """
    global __multiprocessing, __manager, __closing
    if __multiprocessing and __closing:
        log.debug("Shutting down process handler")
        try:
            __closing.set(True)
        except (IOError, EOFError):
            log.debug("Connection to manager lost during cleanup")
        # Only managers that were started via ".start()" implement a
        # "shutdown". Managers started via ".connect" may skip this.
        started_locally = hasattr(__manager, "shutdown")
        if started_locally:
            # Give the spawner and the worker threads time to go down.
            time.sleep(2.5)
            # NOTE: the explicit "__manager.shutdown()" call is commented out
            # in this snippet; only the short pause that followed it remains.
            time.sleep(0.1)
            # Kill the manager process if it survived the grace period.
            manager_process = __manager._process
            if manager_process.is_alive():
                manager_process.terminate()
    __manager = __closing = __multiprocessing = None
def make_server_manager(port, authorization_key):
    """Build (but do not start) the manager used by the server.

    Two fresh queues are created and exposed through a ``SyncManager``
    subclass under the names held in ``JOB_QUEUE_NAME`` and
    ``RES_QUEUE_NAME``. Each registered callable always hands back the same
    queue object created by this call.

    Arguments:
    port -- Port to use for communication
    authorization_key -- program secret used to identify correct server

    Returns:
    The configured manager instance, bound to address ``('', port)``.
    Starting or serving it is left to the caller.
    """
    class JobQueueManager(multiprocessing.managers.SyncManager):
        """SyncManager subclass that carries the queue registrations."""

    jobs = Queue()
    results = Queue()

    # Register one accessor per queue; the lambdas close over the queues
    # created above so every accessor call yields the same object.
    JobQueueManager.register(JOB_QUEUE_NAME, callable=lambda: jobs)
    JobQueueManager.register(RES_QUEUE_NAME, callable=lambda: results)

    return JobQueueManager(address=('', port), authkey=authorization_key)
def test_import(self):
    """Import the multiprocessing modules and check their ``__all__``.

    Every name a module lists in ``__all__`` must exist as an attribute of
    that module. Modules without ``__all__`` are only imported.
    """
    module_names = [
        'multiprocessing',
        'multiprocessing.connection',
        'multiprocessing.heap',
        'multiprocessing.managers',
        'multiprocessing.pool',
        'multiprocessing.process',
        'multiprocessing.synchronize',
        'multiprocessing.util',
    ]

    # Optional modules are only checked when their prerequisites exist.
    if HAS_REDUCTION:
        module_names.append('multiprocessing.reduction')
    if c_int is not None:
        # This module requires _ctypes
        module_names.append('multiprocessing.sharedctypes')

    for module_name in module_names:
        __import__(module_name)
        # __import__ returns the top-level package for dotted names, so the
        # module itself is looked up in sys.modules.
        module = sys.modules[module_name]
        exported = getattr(module, '__all__', ())
        for attr in exported:
            failure_message = '%r does not have attribute %r' % (module, attr)
            self.assertTrue(hasattr(module, attr), failure_message)

#
# Quick test that logging works -- does not test logging output
#
def test_manager_initializer(self):
    """Check that ``SyncManager.start`` validates and runs its initializer.

    A non-callable initializer must be rejected with ``TypeError``. After a
    successful start with ``initializer`` and ``(self.ns,)`` as arguments,
    ``self.ns.test`` is expected to equal 1 (presumably set by
    ``initializer``, which is defined elsewhere -- verify there).
    """
    manager = multiprocessing.managers.SyncManager()
    self.assertRaises(TypeError, manager.start, 1)
    manager.start(initializer, (self.ns,))
    try:
        self.assertEqual(self.ns.test, 1)
    finally:
        # Always stop and reap the manager process, even when the assertion
        # fails; otherwise a failing test leaves a child process behind.
        manager.shutdown()
        manager.join()
def process_manager(self):
    """Return the shared ``SyncManager``, creating and starting it on demand.

    The manager is created on first use, started with
    ``_manager_initializer`` as its initializer, and cached in
    ``self._process_manager``; later calls return the cached instance.

    Any exception raised by ``SyncManager.start`` propagates to the caller.
    In that case nothing is cached, so a later call retries instead of
    returning a manager that was never started.
    """
    if self._process_manager is None:
        manager = multiprocessing.managers.SyncManager()
        manager.start(_manager_initializer)
        # Publish the manager only after start() succeeded.
        self._process_manager = manager
    return self._process_manager
def test_manager_initializer(self):
    """Check that ``SyncManager.start`` validates and runs its initializer.

    A non-callable initializer must be rejected with ``TypeError``. After a
    successful start with ``initializer`` and ``(self.ns,)`` as arguments,
    ``self.ns.test`` is expected to equal 1 (presumably set by
    ``initializer``, which is defined elsewhere -- verify there).
    """
    manager = multiprocessing.managers.SyncManager()
    self.assertRaises(TypeError, manager.start, 1)
    manager.start(initializer, (self.ns,))
    try:
        self.assertEqual(self.ns.test, 1)
    finally:
        # Shut down and join in all cases so a failed assertion does not
        # leave the manager process running.
        manager.shutdown()
        manager.join()