我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 multiprocessing.connection 模块。
def startEventLoop(name, port, authkey, ppid, debug=False):
    """Connect to a server on localhost and service requests until closed.

    Opens a ``multiprocessing.connection.Client`` to ``('localhost', port)``
    using *authkey*, stores a ``RemoteEventHandler`` built from that
    connection in the module-level ``HANDLER`` global, and then calls
    ``HANDLER.processRequests()`` in a loop, sleeping 10 ms between calls,
    until ``ClosedError`` is raised.

    When *debug* is truthy, progress messages are written via ``cprint.cout``.
    """
    global HANDLER

    if debug:
        import os
        connecting = '[%d] connecting to server at port localhost:%d, authkey=%s..\n' % (
            os.getpid(), port, repr(authkey))
        cprint.cout(debug, connecting, -1)

    server_address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(server_address, authkey=authkey)

    if debug:
        connected = '[%d] connected; starting remote proxy.\n' % os.getpid()
        cprint.cout(debug, connected, -1)

    HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)

    running = True
    while running:
        try:
            # processRequests() raises ClosedError when the loop should exit.
            HANDLER.processRequests()
            time.sleep(0.01)
        except ClosedError:
            running = False
def startQtEventLoop(name, port, authkey, ppid, debug=False):
    """Connect to a server on localhost and run a Qt event loop.

    Opens a ``multiprocessing.connection.Client`` to ``('localhost', port)``
    using *authkey*, makes sure a ``QApplication`` exists, stores a
    ``RemoteQtEventHandler`` in the module-level ``HANDLER`` global, starts
    its event timer and finally enters ``app.exec_()``.

    When *debug* is truthy, progress messages are written via ``cprint.cout``.
    """
    global HANDLER

    if debug:
        import os
        connecting = '[%d] connecting to server at port localhost:%d, authkey=%s..\n' % (
            os.getpid(), port, repr(authkey))
        cprint.cout(debug, connecting, -1)

    server_address = ('localhost', int(port))
    conn = multiprocessing.connection.Client(server_address, authkey=authkey)

    if debug:
        connected = '[%d] connected; starting remote proxy.\n' % os.getpid()
        cprint.cout(debug, connected, -1)

    from ..Qt import QtGui, QtCore

    app = QtGui.QApplication.instance()
    if app is None:
        app = QtGui.QApplication([])
        # Generally we want the event loop to stay open until it is
        # explicitly closed by the parent process.
        # NOTE(review): the flattened source does not show whether this call
        # belongs inside the `if`; it is placed here on that assumption --
        # confirm against the original file.
        app.setQuitOnLastWindowClosed(False)

    HANDLER = RemoteQtEventHandler(conn, name, ppid, debug=debug)
    HANDLER.startEventTimer()
    app.exec_()
def test_spawn_close(self):
    """Check that the parent may close a pipe end right after spawning.

    On Windows this would sometimes have failed on old versions because the
    child's end was closed before the child got a chance to duplicate it.
    """
    parent_end, child_end = self.Pipe()

    worker = self.Process(target=self._echo, args=(child_end,))
    worker.daemon = True
    worker.start()

    # This close might complete before the child has initialised.
    child_end.close()

    payload = latin('hello')
    parent_end.send_bytes(payload)
    self.assertEqual(parent_end.recv_bytes(), payload)

    parent_end.send_bytes(SENTINEL)
    parent_end.close()
    worker.join()
def test_issue14725(self):
    """Accept a connection whose client has already written and closed.

    The child is given one second to connect, send its message and close
    before ``accept()`` is called on the listener.
    """
    listener = self.connection.Listener()

    client_proc = self.Process(target=self._test, args=(listener.address,))
    client_proc.daemon = True
    client_proc.start()

    # On Windows the client process should have connected, written data and
    # closed the pipe handle by now.  This causes ConnectNamedPipe() to fail
    # with ERROR_NO_DATA.  See Issue 14725.
    time.sleep(1)

    accepted = listener.accept()
    self.assertEqual(accepted.recv(), 'hello')

    accepted.close()
    client_proc.join()
    listener.close()

