我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.makefile()。
def test_dealloc_warn(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) r = repr(sock) with self.assertWarns(ResourceWarning) as cm: sock = None support.gc_collect() self.assertIn(r, str(cm.warning.args[0])) # An open socket file object gets dereferenced after the socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) f = sock.makefile('rb') r = repr(sock) sock = None support.gc_collect() with self.assertWarns(ResourceWarning): f = None support.gc_collect()
def worker(sock): """ Called by a worker process after the fork(). """ signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) signal.signal(SIGTERM, SIG_DFL) # restore the handler for SIGINT, # it's useful for debugging (show the stacktrace before exit) signal.signal(SIGINT, signal.default_int_handler) # Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because # otherwise writes also cause a seek that makes us miss data on the read side. infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536) outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536) exit_code = 0 try: worker_main(infile, outfile) except SystemExit as exc: exit_code = compute_real_exit_code(exc.code) finally: try: outfile.flush() except Exception: pass return exit_code
def _makefile(sock, mode): return sock.makefile(mode)
def test_name_closed_socketio(self): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: fp = sock.makefile("rb") fp.close() self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
def setUp(self): self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ threading.Event() for i in range(4)] SocketConnectedTest.setUp(self) self.read_file = self.cli_conn.makefile( self.read_mode, self.bufsize, encoding = self.encoding, errors = self.errors, newline = self.newline)
def clientSetUp(self): SocketConnectedTest.clientSetUp(self) self.write_file = self.serv_conn.makefile( self.write_mode, self.bufsize, encoding = self.encoding, errors = self.errors, newline = self.newline)
def testCloseAfterMakefile(self): # The file returned by makefile should keep the socket open. self.cli_conn.close() # read until EOF msg = self.read_file.read() self.assertEqual(msg, self.read_msg)
def testMakefileClose(self): # The file returned by makefile should keep the socket open... self.cli_conn.close() msg = self.cli_conn.recv(1024) self.assertEqual(msg, self.read_msg) # ...until the file is itself closed self.read_file.close() self.assertRaises(socket.error, self.cli_conn.recv, 1024)
def process_request(self, request, client_address): """ Start a new thread to process the request. """ if self._ngamsServer.serving_count >= self._ngamsServer.getCfg().getMaxSimReqs(): logger.error("Maximum number of serving threads reached, rejecting request") wfile = request.makefile('wb') wfile.write(b'HTTP/1.0 503 Service Unavailable\r\n\r\n') return SocketServer.ThreadingMixIn.process_request(self, request, client_address)
def test_unusable_closed_socketio(self): with socket.socket() as sock: fp = sock.makefile("rb", buffering=0) self.assertTrue(fp.readable()) self.assertFalse(fp.writable()) self.assertFalse(fp.seekable()) fp.close() self.assertRaises(ValueError, fp.readable) self.assertRaises(ValueError, fp.writable) self.assertRaises(ValueError, fp.seekable)
def testUnbufferedReadline(self): # Read a line, create a new file object, read another line with it line = self.read_file.readline() # first line self.assertEqual(line, b"A. " + self.write_msg) # first line self.read_file = self.cli_conn.makefile('rb', 0) line = self.read_file.readline() # second line self.assertEqual(line, b"B. " + self.write_msg) # second line
def testMakefileClose(self): # The file returned by makefile should keep the socket open... self.cli_conn.close() msg = self.cli_conn.recv(1024) self.assertEqual(msg, self.read_msg) # ...until the file is itself closed self.read_file.close() self.assertRaises(OSError, self.cli_conn.recv, 1024)
def __init__(self, conn): self.conn = conn self.conn._really_makefile = self.conn.makefile self.conn.makefile = self self.armed = False self.file_reg = []
def unwrap(self): self.conn.makefile = self.conn._really_makefile del self.conn._really_makefile