我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.internet.protocol.ServerFactory()。
def setUp(self): from twisted.internet import reactor self.serverFactory = protocol.ServerFactory() self.serverFactory.protocol = self.serverProto self.clientFactory = protocol.ClientFactory() self.clientFactory.protocol = self.clientProto self.clientFactory.onMade = defer.Deferred() self.serverFactory.onMade = defer.Deferred() self.serverPort = reactor.listenTCP(0, self.serverFactory) self.clientConn = reactor.connectTCP( '127.0.0.1', self.serverPort.getHost().port, self.clientFactory) def getProtos(rlst): self.cli = self.clientFactory.theProto self.svr = self.serverFactory.theProto dl = defer.DeferredList([self.clientFactory.onMade, self.serverFactory.onMade]) return dl.addCallback(getProtos)
def testPrivileged(self): factory = protocol.ServerFactory() factory.protocol = TestEcho TestEcho.d = defer.Deferred() t = internet.TCPServer(0, factory) t.privileged = 1 t.privilegedStartService() num = t._port.getHost().port factory = protocol.ClientFactory() factory.d = defer.Deferred() factory.protocol = Foo factory.line = None c = internet.TCPClient('127.0.0.1', num, factory) c.startService() factory.d.addCallback(self.assertEqual, 'lalala') factory.d.addCallback(lambda x : c.stopService()) factory.d.addCallback(lambda x : t.stopService()) factory.d.addCallback(lambda x : TestEcho.d) return factory.d
def testUNIX(self): # FIXME: This test is far too dense. It needs comments. # -- spiv, 2004-11-07 if not interfaces.IReactorUNIX(reactor, None): raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" s = service.MultiService() s.startService() factory = protocol.ServerFactory() factory.protocol = TestEcho TestEcho.d = defer.Deferred() t = internet.UNIXServer('echo.skt', factory) t.setServiceParent(s) factory = protocol.ClientFactory() factory.protocol = Foo factory.d = defer.Deferred() factory.line = None internet.UNIXClient('echo.skt', factory).setServiceParent(s) factory.d.addCallback(self.assertEqual, 'lalala') factory.d.addCallback(lambda x : s.stopService()) factory.d.addCallback(lambda x : TestEcho.d) factory.d.addCallback(self._cbTestUnix, factory, s) return factory.d
def testVolatile(self): if not interfaces.IReactorUNIX(reactor, None): raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" factory = protocol.ServerFactory() factory.protocol = wire.Echo t = internet.UNIXServer('echo.skt', factory) t.startService() self.failIfIdentical(t._port, None) t1 = copy.copy(t) self.assertIdentical(t1._port, None) t.stopService() self.assertIdentical(t._port, None) self.failIf(t.running) factory = protocol.ClientFactory() factory.protocol = wire.Echo t = internet.UNIXClient('echo.skt', factory) t.startService() self.failIfIdentical(t._connection, None) t1 = copy.copy(t) self.assertIdentical(t1._connection, None) t.stopService() self.assertIdentical(t._connection, None) self.failIf(t.running)
def testOpenSSLBuffering(self): serverProto = self.serverProto = SingleLineServerProtocol() clientProto = self.clientProto = RecordingClientProtocol() server = protocol.ServerFactory() client = self.client = protocol.ClientFactory() server.protocol = lambda: serverProto client.protocol = lambda: clientProto client.buffer = [] sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath) cCTX = ssl.ClientContextFactory() port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1') reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX) i = 0 while i < 5000 and not client.buffer: i += 1 reactor.iterate() self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
def testImmediateDisconnect(self): org = "twisted.test.test_ssl" self.setupServerAndClient( (org, org + ", client"), {}, (org, org + ", server"), {}) # Set up a server, connect to it with a client, which should work since our verifiers # allow anything, then disconnect. serverProtocolFactory = protocol.ServerFactory() serverProtocolFactory.protocol = protocol.Protocol self.serverPort = serverPort = reactor.listenSSL(0, serverProtocolFactory, self.serverCtxFactory) clientProtocolFactory = protocol.ClientFactory() clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol clientProtocolFactory.connectionDisconnected = defer.Deferred() clientConnector = reactor.connectSSL('127.0.0.1', serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) return clientProtocolFactory.connectionDisconnected.addCallback( lambda ignoredResult: self.serverPort.stopListening())
def loopback(self, serverCertOpts, clientCertOpts, onServerLost=None, onClientLost=None, onData=None): if onServerLost is None: self.onServerLost = onServerLost = defer.Deferred() if onClientLost is None: self.onClientLost = onClientLost = defer.Deferred() if onData is None: onData = defer.Deferred() serverFactory = protocol.ServerFactory() serverFactory.protocol = DataCallbackProtocol serverFactory.onLost = onServerLost serverFactory.onData = onData clientFactory = protocol.ClientFactory() clientFactory.protocol = WritingProtocol clientFactory.onLost = onClientLost self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts) self.clientConn = reactor.connectSSL('127.0.0.1', self.serverPort.getHost().port, clientFactory, clientCertOpts)
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True, external_ip=None): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net self.addr_store = dict(addr_store) self.connect_addrs = connect_addrs self.preferred_storage = preferred_storage self.known_txs_var = known_txs_var self.mining_txs_var = mining_txs_var self.advertise_ip = advertise_ip self.external_ip = external_ip self.traffic_happened = variable.Event() self.nonce = random.randrange(2**64) self.peers = {} self.bans = {} # address -> end_time self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts) self.serverfactory = ServerFactory(self, max_incoming_conns) self.running = False
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), advertise_ip=True): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net self.addr_store = dict(addr_store) self.connect_addrs = connect_addrs self.preferred_storage = preferred_storage self.known_txs_var = known_txs_var self.mining_txs_var = mining_txs_var self.advertise_ip = advertise_ip self.traffic_happened = variable.Event() self.nonce = random.randrange(2**64) self.peers = {} self.bans = {} # address -> end_time self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts) self.serverfactory = ServerFactory(self, max_incoming_conns) self.running = False
def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.VariableDict({}), mining_txs_var=variable.VariableDict({}), advertise_ip=True, external_ip=None): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net self.addr_store = dict(addr_store) self.connect_addrs = connect_addrs self.preferred_storage = preferred_storage self.known_txs_var = known_txs_var self.mining_txs_var = mining_txs_var self.advertise_ip = advertise_ip self.external_ip = external_ip self.traffic_happened = variable.Event() self.nonce = random.randrange(2**64) self.peers = {} self.bans = {} # address -> end_time self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts) self.serverfactory = ServerFactory(self, max_incoming_conns) self.running = False
def main(): # ??????? factory = protocol.ServerFactory() # ???ServerFactory?,ServerFactory???factory factory.protocol = EchoServer # ??factory??protocol??,?EchoServer??????protocol reactor.listenTCP(8000, factory, interface="127.0.0.1") # print(type(reactor)) # ??type???reactor??? # twisted.internet.selectreactor.SelectReactor # ??????SelectReactor???twisted.internet.posixbase.PosixReactorBase???? # listenTCP??(port, factory, backlog=50, interface=''),backlog????listen???50 # listenTCP???twisted.internet.tcp.Port? # PosixReactorBase??????twisted.internet.base._SignalReactorMixin,?????????run?? reactor.run() # run?????????startRunning??,startRunning???ReactorBase??startRunning?? # run?????????mainLoop?? # mainLoop?????????SelectReactor.doIteration(t)??,???????????select.select???? # ???????,??self._doReadOrWrite??,??????????twisted.internet.tcp.Connection?doRead??,????? # ??self._dataReceived(data),??????self.protocol.dataReceived(data),??self.protocol???? # ?????protocol.ServerFactory().protocol,????dataReceived(data),????????????,?????listenTCP???factory # ??factory.protocol.dataReceived(data) ????EchoServer().dataReceived(data)??
def __init__(self, port=RESOURCE_MANAGER_PORT, parser=DEFAULT_PARSER, log_to_screen=True): """Initialize the resource manager server. Args: port (number): client listener port. parser (object): messages parser of type `AbstractParser`. log_to_screen (bool): Enable log prints to screen. """ self.logger = get_logger(log_to_screen) self._factory = ServerFactory() self._factory.protocol = Worker self._factory.logger = self.logger self._factory.protocol.parser = parser() self._port = port self._reactor = SelectReactor() self._reactor.listenTCP(port, self._factory) self._resource_manager = ManagerThread(self._reactor, self.logger) self._factory.request_queue = self._resource_manager.request_queue
def test_invalidDescriptor(self): """ An implementation of L{IReactorSocket.adoptStreamPort} raises L{socket.error} if passed an integer which is not associated with a socket. """ reactor = self.buildReactor() probe = socket.socket() fileno = probe.fileno() probe.close() exc = self.assertRaises( socket.error, reactor.adoptStreamPort, fileno, socket.AF_INET, ServerFactory()) if platform.isWindows() and _PY3: self.assertEqual(exc.args[0], errno.WSAENOTSOCK) else: self.assertEqual(exc.args[0], errno.EBADF)
def test_invalidAddressFamily(self): """ An implementation of L{IReactorSocket.adoptStreamPort} raises L{UnsupportedAddressFamily} if passed an address family it does not support. """ reactor = self.buildReactor() port = socket.socket() port.bind(("127.0.0.1", 0)) port.listen(1) self.addCleanup(port.close) arbitrary = 2 ** 16 + 7 self.assertRaises( UnsupportedAddressFamily, reactor.adoptStreamPort, port.fileno(), arbitrary, ServerFactory())
def test_invalidAddressFamily(self): """ An implementation of L{IReactorSocket.adoptStreamConnection} raises L{UnsupportedAddressFamily} if passed an address family it does not support. """ reactor = self.buildReactor() connection = socket.socket() self.addCleanup(connection.close) arbitrary = 2 ** 16 + 7 self.assertRaises( UnsupportedAddressFamily, reactor.adoptStreamConnection, connection.fileno(), arbitrary, ServerFactory())
def _makeDataConnection(self, ignored=None): # Establish an active data connection (i.e. server connecting to # client). deferred = defer.Deferred() class DataFactory(protocol.ServerFactory): protocol = _BufferingProtocol def buildProtocol(self, addr): p = protocol.ServerFactory.buildProtocol(self, addr) reactor.callLater(0, deferred.callback, p) return p dataPort = reactor.listenTCP(0, DataFactory(), interface='127.0.0.1') self.dataPorts.append(dataPort) cmd = 'PORT ' + ftp.encodeHostPort('127.0.0.1', dataPort.getHost().port) self.client.queueStringCommand(cmd) return deferred
def testTCP(self): s = service.MultiService() s.startService() factory = protocol.ServerFactory() factory.protocol = TestEcho TestEcho.d = defer.Deferred() t = internet.TCPServer(0, factory) t.setServiceParent(s) num = t._port.getHost().port factory = protocol.ClientFactory() factory.d = defer.Deferred() factory.protocol = Foo factory.line = None internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s) factory.d.addCallback(self.assertEqual, b'lalala') factory.d.addCallback(lambda x : s.stopService()) factory.d.addCallback(lambda x : TestEcho.d) return factory.d
def testPrivileged(self): factory = protocol.ServerFactory() factory.protocol = TestEcho TestEcho.d = defer.Deferred() t = internet.TCPServer(0, factory) t.privileged = 1 t.privilegedStartService() num = t._port.getHost().port factory = protocol.ClientFactory() factory.d = defer.Deferred() factory.protocol = Foo factory.line = None c = internet.TCPClient('127.0.0.1', num, factory) c.startService() factory.d.addCallback(self.assertEqual, b'lalala') factory.d.addCallback(lambda x : c.stopService()) factory.d.addCallback(lambda x : t.stopService()) factory.d.addCallback(lambda x : TestEcho.d) return factory.d
def testUNIX(self): # FIXME: This test is far too dense. It needs comments. # -- spiv, 2004-11-07 s = service.MultiService() s.startService() factory = protocol.ServerFactory() factory.protocol = TestEcho TestEcho.d = defer.Deferred() t = internet.UNIXServer('echo.skt', factory) t.setServiceParent(s) factory = protocol.ClientFactory() factory.protocol = Foo factory.d = defer.Deferred() factory.line = None internet.UNIXClient('echo.skt', factory).setServiceParent(s) factory.d.addCallback(self.assertEqual, b'lalala') factory.d.addCallback(lambda x : s.stopService()) factory.d.addCallback(lambda x : TestEcho.d) factory.d.addCallback(self._cbTestUnix, factory, s) return factory.d
def testVolatile(self): factory = protocol.ServerFactory() factory.protocol = wire.Echo t = internet.UNIXServer('echo.skt', factory) t.startService() self.failIfIdentical(t._port, None) t1 = copy.copy(t) self.assertIsNone(t1._port) t.stopService() self.assertIsNone(t._port) self.assertFalse(t.running) factory = protocol.ClientFactory() factory.protocol = wire.Echo t = internet.UNIXClient('echo.skt', factory) t.startService() self.failIfIdentical(t._connection, None) t1 = copy.copy(t) self.assertIsNone(t1._connection) t.stopService() self.assertIsNone(t._connection) self.assertFalse(t.running)
def test_openSSLBuffering(self): serverProto = self.serverProto = SingleLineServerProtocol() clientProto = self.clientProto = RecordingClientProtocol() server = protocol.ServerFactory() client = self.client = protocol.ClientFactory() server.protocol = lambda: serverProto client.protocol = lambda: clientProto sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath) cCTX = ssl.ClientContextFactory() port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1') self.addCleanup(port.stopListening) clientConnector = reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX) self.addCleanup(clientConnector.disconnect) return clientProto.deferred.addCallback( self.assertEqual, b"+OK <some crap>\r\n")
def testImmediateDisconnect(self): org = "twisted.test.test_ssl" self.setupServerAndClient( (org, org + ", client"), {}, (org, org + ", server"), {}) # Set up a server, connect to it with a client, which should work since our verifiers # allow anything, then disconnect. serverProtocolFactory = protocol.ServerFactory() serverProtocolFactory.protocol = protocol.Protocol self.serverPort = serverPort = reactor.listenSSL(0, serverProtocolFactory, self.serverCtxFactory) clientProtocolFactory = protocol.ClientFactory() clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol clientProtocolFactory.connectionDisconnected = defer.Deferred() reactor.connectSSL('127.0.0.1', serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) return clientProtocolFactory.connectionDisconnected.addCallback( lambda ignoredResult: self.serverPort.stopListening())
def main(): if len(sys.argv) < 2: print("Required: specify a port") return live = True my_port = int(sys.argv[1]) logging.basicConfig(filename="server%s.log" % my_port, level=logging.INFO) logging.info("Starting server on %s" % my_port) if len(sys.argv) > 2 and sys.argv[2] == "test": test_data() live = False Server(live=live).first_host(my_port) factory = protocol.ServerFactory() factory.protocol = Server reactor.listenTCP(my_port, factory) reactor.run()
def createCacheService(options): from rurouni.cache import MetricCache from rurouni.protocols import CacheManagementHandler MetricCache.init() state.events.metricReceived.addHandler(MetricCache.put) root_service = createBaseService(options) factory = ServerFactory() factory.protocol = CacheManagementHandler service = TCPServer(int(settings.CACHE_QUERY_PORT), factory, interface=settings.CACHE_QUERY_INTERFACE) service.setServiceParent(root_service) from rurouni.writer import WriterService service = WriterService() service.setServiceParent(root_service) return root_service
def main(): """ Loads the registered-widgets file. Opens up ServerFactory to listen for requests on the specified port. """ open(REGISTERED_FILE, 'a').close() # touch the file so that it exists with open(REGISTERED_FILE, 'r') as f: for line in f: line = line.strip() # Skip lines that start with '#' so that we can comment-out lines if line.startswith('#'): continue print "Loading line: '%s'" % line new_widget = Widget(json_str=line) if new_widget.device_id in REGISTERED_DEVICES: print "Skipping duplicate device ID %s" % repr(new_widget.device_id) else: REGISTERED_DEVICES[new_widget.device_id] = new_widget factory = protocol.ServerFactory() factory.protocol = DoorServer print "Starting DoorApp server listening on port %d" % PORT reactor.listenTCP(PORT, factory) reactor.run()
def buildProtocol(self, addr): p = protocol.ServerFactory.buildProtocol(self, addr) # timeOut needs to be on the Protocol instance cause # TimeoutMixin expects it there p.timeOut = self.timeOut return p
def __init__(self, rpcVersions, rpcConf, proto, service): internet.TCPServer.__init__(0, ServerFactory()) self.rpcConf = rpcConf self.proto = proto self.service = service
def _acceptFailureTest(self, socketErrorNumber): """ Test behavior in the face of an exception from C{accept(2)}. On any exception which indicates the platform is unable or unwilling to allocate further resources to us, the existing port should remain listening, a message should be logged, and the exception should not propagate outward from doRead. @param socketErrorNumber: The errno to simulate from accept. """ class FakeSocket(object): """ Pretend to be a socket in an overloaded system. """ def accept(self): raise socket.error( socketErrorNumber, os.strerror(socketErrorNumber)) factory = ServerFactory() port = self.port(0, factory, interface='127.0.0.1') originalSocket = port.socket try: port.socket = FakeSocket() port.doRead() expectedFormat = "Could not accept new connection (%s)" expectedErrorCode = errno.errorcode[socketErrorNumber] expectedMessage = expectedFormat % (expectedErrorCode,) for msg in self.messages: if msg.get('message') == (expectedMessage,): break else: self.fail("Log event for failed accept not found in " "%r" % (self.messages,)) finally: port.socket = originalSocket
def test_properlyCloseFiles(self): """ Test that lost connections properly have their underlying socket resources cleaned up. """ onServerConnectionLost = defer.Deferred() serverFactory = protocol.ServerFactory() serverFactory.protocol = lambda: ConnectionLostNotifyingProtocol( onServerConnectionLost) serverPort = self.createServer('127.0.0.1', 0, serverFactory) onClientConnectionLost = defer.Deferred() serverAddr = serverPort.getHost() clientCreator = protocol.ClientCreator( reactor, lambda: HandleSavingProtocol(onClientConnectionLost)) clientDeferred = self.connectClient( serverAddr.host, serverAddr.port, clientCreator) def clientConnected(client): """ Disconnect the client. Return a Deferred which fires when both the client and the server have received disconnect notification. """ client.transport.loseConnection() return defer.gatherResults([ onClientConnectionLost, onServerConnectionLost]) clientDeferred.addCallback(clientConnected) def clientDisconnected((client, server)): """ Verify that the underlying platform socket handle has been cleaned up. """ expectedErrorCode = self.getHandleErrorCode() err = self.assertRaises( self.getHandleExceptionType(), client.handle.send, 'bytes') self.assertEqual(err.args[0], expectedErrorCode)
def _uncleanSocketTest(self, callback): self.filename = self.mktemp() source = ("from twisted.internet import protocol, reactor\n" "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,) env = {'PYTHONPATH': os.pathsep.join(sys.path)} d = utils.getProcessOutput(sys.executable, ("-u", "-c", source), env=env) d.addCallback(callback) return d
def testTCP(self): p = reactor.listenTCP(0, protocol.ServerFactory()) portNo = p.getHost().port self.assertNotEqual(str(p).find(str(portNo)), -1, "%d not found in %s" % (portNo, p)) return p.stopListening()
def testSSL(self, ssl=ssl): pem = util.sibpath(__file__, 'server.pem') p = reactor.listenSSL(0, protocol.ServerFactory(), ssl.DefaultOpenSSLContextFactory(pem, pem)) portNo = p.getHost().port self.assertNotEqual(str(p).find(str(portNo)), -1, "%d not found in %s" % (portNo, p)) return p.stopListening()
def testConnectionGettingRefused(self): factory = protocol.ServerFactory() factory.protocol = wire.Echo t = internet.TCPServer(0, factory) t.startService() num = t._port.getHost().port t.stopService() d = defer.Deferred() factory = protocol.ClientFactory() factory.clientConnectionFailed = lambda *args: d.callback(None) c = internet.TCPClient('127.0.0.1', num, factory) c.startService() return d
def testStoppingServer(self): if not interfaces.IReactorUNIX(reactor, None): raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" factory = protocol.ServerFactory() factory.protocol = wire.Echo t = internet.UNIXServer('echo.skt', factory) t.startService() t.stopService() self.failIf(t.running) factory = protocol.ClientFactory() d = defer.Deferred() factory.clientConnectionFailed = lambda *args: d.callback(None) reactor.connectUNIX('echo.skt', factory) return d
def testFailedVerify(self): org = "twisted.test.test_ssl" self.setupServerAndClient( (org, org + ", client"), {}, (org, org + ", server"), {}) def verify(*a): return False self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify) serverConnLost = defer.Deferred() serverProtocol = protocol.Protocol() serverProtocol.connectionLost = serverConnLost.callback serverProtocolFactory = protocol.ServerFactory() serverProtocolFactory.protocol = lambda: serverProtocol self.serverPort = serverPort = reactor.listenSSL(0, serverProtocolFactory, self.serverCtxFactory) clientConnLost = defer.Deferred() clientProtocol = protocol.Protocol() clientProtocol.connectionLost = clientConnLost.callback clientProtocolFactory = protocol.ClientFactory() clientProtocolFactory.protocol = lambda: clientProtocol clientConnector = reactor.connectSSL('127.0.0.1', serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) dl = defer.DeferredList([serverConnLost, clientConnLost], consumeErrors=True) return dl.addCallback(self._cbLostConns)
def testPortforward(self): """ Test port forwarding through Echo protocol. """ realServerFactory = protocol.ServerFactory() realServerFactory.protocol = lambda: self.serverProtocol realServerPort = reactor.listenTCP(0, realServerFactory, interface='127.0.0.1') self.openPorts.append(realServerPort) proxyServerFactory = portforward.ProxyFactory('127.0.0.1', realServerPort.getHost().port) proxyServerPort = reactor.listenTCP(0, proxyServerFactory, interface='127.0.0.1') self.openPorts.append(proxyServerPort) nBytes = 1000 received = [] d = defer.Deferred() def testDataReceived(data): received.extend(data) if len(received) >= nBytes: self.assertEquals(''.join(received), 'x' * nBytes) d.callback(None) self.clientProtocol.dataReceived = testDataReceived def testConnectionMade(): self.clientProtocol.transport.write('x' * nBytes) self.clientProtocol.connectionMade = testConnectionMade clientFactory = protocol.ClientFactory() clientFactory.protocol = lambda: self.clientProtocol reactor.connectTCP( '127.0.0.1', proxyServerPort.getHost().port, clientFactory) return d
def testFactories(self): f = self.service.getPOP3Factory() self.failUnless(isinstance(f, protocol.ServerFactory)) self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3) f = self.service.getSMTPFactory() self.failUnless(isinstance(f, protocol.ServerFactory)) self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP) f = self.service.getESMTPFactory() self.failUnless(isinstance(f, protocol.ServerFactory)) self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
def buildProtocol(self, addr): p = protocol.ServerFactory.buildProtocol(self, addr) p.service = self.service return p # # It is useful to know, perhaps, that the required file for this to work can # be created thusly: # # openssl req -x509 -newkey rsa:2048 -keyout file.key -out file.crt \ # -days 365 -nodes # # And then cat file.key and file.crt together. The number of days and bits # can be changed, of course. #