Python socket 模块,makefile() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 socket.makefile()。

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_dealloc_warn(self):
        # Assert that a ResourceWarning is emitted when an unclosed socket, or
        # an unclosed file object created from it, is garbage collected.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop the local reference and force a collection so the warning
            # is raised while the assertWarns context is active.
            sock = None
            support.gc_collect()
        # The first argument of the captured warning must contain the repr
        # taken while the socket was still alive.
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        # NOTE(review): r is reassigned here but never read again in this test.
        r = repr(sock)
        # No warning is asserted for this collection; presumably the file
        # object still keeps the socket alive -- confirm.
        sock = None
        support.gc_collect()
        with self.assertWarns(ResourceWarning):
            # Releasing the file object is the step expected to warn.
            f = None
            support.gc_collect()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_dealloc_warn(self):
        # Assert that a ResourceWarning is emitted when an unclosed socket, or
        # an unclosed file object created from it, is garbage collected.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop the local reference and force a collection so the warning
            # is raised while the assertWarns context is active.
            sock = None
            support.gc_collect()
        # The first argument of the captured warning must contain the repr
        # taken while the socket was still alive.
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        # NOTE(review): r is reassigned here but never read again in this test.
        r = repr(sock)
        # No warning is asserted for this collection; presumably the file
        # object still keeps the socket alive -- confirm.
        sock = None
        support.gc_collect()
        with self.assertWarns(ResourceWarning):
            # Releasing the file object is the step expected to warn.
            f = None
            support.gc_collect()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_dealloc_warn(self):
        # Assert that a ResourceWarning is emitted when an unclosed socket, or
        # an unclosed file object created from it, is garbage collected.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop the local reference and force a collection so the warning
            # is raised while the assertWarns context is active.
            sock = None
            support.gc_collect()
        # The first argument of the captured warning must contain the repr
        # taken while the socket was still alive.
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        # NOTE(review): r is reassigned here but never read again in this test.
        r = repr(sock)
        # No warning is asserted for this collection; presumably the file
        # object still keeps the socket alive -- confirm.
        sock = None
        support.gc_collect()
        with self.assertWarns(ResourceWarning):
            # Releasing the file object is the step expected to warn.
            f = None
            support.gc_collect()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_dealloc_warn(self):
        # Assert that a ResourceWarning is emitted when an unclosed socket, or
        # an unclosed file object created from it, is garbage collected.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop the local reference and force a collection so the warning
            # is raised while the assertWarns context is active.
            sock = None
            support.gc_collect()
        # The first argument of the captured warning must contain the repr
        # taken while the socket was still alive.
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        # NOTE(review): r is reassigned here but never read again in this test.
        r = repr(sock)
        # No warning is asserted for this collection; presumably the file
        # object still keeps the socket alive -- confirm.
        sock = None
        support.gc_collect()
        with self.assertWarns(ResourceWarning):
            # Releasing the file object is the step expected to warn.
            f = None
            support.gc_collect()
