我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.web.wsgi.WSGIServer()。
def run(self, app):  # pragma: no cover
    """Serve *app* with the stdlib ``wsgiref`` server and always close it.

    Reads ``self.host``, ``self.port``, ``self.quiet`` and ``self.options``.
    ``self.options`` may supply ``handler_class`` and ``server_class`` to
    replace the defaults used here.

    The listening socket is closed when ``serve_forever()`` returns or is
    interrupted (e.g. by ``KeyboardInterrupt``); the exception, if any, still
    propagates to the caller.
    """
    import socket
    from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
    from wsgiref.simple_server import make_server

    class FixedHandler(WSGIRequestHandler):
        def address_string(self):  # Prevent reverse DNS lookups please.
            return self.client_address[0]

        def log_request(*args, **kw):
            # ``self`` is the enclosing adapter here (closure), not the
            # handler: the handler instance travels inside ``*args``.
            if not self.quiet:
                return WSGIRequestHandler.log_request(*args, **kw)

    handler_cls = self.options.get('handler_class', FixedHandler)
    server_cls = self.options.get('server_class', WSGIServer)

    if ':' in self.host:  # Fix wsgiref for IPv6 addresses.
        # Default of None keeps custom server classes without an
        # ``address_family`` attribute from raising AttributeError.
        if getattr(server_cls, 'address_family', None) == socket.AF_INET:
            class server_cls(server_cls):
                address_family = socket.AF_INET6

    srv = make_server(self.host, self.port, app, server_cls, handler_cls)
    try:
        srv.serve_forever()
    finally:
        # Prevent "ResourceWarning: unclosed socket" on shutdown/interrupt.
        srv.server_close()
def run(self, handler):  # pragma: no cover
    """Run *handler* under flup's FastCGI ``WSGIServer``.

    ``self.options`` is forwarded to the server as keyword arguments. If it
    has no ``bindAddress`` entry, ``(self.host, self.port)`` is stored there
    first (the dictionary is modified in place).
    """
    from flup.server import fcgi

    default_address = (self.host, self.port)
    self.options.setdefault('bindAddress', default_address)

    fcgi_server = fcgi.WSGIServer(handler, **self.options)
    fcgi_server.run()
def run(self, handler):
    """Serve *handler* with a gevent WSGI server until it is stopped.

    Options (read from ``self.options``):
        fast: removed from the options; when truthy, ``gevent.wsgi`` is used
            if the installed gevent still provides it, otherwise
            ``gevent.pywsgi`` is used.
        log: overwritten with ``None`` when ``self.quiet`` is set, else
            ``'default'``.
    All remaining options are forwarded to ``WSGIServer``.

    Raises:
        RuntimeError: if ``threading.local()`` is not a gevent local, i.e.
            gevent's monkey patching has not been applied.
    """
    from gevent import local, pywsgi

    if not isinstance(threading.local(), local.local):
        msg = "Bottle requires gevent.monkey.patch_all() (before import)"
        raise RuntimeError(msg)

    # Always pop 'fast' so it is never forwarded to WSGIServer as a keyword.
    wsgi = pywsgi
    if self.options.pop('fast', None):
        try:
            from gevent import wsgi
        except ImportError:
            # Newer gevent releases no longer ship ``gevent.wsgi``; keep
            # the pywsgi implementation instead of failing at import time.
            pass

    self.options['log'] = None if self.quiet else 'default'
    address = (self.host, self.port)
    server = wsgi.WSGIServer(address, handler, **self.options)

    if 'BOTTLE_CHILD' in os.environ:
        # In a child process, turn SIGINT into an orderly server stop.
        import signal
        signal.signal(signal.SIGINT, lambda s, f: server.stop())

    server.serve_forever()
def run(self, app):  # pragma: no cover
    """Serve *app* with the stdlib ``wsgiref`` server.

    Reads ``self.host``, ``self.port``, ``self.quiet`` and ``self.options``
    (optional ``handler_class`` / ``server_class`` overrides). The created
    server is stored on ``self.srv`` and ``self.port`` is replaced by the
    port the server reports, which matters when port 0 was requested.

    On ``KeyboardInterrupt`` the server socket is closed and the exception is
    re-raised.
    """
    import socket
    from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
    from wsgiref.simple_server import make_server

    adapter = self  # the handler methods below shadow/omit ``self``

    class FixedHandler(WSGIRequestHandler):
        def address_string(self):
            # Prevent reverse DNS lookups please.
            return self.client_address[0]

        def log_request(*args, **kw):
            if adapter.quiet:
                return None
            return WSGIRequestHandler.log_request(*args, **kw)

    options = self.options
    handler_cls = options.get('handler_class', FixedHandler)
    server_cls = options.get('server_class', WSGIServer)

    # Fix wsgiref for IPv6 addresses.
    if ':' in self.host and server_cls.address_family == socket.AF_INET:
        class server_cls(server_cls):
            address_family = socket.AF_INET6

    server = make_server(self.host, self.port, app, server_cls, handler_cls)
    self.srv = server
    # Update port to the actual port (0 means random).
    self.port = server.server_port

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Prevent ResourceWarning: unclosed socket
        server.server_close()
        raise