#
# Test of sending connection and socket objects between processes
#
def test_answer_challenge_auth_failure(self):
    """``answer_challenge`` must raise ``AuthenticationError`` on a bad reply.

    The fake connection returns the ``CHALLENGE`` constant on the first
    read, a bogus payload on the second and an empty bytes object afterwards.
    """
    class _FakeConnection(object):
        def __init__(self):
            self.count = 0

        def recv_bytes(self, size):
            self.count += 1
            if self.count == 1:
                return multiprocessing.connection.CHALLENGE
            if self.count == 2:
                return b'something bogus'
            return b''

        def send_bytes(self, data):
            # Outgoing data is discarded.
            pass

    fake = _FakeConnection()
    answer = multiprocessing.connection.answer_challenge
    with self.assertRaises(multiprocessing.AuthenticationError):
        answer(fake, b'abc')

#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
def test_timeout(self):
    """Exchange data over a pipe and a listener with a 0.1 s socket timeout.

    The global default socket timeout is lowered for the duration of the
    test and restored afterwards, whatever the outcome.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(0.1)

        parent_end, child_end = multiprocessing.Pipe(duplex=True)
        listener = multiprocessing.connection.Listener(family='AF_INET')
        worker = multiprocessing.Process(
            target=self._test_timeout,
            args=(child_end, listener.address),
        )
        worker.start()
        child_end.close()

        # First value arrives over the pipe ...
        self.assertEqual(parent_end.recv(), 123)
        parent_end.close()

        # ... the second over a connection accepted by the listener.
        accepted = listener.accept()
        self.assertEqual(accepted.recv(), 456)
        accepted.close()

        listener.close()
        worker.join(10)
    finally:
        socket.setdefaulttimeout(saved_timeout)

#
# Test what happens with no "if __name__ == '__main__'"
#
def test_dont_merge(self):
    """Check the child's messages are received as b'a', b'b' and b'cd'.

    ``poll()`` is also checked before the child starts (nothing readable)
    and between the individual reads (data readable).
    """
    reader, writer = self.Pipe()

    # Nothing has been written yet.
    for timeout in (0.0, 0.1):
        self.assertEqual(reader.poll(timeout), False)

    child = self.Process(target=self._child_dont_merge, args=(writer,))
    child.start()

    self.assertEqual(reader.recv_bytes(), b'a')
    for timeout in (1.0, 1.0):
        self.assertEqual(reader.poll(timeout), True)

    self.assertEqual(reader.recv_bytes(), b'b')
    for timeout in (1.0, 1.0, 0.0):
        self.assertEqual(reader.poll(timeout), True)

    self.assertEqual(reader.recv_bytes(), b'cd')

    child.join()

#
# Test of sending connection and socket objects between processes
#
def _listener(cls, conn, families):
    """Accept one connection per family, then one raw socket, sending each back.

    For every entry in *families* a ``Listener`` is created, its address is
    sent over *conn*, and the accepted connection object is itself sent over
    *conn*.  The same is then done once with a plain ``socket.socket``.
    Finally a single ``conn.recv()`` is performed.
    """
    for family in families:
        listener = cls.connection.Listener(family=family)
        conn.send(listener.address)
        accepted = listener.accept()
        conn.send(accepted)
        accepted.close()
        listener.close()

    server_sock = socket.socket()
    server_sock.bind((test.support.HOST, 0))
    server_sock.listen(1)
    conn.send(server_sock.getsockname())
    accepted_sock, _peer = server_sock.accept()
    conn.send(accepted_sock)
    accepted_sock.close()
    server_sock.close()

    conn.recv()
def test_ignore_listener(self):
    """Send SIGUSR1 to a child running a listener and still get its greeting.

    The child reports its listener address over a pipe; after the signal has
    been delivered, a client connects to that address and must receive
    ``'welcome'``.
    """
    parent_end, child_end = multiprocessing.Pipe()
    try:
        child = multiprocessing.Process(
            target=self._test_ignore_listener,
            args=(child_end,),
        )
        child.daemon = True
        child.start()
        child_end.close()

        listener_address = parent_end.recv()

        time.sleep(0.1)
        os.kill(child.pid, signal.SIGUSR1)
        time.sleep(0.1)

        client = multiprocessing.connection.Client(listener_address)
        self.assertEqual(client.recv(), 'welcome')
        child.join()
    finally:
        parent_end.close()
def test_rapid_restart(self):
    """Shut a manager down and immediately start another on the same address."""
    authkey = os.urandom(32)

    first = QueueManager(
        address=(test.test_support.HOST, 0),
        authkey=authkey,
        serializer=SERIALIZER,
    )
    server = first.get_server()
    addr = server.address
    # Close the connection.Listener socket which gets opened as a part
    # of manager.get_server(). It's not needed for the test.
    server.listener.close()
    first.start()

    putter = self.Process(target=self._putter, args=(first.address, authkey))
    putter.daemon = True
    putter.start()

    queue = first.get_queue()
    self.assertEqual(queue.get(), 'hello world')
    del queue
    first.shutdown()

    # Restart on the address the first server was bound to.
    second = QueueManager(address=addr, authkey=authkey, serializer=SERIALIZER)
    second.start()
    second.shutdown()

#
#
#
def _test(cls, address):
    """Connect to *address*, send the string ``'hello'`` and disconnect."""
    client = cls.connection.Client(address)
    greeting = 'hello'
    client.send(greeting)
    client.close()
def test_listener_client(self):
    """For every supported family, accept a child's connection and read 'hello'."""
    make_listener = self.connection.Listener

    for family in self.connection.families:
        listener = make_listener(family=family)

        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()

        accepted = listener.accept()
        self.assertEqual(accepted.recv(), 'hello')

        child.join()
        listener.close()
