我们从 Python 开源项目中提取了以下 10 个代码示例，用于说明如何使用 socket.MSG_OOB。
def test_handle_expt(self):
    # Verify that the client's handle_expt() callback runs once the peer
    # delivers out-of-band (OOB) data.
    # Caveat: OOB data is tenuously supported and rarely used, so this
    # might fail on some platforms.

    class TestHandler(BaseTestHandler):
        def __init__(self, conn):
            super().__init__(conn)
            # One urgent byte, pushed out as soon as the handler is built.
            urgent_byte = bytes(chr(244), 'latin-1')
            self.socket.send(urgent_byte, socket.MSG_OOB)

    class TestClient(BaseClient):
        def handle_expt(self):
            # Raising the flag is what ends the wait loop below.
            self.flag = True

    oob_server = TCPServer(TestHandler)
    oob_client = TestClient(oob_server.address)
    self.loop_waiting_for_flag(oob_client)
def test_handle_expt(self):
    # Make sure handle_expt is called on OOB data received.
    # Note: this might fail on some platforms as OOB data is
    # tenuously supported and rarely used.

    class TestClient(BaseClient):
        def handle_expt(self):
            # Setting the flag is the signal the wait loop below looks for.
            self.flag = True

    class TestHandler(BaseTestHandler):
        def __init__(self, conn):
            BaseTestHandler.__init__(self, conn)
            # Send the single byte 0xF4 (244) as out-of-band data.  A bytes
            # literal is used instead of chr(244): on Python 3 chr() returns
            # text, which socket.send() rejects with TypeError, while
            # b'\xf4' is the same one-byte payload on Python 2.6+ and 3.
            self.socket.send(b'\xf4', socket.MSG_OOB)

    server = TCPServer(TestHandler)
    client = TestClient(server.address)
    self.loop_waiting_for_flag(client)
def test_handle_expt(self):
    # Verify that the client's handle_expt() callback runs once the peer
    # delivers out-of-band (OOB) data.
    # Caveat: OOB data is tenuously supported and rarely used, so this
    # might fail on some platforms.
    on_unix_socket = HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX
    if on_unix_socket:
        self.skipTest("Not applicable to AF_UNIX sockets.")

    class TestHandler(BaseTestHandler):
        def __init__(self, conn):
            super().__init__(conn)
            # One urgent byte, pushed out as soon as the handler is built.
            urgent_byte = bytes(chr(244), 'latin-1')
            self.socket.send(urgent_byte, socket.MSG_OOB)

    class TestClient(BaseClient):
        def handle_expt(self):
            # Read the pending OOB data first, then raise the flag that
            # ends the wait loop below.
            self.socket.recv(1024, socket.MSG_OOB)
            self.flag = True

    oob_server = BaseServer(self.family, self.addr, TestHandler)
    oob_client = TestClient(self.family, oob_server.address)
    self.loop_waiting_for_flag(oob_client)
def send_ping(raw_socket, dest_addr, pkt_id, seq_code, data_length=48):
    """Build an ICMP echo request and send it to ``dest_addr``.

    The packet is an 8-byte big-endian header -- type (1 byte), code
    (1 byte), checksum (2 bytes), id (2 bytes), sequence (2 bytes) --
    followed by ``data_length`` bytes of ``b'p'`` filler.  The checksum
    field holds the result of ``calc_crc16`` applied to the whole packet
    with the checksum field set to zero.

    Args:
        raw_socket: Object providing ``sendto(data, address)``; presumably
            a raw ICMP socket -- confirm against the caller.
        dest_addr: Destination host, used as the first element of the
            address tuple handed to ``sendto``.
        pkt_id: Value for the header's id field (unsigned 16-bit).
        seq_code: Value for the header's sequence field (unsigned 16-bit).
        data_length: Number of filler payload bytes (default 48).

    Raises:
        struct.error: If ``pkt_id``, ``seq_code`` or the computed checksum
            does not fit in an unsigned 16-bit field.
    """
    icmp_echo_request = 8
    icmp_code = 0
    # Unsigned fields throughout; for the values 8 and 0 this yields the
    # same bytes as the signed 'b' codes used previously.
    header_struct = struct.Struct('>BBHHH')

    def build_header(checksum):
        return header_struct.pack(
            icmp_echo_request, icmp_code, checksum, pkt_id, seq_code)

    data = b'p' * data_length
    # The checksum covers header + payload with the checksum field zeroed,
    # so the header is packed twice: once to measure, once for real.
    pkt_crc16 = calc_crc16(build_header(0) + data)
    packet = build_header(pkt_crc16) + data

    # sendto() needs a (host, port) tuple.  The port slot previously held
    # socket.MSG_OOB, which is a send *flag* rather than a port and only
    # worked because its value is 1 on common platforms; pass that
    # placeholder explicitly instead.
    # NOTE(review): the port is presumably ignored for ICMP -- confirm.
    placeholder_port = 1
    raw_socket.sendto(packet, (dest_addr, placeholder_port))
def test_oob_abor(self):
    # Issue ABOR the way RFC-959 prescribes: the Telnet IP/Synch
    # sequence goes out first as OOB data, then the command itself.
    # Systems such as FreeBSD used to have trouble with this because
    # their SO_OOBINLINE behavior differs.
    # The test may also fail on some platforms (e.g. Python CE) even
    # though the MSG_OOB constant is defined there.
    control_sock = self.client.sock
    for code in (244, 255):
        # Each byte is sent in its own out-of-band sendall() call.
        control_sock.sendall(b(chr(code)), socket.MSG_OOB)
    control_sock.sendall(b'abor\r\n')
    reply = self.client.getresp()
    self.assertEqual(reply[:3], '225')