项目:MIT-Thesis    作者:alec-heif    | 项目源码 | 文件源码
def worker(sock):
    """
    Called by a worker process after the fork().

    Resets the signal handlers, runs ``worker_main`` over two buffered
    streams built from duplicates of ``sock``'s file descriptor, and
    returns the exit code: 0 unless ``worker_main`` raises ``SystemExit``,
    in which case the code is derived via ``compute_real_exit_code``.
    """
    for signum in (SIGHUP, SIGCHLD, SIGTERM):
        signal.signal(signum, SIG_DFL)
    # restore the handler for SIGINT,
    # it's useful for debugging (show the stacktrace before exit)
    signal.signal(SIGINT, signal.default_int_handler)

    # Read the socket using fdopen instead of socket.makefile() because the latter
    # seems to be very slow; note that we need to dup() the file descriptor because
    # otherwise writes also cause a seek that makes us miss data on the read side.
    buffer_size = 65536
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)

    exit_code = 0
    try:
        worker_main(infile, outfile)
    except SystemExit as exit_request:
        exit_code = compute_real_exit_code(exit_request.code)
    finally:
        # Best-effort flush: a failure here is deliberately ignored.
        try:
            outfile.flush()
        except Exception:
            pass
    return exit_code
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_name_closed_socketio(self):
    # The repr of a buffered reader made from a socket must report
    # name=-1 once the reader has been closed.
    expected_repr = "<_io.BufferedReader name=-1>"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw_sock:
        reader = raw_sock.makefile("rb")
        reader.close()
        self.assertEqual(repr(reader), expected_repr)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
    # Create the four events stored on the test case, run the base
    # setUp, then wrap self.cli_conn in a file object configured from
    # the test's mode/buffering/text attributes.
    self.evt1 = threading.Event()
    self.evt2 = threading.Event()
    self.serv_finished = threading.Event()
    self.cli_finished = threading.Event()
    SocketConnectedTest.setUp(self)
    self.read_file = self.cli_conn.makefile(
        self.read_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def clientSetUp(self):
    # Run the base client setup, then wrap self.serv_conn in a file
    # object configured from the test's mode/buffering/text attributes.
    SocketConnectedTest.clientSetUp(self)
    self.write_file = self.serv_conn.makefile(
        self.write_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testCloseAfterMakefile(self):
    # Closing the connection object first must not prevent the file
    # returned by makefile() from reading everything up to EOF.
    self.cli_conn.close()
    remaining = self.read_file.read()  # read until EOF
    self.assertEqual(remaining, self.read_msg)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testMakefileClose(self):
    # After close() on the connection, recv() must still deliver the
    # message while the file returned by makefile() remains open...
    self.cli_conn.close()
    received = self.cli_conn.recv(1024)
    self.assertEqual(received, self.read_msg)
    # ...and must fail once that file has been closed as well.
    self.read_file.close()
    with self.assertRaises(socket.error):
        self.cli_conn.recv(1024)
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:ngas    作者:ICRAR    | 项目源码 | 文件源码
def process_request(self,
                    request,
                    client_address):
    """
    Start a new thread to process the request.

    If the server's current serving count has already reached the
    configured maximum number of simultaneous requests, no thread is
    started: an error is logged, a bare ``HTTP/1.0 503`` response is
    written to the request socket, and the socket is shut down.

    :param request: the accepted client socket.
    :param client_address: the client's address, forwarded to the
        threading mix-in when the request is accepted.
    """

    if self._ngamsServer.serving_count >= self._ngamsServer.getCfg().getMaxSimReqs():
        logger.error("Maximum number of serving threads reached, rejecting request")
        wfile = request.makefile('wb')
        try:
            wfile.write(b'HTTP/1.0 503 Service Unavailable\r\n\r\n')
            wfile.flush()
        finally:
            # Close explicitly so the buffered response is pushed out now
            # rather than whenever the file object gets garbage collected.
            wfile.close()
        # No handler thread runs for a rejected request, so nothing else
        # will release the socket; shut it down here to avoid leaking it.
        self.shutdown_request(request)
        return

    SocketServer.ThreadingMixIn.process_request(self, request, client_address)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_name_closed_socketio(self):
    # The repr of a buffered reader made from a socket must report
    # name=-1 once the reader has been closed.
    expected_repr = "<_io.BufferedReader name=-1>"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw_sock:
        reader = raw_sock.makefile("rb")
        reader.close()
        self.assertEqual(repr(reader), expected_repr)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_unusable_closed_socketio(self):
    # An unbuffered "rb" socket file answers the capability queries
    # while open; after close() each of them must raise ValueError.
    with socket.socket() as raw_sock:
        raw_file = raw_sock.makefile("rb", buffering=0)
        self.assertTrue(raw_file.readable())
        self.assertFalse(raw_file.writable())
        self.assertFalse(raw_file.seekable())
        raw_file.close()
        with self.assertRaises(ValueError):
            raw_file.readable()
        with self.assertRaises(ValueError):
            raw_file.writable()
        with self.assertRaises(ValueError):
            raw_file.seekable()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
    # Create the four events stored on the test case, run the base
    # setUp, then wrap self.cli_conn in a file object configured from
    # the test's mode/buffering/text attributes.
    self.evt1 = threading.Event()
    self.evt2 = threading.Event()
    self.serv_finished = threading.Event()
    self.cli_finished = threading.Event()
    SocketConnectedTest.setUp(self)
    self.read_file = self.cli_conn.makefile(
        self.read_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def clientSetUp(self):
    # Run the base client setup, then wrap self.serv_conn in a file
    # object configured from the test's mode/buffering/text attributes.
    SocketConnectedTest.clientSetUp(self)
    self.write_file = self.serv_conn.makefile(
        self.write_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testUnbufferedReadline(self):
    # Read one line, replace the file object with a fresh unbuffered
    # one from the same connection, then read the next line through it.
    first_line = self.read_file.readline()
    self.assertEqual(first_line, b"A. " + self.write_msg)
    unbuffered = self.cli_conn.makefile('rb', 0)
    self.read_file = unbuffered
    second_line = unbuffered.readline()
    self.assertEqual(second_line, b"B. " + self.write_msg)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testMakefileClose(self):
    # After close() on the connection, recv() must still deliver the
    # message while the file returned by makefile() remains open...
    self.cli_conn.close()
    received = self.cli_conn.recv(1024)
    self.assertEqual(received, self.read_msg)
    # ...and must fail once that file has been closed as well.
    self.read_file.close()
    with self.assertRaises(socket.error):
        self.cli_conn.recv(1024)
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:deb-python-pymysql    作者:openstack    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:ServerlessCrawler-VancouverRealState    作者:MarcelloLins    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:pyspark    作者:v-v-vishnevskiy    | 项目源码 | 文件源码
def worker(sock):
    """
    Called by a worker process after the fork().

    Resets the signal handlers, runs ``worker_main`` over two buffered
    streams built from duplicates of ``sock``'s file descriptor, and
    returns the exit code: 0 unless ``worker_main`` raises ``SystemExit``,
    in which case the code is derived via ``compute_real_exit_code``.
    """
    for signum in (SIGHUP, SIGCHLD, SIGTERM):
        signal.signal(signum, SIG_DFL)
    # restore the handler for SIGINT,
    # it's useful for debugging (show the stacktrace before exit)
    signal.signal(SIGINT, signal.default_int_handler)

    # Read the socket using fdopen instead of socket.makefile() because the latter
    # seems to be very slow; note that we need to dup() the file descriptor because
    # otherwise writes also cause a seek that makes us miss data on the read side.
    buffer_size = 65536
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)

    exit_code = 0
    try:
        worker_main(infile, outfile)
    except SystemExit as exit_request:
        exit_code = compute_real_exit_code(exit_request.code)
    finally:
        # Best-effort flush: a failure here is deliberately ignored.
        try:
            outfile.flush()
        except Exception:
            pass
    return exit_code
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_name_closed_socketio(self):
    # The repr of a buffered reader made from a socket must report
    # name=-1 once the reader has been closed.
    expected_repr = "<_io.BufferedReader name=-1>"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw_sock:
        reader = raw_sock.makefile("rb")
        reader.close()
        self.assertEqual(repr(reader), expected_repr)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_unusable_closed_socketio(self):
    # An unbuffered "rb" socket file answers the capability queries
    # while open; after close() each of them must raise ValueError.
    with socket.socket() as raw_sock:
        raw_file = raw_sock.makefile("rb", buffering=0)
        self.assertTrue(raw_file.readable())
        self.assertFalse(raw_file.writable())
        self.assertFalse(raw_file.seekable())
        raw_file.close()
        with self.assertRaises(ValueError):
            raw_file.readable()
        with self.assertRaises(ValueError):
            raw_file.writable()
        with self.assertRaises(ValueError):
            raw_file.seekable()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
    # Create the four events stored on the test case, run the base
    # setUp, then wrap self.cli_conn in a file object configured from
    # the test's mode/buffering/text attributes.
    self.evt1 = threading.Event()
    self.evt2 = threading.Event()
    self.serv_finished = threading.Event()
    self.cli_finished = threading.Event()
    SocketConnectedTest.setUp(self)
    self.read_file = self.cli_conn.makefile(
        self.read_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def clientSetUp(self):
    # Run the base client setup, then wrap self.serv_conn in a file
    # object configured from the test's mode/buffering/text attributes.
    SocketConnectedTest.clientSetUp(self)
    self.write_file = self.serv_conn.makefile(
        self.write_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testUnbufferedReadline(self):
    # Read one line, replace the file object with a fresh unbuffered
    # one from the same connection, then read the next line through it.
    first_line = self.read_file.readline()
    self.assertEqual(first_line, b"A. " + self.write_msg)
    unbuffered = self.cli_conn.makefile('rb', 0)
    self.read_file = unbuffered
    second_line = unbuffered.readline()
    self.assertEqual(second_line, b"B. " + self.write_msg)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testMakefileClose(self):
    # After close() on the connection, recv() must still deliver the
    # message while the file returned by makefile() remains open...
    self.cli_conn.close()
    received = self.cli_conn.recv(1024)
    self.assertEqual(received, self.read_msg)
    # ...and must fail once that file has been closed as well.
    self.read_file.close()
    with self.assertRaises(OSError):
        self.cli_conn.recv(1024)
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:teleport    作者:eomsoft    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_name_closed_socketio(self):
    # The repr of a buffered reader made from a socket must report
    # name=-1 once the reader has been closed.
    expected_repr = "<_io.BufferedReader name=-1>"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw_sock:
        reader = raw_sock.makefile("rb")
        reader.close()
        self.assertEqual(repr(reader), expected_repr)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_unusable_closed_socketio(self):
    # An unbuffered "rb" socket file answers the capability queries
    # while open; after close() each of them must raise ValueError.
    with socket.socket() as raw_sock:
        raw_file = raw_sock.makefile("rb", buffering=0)
        self.assertTrue(raw_file.readable())
        self.assertFalse(raw_file.writable())
        self.assertFalse(raw_file.seekable())
        raw_file.close()
        with self.assertRaises(ValueError):
            raw_file.readable()
        with self.assertRaises(ValueError):
            raw_file.writable()
        with self.assertRaises(ValueError):
            raw_file.seekable()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
    # Create the four events stored on the test case, run the base
    # setUp, then wrap self.cli_conn in a file object configured from
    # the test's mode/buffering/text attributes.
    self.evt1 = threading.Event()
    self.evt2 = threading.Event()
    self.serv_finished = threading.Event()
    self.cli_finished = threading.Event()
    SocketConnectedTest.setUp(self)
    self.read_file = self.cli_conn.makefile(
        self.read_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def clientSetUp(self):
    # Run the base client setup, then wrap self.serv_conn in a file
    # object configured from the test's mode/buffering/text attributes.
    SocketConnectedTest.clientSetUp(self)
    self.write_file = self.serv_conn.makefile(
        self.write_mode,
        self.bufsize,
        encoding=self.encoding,
        errors=self.errors,
        newline=self.newline,
    )
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testUnbufferedReadline(self):
    # Read one line, replace the file object with a fresh unbuffered
    # one from the same connection, then read the next line through it.
    first_line = self.read_file.readline()
    self.assertEqual(first_line, b"A. " + self.write_msg)
    unbuffered = self.cli_conn.makefile('rb', 0)
    self.read_file = unbuffered
    second_line = unbuffered.readline()
    self.assertEqual(second_line, b"B. " + self.write_msg)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testMakefileClose(self):
    # After close() on the connection, recv() must still deliver the
    # message while the file returned by makefile() remains open...
    self.cli_conn.close()
    received = self.cli_conn.recv(1024)
    self.assertEqual(received, self.read_msg)
    # ...and must fail once that file has been closed as well.
    self.read_file.close()
    with self.assertRaises(OSError):
        self.cli_conn.recv(1024)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def __init__(self, conn):
    """Wrap ``conn`` and take over its ``makefile`` attribute.

    The connection's current ``makefile`` is saved on the connection as
    ``_really_makefile`` and replaced by this object; ``armed`` starts
    as ``False`` and ``file_reg`` as an empty list.
    """
    self.conn = conn
    original_makefile = conn.makefile
    conn._really_makefile = original_makefile
    conn.makefile = self
    self.armed, self.file_reg = False, []
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def unwrap(self):
    """Put the saved ``_really_makefile`` back as the connection's
    ``makefile`` and delete the saved attribute."""
    wrapped = self.conn
    wrapped.makefile = wrapped._really_makefile
    del wrapped._really_makefile
项目:prophessor    作者:paulhauner    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:power-pi    作者:Knapsacks    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj
项目:power-pi    作者:Knapsacks    | 项目源码 | 文件源码
def _makefile(sock, mode):
    """Return the file object produced by ``sock.makefile(mode)``."""
    file_obj = sock.makefile(mode)
    return file_obj