def test_deliver_challenge_auth_failure(self):
    """``deliver_challenge`` must raise ``AuthenticationError`` on a bogus reply.

    The fake connection discards everything sent to it and answers every
    read with the same bogus payload.
    """
    class _FakeConnection(object):
        def recv_bytes(self, size):
            return b'something bogus'

        def send_bytes(self, data):
            # Outgoing data is discarded.
            pass

    fake = _FakeConnection()
    deliver = multiprocessing.connection.deliver_challenge
    with self.assertRaises(multiprocessing.AuthenticationError):
        deliver(fake, b'abc')
def _test_timeout(cls, child, address):
    """Send 123 over *child* after one second, then 456 to *address*.

    The pipe end *child* is closed after its value has been sent; the second
    value travels over a fresh ``multiprocessing.connection.Client``.
    """
    time.sleep(1)

    # First value: over the inherited pipe end.
    child.send(123)
    child.close()

    # Second value: over a new client connection.
    client = multiprocessing.connection.Client(address)
    client.send(456)
    client.close()
def _test_ignore_listener(cls, conn):
    """Run a listener with a no-op SIGUSR1 handler and greet one client.

    Installs a handler for ``SIGUSR1`` that does nothing, creates a
    ``multiprocessing.connection.Listener``, reports its address over *conn*,
    accepts a single connection and sends ``'welcome'`` on it.

    The listener is closed before returning, including when ``accept()`` or
    one of the sends raises, so the listening socket/pipe is not leaked.
    """
    def handler(signum, frame):
        # Intentionally empty: the signal only needs to be caught.
        pass

    signal.signal(signal.SIGUSR1, handler)

    l = multiprocessing.connection.Listener()
    try:
        conn.send(l.address)
        a = l.accept()
        a.send('welcome')
    finally:
        # try/finally rather than ``with`` so this does not depend on
        # Listener supporting the context-manager protocol.
        l.close()