def flup(app, address, **options):
    """Run *app* under flup's FastCGI ``WSGIServer`` bound to *address*.

    Extra keyword ``options`` are accepted but not used by this function.
    """
    from flup.server.fcgi import WSGIServer

    fcgi_server = WSGIServer(app, bindAddress=address)
    fcgi_server.run()
def gevent(app, address, **options):
    """Serve *app* on *address* with gevent's ``pywsgi.WSGIServer``.

    ``options['options']`` must be an object with a ``workers`` attribute.
    When ``workers`` is truthy, requests are spawned in a
    ``gevent.pool.Pool`` of ``int(workers)`` greenlets; otherwise the
    server's ``'default'`` spawn strategy is used. Request logging is
    disabled (``log=None``).
    """
    options = options['options']
    workers = options.workers

    from gevent import pywsgi
    from gevent.pool import Pool

    # Do not use ``workers and Pool(...) or 'default'``: a freshly created
    # Pool holds no greenlets and is falsy, so that expression always fell
    # through to 'default' and the worker limit was silently ignored.
    spawn = Pool(int(workers)) if workers else 'default'

    pywsgi.WSGIServer(address, app, spawn=spawn, log=None).serve_forever()
def pulsar(app, address, **options):
    """Start a pulsar ``WSGIServer`` for *app* bound to *address*.

    *address* is formatted with ``"%s:%d"`` to build the bind string.
    ``sys.argv`` is replaced with ``['anyserver.py']`` before the server is
    created (presumably so pulsar does not see the caller's command line --
    confirm). Extra keyword ``options`` are accepted but not used.
    """
    from pulsar.apps import wsgi as pulsar_wsgi

    sys.argv = ['anyserver.py']
    bind_spec = "%s:%d" % address
    server = pulsar_wsgi.WSGIServer(callable=app, bind=bind_spec)
    server.start()
def run(self, handler):
    """Serve *handler* on ``(self.host, self.port)`` with a gevent WSGI server.

    When ``self.options['fast']`` is truthy, ``gevent.wsgi`` is used if the
    installed gevent still provides it; otherwise ``gevent.pywsgi`` is used.
    Logging is disabled when ``self.quiet`` is set, else ``'default'``.

    Raises:
        RuntimeError: if the module-level ``_lctx`` object (defined outside
            this block) is not a gevent local, i.e. gevent's monkey patching
            was not applied before it was created.
    """
    from gevent import local, pywsgi

    if not isinstance(_lctx, local.local):
        msg = "Bottle requires gevent.monkey.patch_all() (before import)"
        raise RuntimeError(msg)

    wsgi = pywsgi
    if self.options.get('fast'):
        try:
            from gevent import wsgi
        except ImportError:
            # Newer gevent releases no longer ship ``gevent.wsgi``; keep
            # the pywsgi implementation instead of failing at import time.
            pass

    log = None if self.quiet else 'default'
    wsgi.WSGIServer((self.host, self.port), handler, log=log).serve_forever()
def run(self, handler):  # pragma: no cover
    """Run *handler* under flup's FastCGI ``WSGIServer``.

    The server is bound to ``(self.host, self.port)``.
    """
    from flup.server.fcgi import WSGIServer

    bind_address = (self.host, self.port)
    fcgi_server = WSGIServer(handler, bindAddress=bind_address)
    fcgi_server.run()
def run(self, handler):
    """Serve *handler* on ``(self.host, self.port)`` with a gevent WSGI server.

    ``gevent.wsgi`` is used when the installed gevent provides it; otherwise
    the server from ``gevent.pywsgi`` is used, so the adapter keeps working
    on gevent releases that no longer ship ``gevent.wsgi``.
    """
    try:
        from gevent import wsgi
    except ImportError:
        from gevent import pywsgi as wsgi
    #from gevent.hub import getcurrent
    #self.set_context_ident(getcurrent, weakref=True) # see contextlocal
    wsgi.WSGIServer((self.host, self.port), handler).serve_forever()
def run(self, handler):
    """Serve *handler* on ``(self.host, self.port)`` with a gevent WSGI server.

    Options (read from ``self.options``):
        monkey: defaults to True; when truthy and ``threading.local`` is not
            already gevent's ``local.local``, ``monkey.patch_all()`` is
            called.
        fast: when truthy, ``gevent.wsgi`` is used if the installed gevent
            still provides it; otherwise ``gevent.pywsgi`` is used.
    """
    from gevent import local, monkey, pywsgi

    if self.options.get('monkey', True):
        if threading.local is not local.local:
            monkey.patch_all()

    wsgi = pywsgi
    if self.options.get('fast'):
        try:
            from gevent import wsgi
        except ImportError:
            # Newer gevent releases no longer ship ``gevent.wsgi``; keep
            # the pywsgi implementation instead of failing at import time.
            pass

    wsgi.WSGIServer((self.host, self.port), handler).serve_forever()