def test_rapid_restart(self):
    """Shut a manager down and immediately start another on the same address.

    If the restart fails with ``EADDRINUSE`` (the old socket may still be
    lingering), the test waits one second and retries once with a fresh
    manager.  Any other ``OSError`` is re-raised.
    """
    authkey = os.urandom(32)
    manager = QueueManager(
        address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
    srvr = manager.get_server()
    addr = srvr.address
    # Close the connection.Listener socket which gets opened as a part
    # of manager.get_server(). It's not needed for the test.
    srvr.listener.close()
    manager.start()

    p = self.Process(target=self._putter, args=(manager.address, authkey))
    p.daemon = True
    p.start()
    queue = manager.get_queue()
    self.assertEqual(queue.get(), 'hello world')
    del queue
    manager.shutdown()

    manager = QueueManager(
        address=addr, authkey=authkey, serializer=SERIALIZER)
    try:
        manager.start()
    except OSError as e:
        if e.errno != errno.EADDRINUSE:
            raise
        # Retry after some time, in case the old socket was lingering
        # (sporadic failure on buildbots)
        time.sleep(1.0)
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        # The replacement manager must be started as well: shutdown() is
        # only available on a manager whose start() has been called, so
        # the unconditional shutdown() below would otherwise fail.
        manager.start()
    manager.shutdown()

#
#
#
def test_multiple_bind(self):
    """Binding a second listener to an address already in use raises OSError."""
    make_listener = self.connection.Listener

    for family in self.connection.families:
        first = make_listener(family=family)
        self.addCleanup(first.close)

        taken_address = first.address
        with self.assertRaises(OSError):
            make_listener(taken_address, family)
def test_context(self):
    """Listener, client and accepted connection all work as context managers.

    A value is sent from the client and read from the accepted connection
    inside the managed block.  For the ``'processes'`` type, ``accept()`` on
    the listener must raise ``OSError`` once the block has exited.
    """
    conn_mod = self.connection

    with conn_mod.Listener() as listener, \
            conn_mod.Client(listener.address) as client, \
            listener.accept() as accepted:
        client.send(1729)
        self.assertEqual(accepted.recv(), 1729)

    if self.TYPE == 'processes':
        with self.assertRaises(OSError):
            listener.accept()
def test_issue14725(self):
    """Accept a connection whose client has already written and closed.

    The child is given one second to connect, send its message and close
    before ``accept()`` is called on the listener.
    """
    listener = self.connection.Listener()

    client_proc = self.Process(target=self._test, args=(listener.address,))
    client_proc.daemon = True
    client_proc.start()

    # On Windows the client process should have connected, written data and
    # closed the pipe handle by now.  This causes ConnectNamedPipe() to fail
    # with ERROR_NO_DATA.  See Issue 14725.
    time.sleep(1)

    accepted = listener.accept()
    self.assertEqual(accepted.recv(), 'hello')

    accepted.close()
    client_proc.join()
    listener.close()
def test_issue16955(self):
    """``poll(1)`` on a client reports data sent via ``send_bytes``.

    Repeated for every family listed in ``self.connection.families``.
    """
    conn_mod = self.connection

    for family in conn_mod.families:
        listener = conn_mod.Listener(family=family)
        client = conn_mod.Client(listener.address)
        accepted = listener.accept()

        accepted.send_bytes(b"hello")
        self.assertTrue(client.poll(1))

        # Close in the same order as opened last-to-first, listener last.
        for endpoint in (accepted, client, listener):
            endpoint.close()
def _remote(cls, conn):
    """Echo upper-cased messages to the addresses received over *conn*.

    ``(address, msg)`` pairs are read from *conn* until a ``None`` sentinel
    arrives; each ``msg.upper()`` is sent through a new
    ``cls.connection.Client``.  One further pair is then read and answered
    through a plain ``socket.socket``.  *conn* is closed at the end.
    """
    # iter(callable, sentinel) stops as soon as recv() yields None.
    for request in iter(conn.recv, None):
        target, text = request
        reply_conn = cls.connection.Client(target)
        reply_conn.send(text.upper())
        reply_conn.close()

    target, text = conn.recv()
    reply_sock = socket.socket()
    reply_sock.connect(target)
    reply_sock.sendall(text.upper())
    reply_sock.close()

    conn.close()
def test_invalid_handles(self):
    """A Connection on an arbitrary handle must not crash; -1 is rejected.

    ``poll()`` on the arbitrary handle may raise ``ValueError`` or
    ``OSError``; both are tolerated.  Constructing a ``Connection`` from
    ``-1`` must raise one of those two exceptions.
    """
    tolerated = (ValueError, OSError)
    connection_cls = multiprocessing.connection.Connection

    bogus = connection_cls(44977608)
    try:
        # Check that poll() doesn't crash.
        bogus.poll()
    except tolerated:
        pass
    finally:
        # Hack private attribute _handle to avoid printing an error
        # in conn.__del__
        bogus._handle = None

    with self.assertRaises(tolerated):
        connection_cls(-1)
def test_wait(self, slow=False):
    """Collect messages from four child processes using ``wait()``.

    Each child gets the write end of a one-way pipe.  Messages are read
    until every reader hits EOF, then compared (sorted) against the pairs
    ``(i, pid)`` for ``i`` in ``range(10)`` and every child pid.
    """
    from multiprocessing.connection import wait

    open_readers = []
    children = []
    received = []

    for _ in range(4):
        read_end, write_end = multiprocessing.Pipe(duplex=False)
        child = multiprocessing.Process(
            target=self._child_test_wait,
            args=(write_end, slow),
        )
        child.daemon = True
        child.start()
        # The parent does not write; drop its copy of the write end.
        write_end.close()
        open_readers.append(read_end)
        children.append(child)
        self.addCleanup(child.join)

    while open_readers:
        for ready in wait(open_readers):
            try:
                item = ready.recv()
            except EOFError:
                open_readers.remove(ready)
                ready.close()
            else:
                received.append(item)

    received.sort()
    expected = sorted((i, child.pid) for i in range(10) for child in children)
    self.assertEqual(received, expected)
def test_wait_socket(self, slow=False):
    """Collect data from four child processes over raw sockets using ``wait()``.

    Every accepted socket is read in chunks of up to 32 bytes until it
    returns an empty result; the concatenated data per socket must equal the
    ASCII lines ``0`` through ``9``.
    """
    from multiprocessing.connection import wait

    server = socket.socket()
    server.bind((test.support.HOST, 0))
    server.listen(4)
    addr = server.getsockname()

    open_readers = []
    children = []
    chunks_by_sock = {}

    for _ in range(4):
        child = multiprocessing.Process(
            target=self._child_test_wait_socket,
            args=(addr, slow),
        )
        child.daemon = True
        child.start()
        children.append(child)
        self.addCleanup(child.join)

    for _ in range(4):
        accepted, _peer = server.accept()
        open_readers.append(accepted)
        chunks_by_sock[accepted] = []

    server.close()

    while open_readers:
        for ready in wait(open_readers):
            data = ready.recv(32)
            if data:
                chunks_by_sock[ready].append(data)
            else:
                # Empty read: the peer has closed its end.
                open_readers.remove(ready)
                ready.close()

    expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
    for chunks in chunks_by_sock.values():
        self.assertEqual(b''.join(chunks), expected)
def test_neg_timeout(self):
    """``wait()`` with ``timeout=-1`` returns an empty list in under a second."""
    from multiprocessing.connection import wait

    end_a, end_b = multiprocessing.Pipe()

    started = time.time()
    ready = wait([end_a], timeout=-1)
    elapsed = time.time() - started

    self.assertEqual(ready, [])
    self.assertLess(elapsed, 1)

    end_a.close()
    end_b.close()

#
# Issue 14151: Test invalid family on invalid environment
#
def test_invalid_family(self):
    """Creating a Listener on the address below must raise ``ValueError``."""
    pipe_style_address = r'\\.\test'
    self.assertRaises(
        ValueError,
        multiprocessing.connection.Listener,
        pipe_style_address,
    )
def test_invalid_family_win32(self):
    """Creating a Listener on the path below must raise ``ValueError``."""
    path_style_address = '/var/test.pipe'
    self.assertRaises(
        ValueError,
        multiprocessing.connection.Listener,
        path_style_address,
    )

#
# Issue 12098: check sys.flags of child matches that for parent
#
def _test_ignore_listener(cls, conn):
    """Run a listener with a no-op SIGUSR1 handler and greet one client.

    The listener's address is reported over *conn*; the first accepted
    connection is sent ``'welcome'``.  The listener is managed by a ``with``
    block and therefore closed on exit.
    """
    def _ignore_signal(signum, frame):
        # Intentionally empty: the signal only needs to be caught.
        return None

    signal.signal(signal.SIGUSR1, _ignore_signal)

    with multiprocessing.connection.Listener() as listener:
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')
def server_establish_socket(self) -> None:
    """Listen on localhost:RXM_LISTEN_SOCKET and keep the first connection.

    Blocks in ``accept()`` until a peer connects; the accepted connection is
    stored in ``self.interface``.
    """
    bind_address = ('localhost', RXM_LISTEN_SOCKET)
    ipc_listener = multiprocessing.connection.Listener(bind_address)
    self.interface = ipc_listener.accept()
def client_establish_socket(self) -> None:
    """Connect to the local IPC server, retrying every 0.1 s until it succeeds.

    The port is ``TXM_DD_LISTEN_SOCKET`` when
    ``self.settings.data_diode_sockets`` is truthy, otherwise
    ``NH_LISTEN_SOCKET``.  The connection is stored in ``self.interface``.
    A ``KeyboardInterrupt`` during the wait calls ``graceful_exit()``.
    """
    try:
        phase("Waiting for connection to NH", offset=11)
        connected = False
        while not connected:
            try:
                # Re-read the setting on every attempt, as the original does.
                if self.settings.data_diode_sockets:
                    socket_number = TXM_DD_LISTEN_SOCKET
                else:
                    socket_number = NH_LISTEN_SOCKET
                endpoint = ('localhost', socket_number)
                self.interface = multiprocessing.connection.Client(endpoint)
                phase("Established", done=True)
                connected = True
            except socket.error:
                time.sleep(0.1)
    except KeyboardInterrupt:
        graceful_exit()
def test_wait_integer(self):
    """``wait()`` accepts a process sentinel (an int) alongside connections.

    Three rounds are timed: first only the sentinel becomes ready (after
    roughly ``expected`` seconds), then the sentinel plus ``b`` once ``a``
    has sent, then all three once ``b`` has sent as well.
    """
    from multiprocessing.connection import wait

    expected = 3

    def by_identity(items):
        # Order by id() so results can be compared regardless of ordering.
        return sorted(items, key=id)

    sem = multiprocessing.Semaphore(0)
    a, b = multiprocessing.Pipe()
    p = multiprocessing.Process(target=self.signal_and_sleep,
                                args=(sem, expected))

    def timed_wait(timeout):
        # Returns (ready objects, elapsed seconds) for one wait() call.
        began = time.time()
        ready = wait([a, p.sentinel, b], timeout)
        return ready, time.time() - began

    p.start()
    self.assertIsInstance(p.sentinel, int)
    self.assertTrue(sem.acquire(timeout=20))

    res, delta = timed_wait(expected + 20)
    self.assertEqual(res, [p.sentinel])
    self.assertLess(delta, expected + 2)
    self.assertGreater(delta, expected - 2)

    a.send(None)

    res, delta = timed_wait(20)
    self.assertEqual(by_identity(res), by_identity([p.sentinel, b]))
    self.assertLess(delta, 0.4)

    b.send(None)

    res, delta = timed_wait(20)
    self.assertEqual(by_identity(res), by_identity([a, p.sentinel, b]))
    self.assertLess(delta, 0.4)

    p.terminate()
    p.join()