我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 httplib.responses(注意:它是一个将 HTTP 状态码映射到原因短语的字典,而不是可调用的函数)。
def echo_json_response(handler, code, status=None, results=None):
    """Serialize a JSON envelope and send it back through ``handler``.

    The envelope carries the keys ``code``, ``status`` and ``results``.
    When ``status`` is None the reason phrase for ``code`` is taken from
    ``httplib.responses``; when ``results`` is None an empty dict is sent.

    Returns True after the response was written through either a
    ``BaseHTTPRequestHandler`` or a ``tornado.web.RequestHandler``.
    Returns False when ``handler`` or ``code`` is None, or when the handler
    is of neither supported type.
    """
    if handler is None or code is None:
        return False

    payload = json.dumps({
        'code': code,
        'status': httplib.responses[code] if status is None else status,
        'results': {} if results is None else results,
    })

    if isinstance(handler, BaseHTTPRequestHandler):
        # Plain stdlib handler: status line, header, blank line, then body.
        handler.send_response(code)
        handler.send_header('Content-Type', 'application/json')
        handler.end_headers()
        handler.wfile.write(payload)
        return True

    if isinstance(handler, tornado.web.RequestHandler):
        handler.set_status(code)
        handler.set_header('Content-Type', 'application/json')
        handler.write(payload)
        return True

    return False
def handle_event(self, event):
    """Dispatch a single h2 event received for this stream.

    * ``ResponseReceived``: build the HTTP headers, pop ``:status`` and pass
      a ``ResponseStartLine`` plus the headers to ``headers_received``.
    * ``DataReceived``: forward the payload to ``data_received``.
    * ``StreamEnded``: mark the stream as ended, unregister this delegate
      and call ``finish`` once the number of pushed responses equals the
      number of pushed streams.
    * ``PushedStreamReceived``: create a stream via ``from_push_stream`` and
      store it under the pushed stream id.

    Any other event is logged as ignored.
    """
    if isinstance(event, h2.events.ResponseReceived):
        headers = self.build_http_headers(event.headers)
        status_code = int(headers.pop(':status'))
        # The status code comes from the remote peer and may have no entry
        # in httplib.responses; use a placeholder reason instead of letting
        # a KeyError abort the response handling.
        reason = httplib.responses.get(status_code, 'Unknown')
        start_line = ResponseStartLine('HTTP/2.0', status_code, reason)
        self.headers_received(start_line, headers)
    elif isinstance(event, h2.events.DataReceived):
        self.data_received(event.data)
    elif isinstance(event, h2.events.StreamEnded):
        self._stream_ended = True
        self.context.remove_stream_delegate(self.stream_id)
        if len(self._pushed_responses) == len(self._pushed_streams):
            self.finish()
    elif isinstance(event, h2.events.PushedStreamReceived):
        stream = self.from_push_stream(event)
        self._pushed_streams[event.pushed_stream_id] = stream
    else:
        logger.warning('ignored event: %r, %r', event, event.__dict__)
def handle_event(self, event):
    """Dispatch a single h2 event received for this stream.

    * ``ResponseReceived``: build the HTTP headers, pop ``:status`` and pass
      an ``httputil.ResponseStartLine`` plus the headers to
      ``headers_received``.
    * ``DataReceived``: forward the payload to ``data_received``.
    * ``StreamEnded``: mark the stream as ended, unregister this delegate
      and call ``finish`` once the number of pushed responses equals the
      number of pushed streams.
    * ``PushedStreamReceived``: create a stream via ``from_push_stream`` and
      store it under the pushed stream id.

    Any other event is logged as ignored.
    """
    if isinstance(event, h2.events.ResponseReceived):
        headers = self.build_http_headers(event.headers)
        status_code = int(headers.pop(':status'))
        # The status code comes from the remote peer and may have no entry
        # in httplib.responses; use a placeholder reason instead of letting
        # a KeyError abort the response handling.
        reason = httplib.responses.get(status_code, 'Unknown')
        start_line = httputil.ResponseStartLine(
            'HTTP/2.0', status_code, reason)
        self.headers_received(start_line, headers)
    elif isinstance(event, h2.events.DataReceived):
        self.data_received(event.data)
    elif isinstance(event, h2.events.StreamEnded):
        self._stream_ended = True
        self.context.remove_stream_delegate(self.stream_id)
        if len(self._pushed_responses) == len(self._pushed_streams):
            self.finish()
    elif isinstance(event, h2.events.PushedStreamReceived):
        stream = self.from_push_stream(event)
        self._pushed_streams[event.pushed_stream_id] = stream
    else:
        logger.warning('ignored event: %r, %r', event, event.__dict__)
def __call__(self, environ, start_response):
    """WSGI entry point: run the application and relay its response.

    The WSGI ``environ`` is wrapped in an ``HTTPRequest`` and handed to
    ``web.Application.__call__``. The resulting handler's status code,
    headers and any ``_new_cookies`` are passed to ``start_response`` and
    the handler's write buffer is returned as the body.
    """
    handler = web.Application.__call__(self, HTTPRequest(environ))
    # The handler is expected to have finished by the time it is returned.
    assert handler._finished

    code = handler._status_code
    status = "%s %s" % (str(code), httplib.responses[code])

    headers = handler._headers.items()
    for cookie_dict in getattr(handler, "_new_cookies", []):
        headers.extend(
            ("Set-Cookie", cookie.OutputString(None))
            for cookie in cookie_dict.values())

    start_response(status, headers)
    return handler._write_buffer
def _test_connectivity(self, param, LADS):
    """Handle the "test connectivity" action for the A10 LADS connector.

    Issues a GET for ``/api/v2/applications`` through ``LADS`` (see
    https://api.a10networks.com/api/v2/applications) and records the
    outcome with ``set_status_save_progress``: on success the status code,
    its reason phrase and the application names; on failure the status code
    and the raw response.
    """
    self.debug_print("%s _test_connectivity %s" % (A10_LADS_Connector.BANNER, param))

    msg = "test connectivity to %s status_code: " % (LADS.dashboard)

    if not LADS.genericGET(uri="/api/v2/applications"):
        # None or False means failure, e.g. a wrong IP address, username
        # or password.
        failure = "%s %s" % (LADS.status_code, LADS.response)
        return self.set_status_save_progress(phantom.APP_ERROR, msg + failure)

    success = "%s %s apps: %s" % (
        LADS.status_code,
        httplib.responses[LADS.status_code],
        LADS.get_names(LADS.response))
    return self.set_status_save_progress(phantom.APP_SUCCESS, msg + success)
def _test_connectivity(self, param):
    """Handle the "test connectivity" action for the F5 connector.

    Builds an ``iControl.BIG_IP`` client from the configured ``device``,
    ``username`` and ``password`` and issues a GET for
    ``/mgmt/tm/sys/software/image``. The outcome is recorded with
    ``set_status_save_progress``.

    Roughly equivalent manual check:
    curl -k -u admin:redacted -X GET https://192.0.2.1/mgmt/tm/ltm/
    """
    self.debug_print("%s TEST_CONNECTIVITY %s" % (F5_Connector.BANNER, param))

    config = self.get_config()
    host = config.get("device")
    F5 = iControl.BIG_IP(
        host=host,
        username=config.get("username"),
        password=config.get("password"),
        uri="/mgmt/tm/sys/software/image",
        method="GET")

    msg = "test connectivity to %s status_code: " % host

    if not F5.genericGET():
        # None or False means failure, e.g. a wrong IP address, username
        # or password.
        failure = "%s %s" % (F5.status_code, F5.response)
        return self.set_status_save_progress(phantom.APP_ERROR, msg + failure)

    success = "%s %s" % (F5.status_code, httplib.responses[F5.status_code])
    return self.set_status_save_progress(phantom.APP_SUCCESS, msg + success)
def __write_error(self, status_code, error_message=None):
    """Build the HTTP status line and body for an error.

    Args:
      status_code: HTTP status code to be returned.
      error_message: Error message to be returned; defaults to the standard
        reason phrase for ``status_code``.

    Returns:
      Tuple (http_status, body):
        http_status: HTTP status line, e.g. "404 Not Found".
        body: The ``EndpointsErrorMessage`` encoded with ``__PROTOJSON``.
    """
    reason = httplib.responses[status_code]
    if error_message is None:
        error_message = reason

    message = EndpointsErrorMessage(
        state=EndpointsErrorMessage.State.APPLICATION_ERROR,
        error_message=error_message)
    status = '%d %s' % (status_code, reason)
    return status, self.__PROTOJSON.encode_message(message)
def __init__(self, *args, **kwargs):
    """Initialize the response, consuming the HTTP/2 specific keywords.

    ``pushed_responses`` (default: empty list) and ``new_request`` (default:
    None) are removed from ``kwargs`` before the remaining arguments are
    forwarded to the parent initializer. ``reason`` falls back to the
    standard phrase for ``self.code``, or "Unknown" when there is none.
    """
    self.pushed_responses = kwargs.pop('pushed_responses', [])
    self.new_request = kwargs.pop('new_request', None)
    super(HTTP2Response, self).__init__(*args, **kwargs)

    explicit_reason = kwargs.pop('reason', None)
    if explicit_reason:
        self.reason = explicit_reason
    else:
        self.reason = httputil.responses.get(self.code, "Unknown")
def write_error(self, status_code, **kwargs):
    """Render ``pages/error.html`` for the given HTTP status code.

    The template receives the standard reason phrase as ``message`` and the
    numeric code as ``error``. Extra keyword arguments are accepted but not
    used.
    """
    reason = httplib.responses[status_code]
    self.render("pages/error.html", message=reason, error=status_code)
def write_error(self, status_code, **kwargs):
    """Render ``pages/error.html`` for the given HTTP status code.

    Before rendering, the requesting IP is passed to ``stats.request``. A
    failure there is reported through ``error`` and does not prevent the
    error page from being served. The template receives the standard reason
    phrase as ``message`` and the numeric code as ``error``.
    """
    try:
        # The return value is not needed here; the call is kept because it
        # presumably records the request -- verify against stats.request.
        stats.request(str(get_ip(self.request)))
    except Exception:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        error("Errored while handling request IP -- still served...")
    self.render("pages/error.html",
                message=httplib.responses[status_code],
                error=status_code)
def build_response(self, result):
    """Build a ``MockResponse`` whose JSON body wraps ``result``.

    The body is ``{"result": {"status": self.status}}``; ``result`` is added
    under ``"value"`` only when ``self.status`` is truthy. The response uses
    ``self.response_code`` with its standard reason phrase and headers built
    from ``SUCCESSFUL_HEADERS``.
    """
    payload = {'result': {'status': self.status}}
    if self.status:
        payload['result']['value'] = result

    body = json.dumps(payload)
    headers = Headers(SUCCESSFUL_HEADERS)
    return MockResponse(
        b'HTTP/1.1',
        self.response_code,
        httplib.responses[self.response_code],
        headers,
        body)
def test_responses(self):
    """``httplib.responses`` maps NOT_FOUND to the phrase "Not Found"."""
    phrase = httplib.responses[httplib.NOT_FOUND]
    self.assertEqual(phrase, "Not Found")
def http_open(self, req):
    """Answer the first request with an error, later ones with 200 OK.

    A deep copy of every request is appended to ``self.requests``. While
    ``self._count`` is 0 the counter is incremented and the call is routed
    to ``self.parent.error`` with ``self.code``, its reason phrase and
    ``self.headers``. Afterwards the request is stored on ``self.req`` and a
    ``MockResponse`` with status 200 is returned.
    """
    import copy
    import mimetools
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))

    if self._count != 0:
        self.req = req
        msg = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", msg, "", req.get_full_url())

    self._count += 1
    name = httplib.responses[self.code]
    msg = mimetools.Message(StringIO(self.headers))
    return self.parent.error("http", req, MockFile(), self.code, name, msg)
def wasLastResponseDelayed():
    """
    Returns True if the last web request resulted in a time-delay
    """

    # 99.9999999997440% of all non time-based SQL injection affected
    # response times should be inside +-7*stdev([normal response times])
    # Math reference: http://www.answers.com/topic/standard-deviation

    deviation = stdev(kb.responseTimes.get(kb.responseTimeMode, []))
    threadData = getCurrentThreadData()

    if not (deviation and not conf.direct and not conf.disableStats):
        # No usable statistics: compare the last duration against the
        # configured delay directly.
        delta = threadData.lastQueryDuration - conf.timeSec
        if Backend.getIdentifiedDbms() in (DBMS.MYSQL,):
            # MySQL's SLEEP(X) lasts 0.05 seconds shorter on average
            delta += 0.05
        return delta >= 0

    samples = kb.responseTimes[kb.responseTimeMode]
    if len(samples) < MIN_TIME_RESPONSES:
        logger.warn("time-based standard deviation method used on a model "
                    "with less than %d response times" % MIN_TIME_RESPONSES)

    lowerStdLimit = average(samples) + TIME_STDEV_COEFF * deviation
    retVal = threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit)

    if not kb.testMode and retVal:
        if kb.adjustTimeDelay is None:
            msg = ("do you want sqlmap to try to optimize value(s) "
                   "for DBMS delay responses (option '--time-sec')? [Y/n] ")
            if readInput(msg, default='Y', boolean=True):
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.YES
            else:
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.DISABLE
        if kb.adjustTimeDelay is ADJUST_TIME_DELAY.YES:
            adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)

    return retVal
def showHttpErrorCodes():
    """
    Shows all HTTP error codes raised till now
    """

    if not kb.httpErrorCodes:
        return

    # One "<code> (<reason>) - <count> times" entry per recorded code;
    # codes without a standard reason phrase are shown with '?'.
    summary = ", ".join(
        "%d (%s) - %d times" % (code, httplib.responses.get(code, '?'), count)
        for code, count in kb.httpErrorCodes.items())
    logger.warn("HTTP error codes detected during run:\n" + summary)

    suspicious = any(
        str(code).startswith(('4', '5'))
        and code != httplib.INTERNAL_SERVER_ERROR
        and code != kb.originalCode
        for code in kb.httpErrorCodes.keys())
    if suspicious:
        logger.debug("too many 4xx and/or 5xx HTTP error codes "
                     "could mean that some kind of protection is involved (e.g. WAF)")
def __init__(self, code, content=""):
    """Create the exception for a bad server response.

    Args:
        code: HTTP status code of the response (anything ``int()`` accepts).
        content: Body of the response; stored unchanged on ``self.content``.

    The exception message is "Bad server response: <code> <reason>", where
    the reason is the standard phrase for the code or "Unknown" when the
    code has none.
    """
    # Non-standard status codes have no entry in `responses`; fall back to
    # a placeholder so that building the exception cannot itself fail with
    # a KeyError and hide the original problem.
    reason = responses.get(int(code), "Unknown")
    Exception.__init__(self, "Bad server response: %s %s" % (code, reason))
    self.code = code
    self.content = content
def get_error_html(self, status_code, **kwargs):
    """Return the HTML body for an error response.

    For 404, 500, 503 and 403 the file ``<BASE_DIR>/templates/<code>.html``
    is returned when it exists. In every other case a minimal page showing
    the code and its standard reason phrase is generated.
    """
    if status_code in [404, 500, 503, 403]:
        template_dir = os.path.join(getsettings('BASE_DIR'), 'templates')
        filename = os.path.join(template_dir, '%d.html' % status_code)
        if os.path.exists(filename):
            with io.open(filename, 'r') as f:
                return f.read()

    import httplib
    page = ("<html><title>%(code)d: %(message)s</title>"
            "<body class='bodyErrorPage'>%(code)d: %(message)s</body></html>")
    return page % {
        "code": status_code,
        "message": httplib.responses[status_code],
    }
def __call__(self, environ, start_response):
    """WSGI entry point: run the application and relay its response.

    The WSGI ``environ`` is wrapped in an ``HTTPRequest`` and handed to
    ``web.Application.__call__``. The handler's status, headers and any
    ``_new_cookies`` are passed to ``start_response`` (header names and
    values converted with ``native_str``) and the handler's write buffer is
    returned as the body.
    """
    handler = web.Application.__call__(self, HTTPRequest(environ))
    assert handler._finished

    code = handler._status_code
    status = str(code) + " " + httplib.responses[code]

    headers = handler._headers.items()
    for cookie_dict in getattr(handler, "_new_cookies", []):
        for cookie in cookie_dict.values():
            headers.append(("Set-Cookie", cookie.OutputString(None)))

    native_headers = [(native_str(name), native_str(value))
                      for (name, value) in headers]
    start_response(status, native_headers)
    return handler._write_buffer
def set_status(self, status_code):
    """Store ``status_code`` as the status of the response.

    The code is asserted to be a key of ``httplib.responses`` before it is
    assigned to ``self._status_code``.
    """
    known_codes = httplib.responses
    assert status_code in known_codes
    self._status_code = status_code
def _generate_headers(self):
    """Return the encoded status line and headers of the response.

    The result consists of the status line, every header from
    ``self._headers`` and ``self._list_headers``, and one ``Set-Cookie``
    line per cookie in ``self._new_cookies`` (if present), joined with CRLF
    and terminated by an empty line.
    """
    status_line = (self.request.version + " " + str(self._status_code) +
                   " " + httplib.responses[self._status_code])
    lines = [utf8(status_line)]

    all_headers = itertools.chain(self._headers.iteritems(),
                                  self._list_headers)
    for name, value in all_headers:
        lines.append(utf8(name) + b(": ") + utf8(value))

    for cookie_dict in getattr(self, "_new_cookies", []):
        for cookie in cookie_dict.values():
            lines.append(utf8("Set-Cookie: " + cookie.OutputString(None)))

    return b("\r\n").join(lines) + b("\r\n\r\n")
def _handle_request_exception(self, e):
    """Log exception ``e`` and send the matching error response.

    An ``HTTPError`` is answered with its own status code, or with 500 when
    that code is not in ``httplib.responses``; its ``log_message`` (if any)
    is logged as a warning. Any other exception is logged as an uncaught
    error and answered with 500.
    """
    if not isinstance(e, HTTPError):
        logging.error("Uncaught exception %s\n%r", self._request_summary(),
                      self.request, exc_info=True)
        self.send_error(500, exc_info=sys.exc_info())
        return

    if e.log_message:
        template = "%d %s: " + e.log_message
        log_args = [e.status_code, self._request_summary()] + list(e.args)
        logging.warning(template, *log_args)

    if e.status_code in httplib.responses:
        self.send_error(e.status_code, exc_info=sys.exc_info())
    else:
        logging.error("Bad HTTP status code: %d", e.status_code)
        self.send_error(500, exc_info=sys.exc_info())
def assert_meta(self, meta):
    """
    Check the metadata object of a response.

    The user xid must match the test user's, the message and code must be
    those of HTTP OK, and the time is checked with
    ``assert_datetime_within`` using a window of 11.

    :param meta: the response meta object
    """
    ok = httplib.OK
    expected_xid = self.test_user['xid']

    self.assert_equal(meta.user_xid, expected_xid)
    self.assert_equal(meta.message, httplib.responses[ok])
    self.assert_equal(meta.code, ok)
    self.assert_datetime_within(meta.time, window=11)
def __init__(self):
    """Start with an empty description of an operation."""
    # Identification of the operation.
    self.method = None
    self.uri_template = None

    # Human readable text.
    self.summary = ''
    self.description = ''

    # Request parameters and the responses keyed by status code.
    self.parameters = []
    self.responses = {}

    # Optional details attached to a response by generate-time logic.
    self.default_response_schema = None
    self.response_headers = None
def add_response_codes(self, status_dict):
    """Record a description for every status code in ``status_dict``.

    Each value must be a dict with the keys ``reason`` and ``description``.
    An empty reason is filled in (in place) from ``http_client.responses``,
    or with 'Unknown' when the code is not a known integer status. The
    stored description is the reason, a blank line and the description with
    its first word title-cased.
    """
    for code, info in status_dict.items():
        entry = self.responses.setdefault(code, {})

        if not info['reason']:
            try:
                info['reason'] = http_client.responses[int(code)]
            except (KeyError, TypeError, ValueError):
                info['reason'] = 'Unknown'

        # At most three pieces: the first two words and the untouched rest.
        words = info['description'].split(maxsplit=2)
        if words:
            words[0] = words[0].title()

        text = '{}\n\n{}'.format(info['reason'], ' '.join(words))
        entry['description'] = text.strip()
def generate_swagger(self):
    """Return the swagger dictionary describing this operation.

    ``parameters`` is included only when non-empty. When no responses were
    recorded a single ``default`` response with an empty description is
    emitted. The default response schema and the response headers are
    attached to the lowest numeric status code in the range 200-399, or to
    the ``default`` response when there is no such code and a ``default``
    entry exists.
    """
    swagger = {'summary': self.summary, 'description': self.description}
    if self.parameters:
        swagger['parameters'] = self.parameters

    # swagger requires at least one response
    swagger['responses'] = self.responses or {'default': {'description': ''}}
    responses = swagger['responses']

    # Figure out where to put the response schema and response
    # header details. This is probably going to change in the
    # future since it is `hinky' at best.
    numeric_codes = sorted(int(code) for code in responses if code.isdigit())
    default_code = next(
        (str(code) for code in numeric_codes if 200 <= code < 400),
        'default')

    if default_code in responses:
        target = responses[default_code]
        if self.default_response_schema:
            target['schema'] = self.default_response_schema
        if self.response_headers:
            target['headers'] = self.response_headers

    return swagger
def set_status(self, status_code):
    """Store ``status_code`` as the status of the response.

    The code is asserted to be a key of ``httplib.responses`` before it is
    assigned to ``self._status_code``.
    """
    # Reject codes that have no standard reason phrase.
    is_known = status_code in httplib.responses
    assert is_known
    # Remember the HTTP status code for the response.
    self._status_code = status_code
def get_error_html(self, status_code, **kwargs):
    """Override to implement custom error pages.

    The default implementation returns a minimal HTML page showing the
    status code and its standard reason phrase.

    If this error was caused by an uncaught exception, the
    exception object can be found in kwargs e.g. kwargs['exception']
    """
    page = ("<html><title>%(code)d: %(message)s</title>"
            "<body>%(code)d: %(message)s</body></html>")
    values = {
        "code": status_code,
        "message": httplib.responses[status_code],
    }
    return page % values
def _generate_headers(self):
    """Return the status line and headers of the response as one string.

    The result consists of the status line, one ``name: value`` line per
    entry of ``self._headers`` and one ``Set-Cookie`` line per cookie in
    ``self._new_cookies`` (if present), joined with CRLF and terminated by
    an empty line.
    """
    code = self._status_code
    lines = [self.request.version + " " + str(code) + " " +
             httplib.responses[code]]

    for name, value in self._headers.iteritems():
        lines.append("%s: %s" % (name, value))

    lines.extend(
        "Set-Cookie: " + cookie.OutputString(None)
        for cookie_dict in getattr(self, "_new_cookies", [])
        for cookie in cookie_dict.values())

    return "\r\n".join(lines) + "\r\n\r\n"
def _handle_request_exception(self, e):
    """Log exception ``e`` and send the matching error response.

    An ``HTTPError`` is answered with its own status code, or with 500 when
    that code is not in ``httplib.responses``; its ``log_message`` (if any)
    is logged as a warning. Any other exception is logged as an uncaught
    error and answered with 500. The exception is always forwarded to
    ``send_error`` as ``exception``.
    """
    if not isinstance(e, HTTPError):
        logging.error("Uncaught exception %s\n%r", self._request_summary(),
                      self.request, exc_info=e)
        self.send_error(500, exception=e)
        return

    if e.log_message:
        template = "%d %s: " + e.log_message
        log_args = [e.status_code, self._request_summary()] + list(e.args)
        logging.warning(template, *log_args)

    if e.status_code in httplib.responses:
        self.send_error(e.status_code, exception=e)
    else:
        logging.error("Bad HTTP status code: %d", e.status_code)
        self.send_error(500, exception=e)
def _test_connectivity(self, param):
    """Handle the "test connectivity" action for the Tower connector.

    Calls ``aaaLogin`` and reports the returned status code together with
    its reason phrase through ``set_status_save_progress``: as a success
    when the code equals ``requests.codes.ok``, as an error otherwise.
    """
    self.debug_print("%s TEST_CONNECTIVITY %s" % (Tower_Connector.BANNER, param))

    status_code = self.aaaLogin()
    msg = "Test connectivity to %s, status_code: %s %s" % (
        self.tower_instance, status_code, httplib.responses[status_code])

    if status_code == requests.codes.ok:
        outcome = phantom.APP_SUCCESS
    else:
        outcome = phantom.APP_ERROR
    return self.set_status_save_progress(outcome, msg)
def launch_job(self, param, job_template_id):
    """Launch the Tower job identified by ``job_template_id``.

    Posts to the job template's ``launch`` endpoint, passing the optional
    ``param["extra vars"]`` (format: "malicious_ip=203.0.113.10,foo=bar",
    no leading spaces) as ``extra_vars``. When the response status is in
    ``Tower_Connector.GOOD_LAUNCH_STATUS`` the job id is stored on
    ``self.job_id``.

    Returns the reason phrase of the response status, or the string
    "ConnectionError" when the request could not be sent.
    """
    self.debug_print("%s LAUNCH_JOB parameters:\n%s\n%s" % (Tower_Connector.BANNER, param, job_template_id))

    # Add an action result to the App Run
    action_result = ActionResult(dict(param))
    self.add_action_result(action_result)

    url = "https://%s/api/v1/job_templates/%s/launch/" % (self.tower_instance, job_template_id)

    # NOTE(review): this is self.HEADER itself, not a copy, so the
    # Authorization entry is written into self.HEADER.
    headers = self.HEADER
    headers["Authorization"] = "Token %s" % self.token

    # Ansible 3.0, Prompt on launch must be specified in the job template
    # on the Tower GUI.
    try:
        pairs = dict(item.split('=') for item in param["extra vars"].split(','))
        data = dict(extra_vars='%s' % pairs)
    except KeyError:
        # extra vars is optional
        data = {}

    try:
        r = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
    except requests.ConnectionError as e:
        self.set_status_save_progress(phantom.APP_ERROR, str(e))
        return "ConnectionError"

    if r.status_code in Tower_Connector.GOOD_LAUNCH_STATUS:
        self.job_id = r.json()['job']

    return httplib.responses[r.status_code]
def wasLastResponseDelayed():
    """
    Returns True if the last web request resulted in a time-delay
    """

    # 99.9999999997440% of all non time-based SQL injection affected
    # response times should be inside +-7*stdev([normal response times])
    # Math reference: http://www.answers.com/topic/standard-deviation

    deviation = stdev(kb.responseTimes)
    threadData = getCurrentThreadData()

    if not (deviation and not conf.direct):
        # No usable statistics: compare the last duration against the
        # configured delay directly.
        return (threadData.lastQueryDuration - conf.timeSec) >= 0

    if len(kb.responseTimes) < MIN_TIME_RESPONSES:
        logger.warn("time-based standard deviation method used on a model "
                    "with less than %d response times" % MIN_TIME_RESPONSES)

    lowerStdLimit = average(kb.responseTimes) + TIME_STDEV_COEFF * deviation
    retVal = threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit)

    if not kb.testMode and retVal:
        if kb.adjustTimeDelay is None:
            msg = ("do you want sqlmap to try to optimize value(s) "
                   "for DBMS delay responses (option '--time-sec')? [Y/n] ")
            choice = readInput(msg, default='Y')
            if choice.upper() == 'N':
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.DISABLE
            else:
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.YES
        if kb.adjustTimeDelay is ADJUST_TIME_DELAY.YES:
            adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)

    return retVal
def http_open(self, req):
    """Answer the first request with an error, later ones with 200 OK.

    A deep copy of every request is appended to ``self.requests``. While
    ``self._count`` is 0 the counter is incremented and the call is routed
    to ``self.parent.error`` with ``self.code``, its reason phrase and
    ``self.headers``. Afterwards the request is stored on ``self.req`` and a
    ``MockResponse`` with status 200 is returned.
    """
    import copy
    import httplib
    import mimetools
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))

    if self._count != 0:
        self.req = req
        msg = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", msg, "", req.get_full_url())

    self._count += 1
    name = httplib.responses[self.code]
    msg = mimetools.Message(StringIO(self.headers))
    return self.parent.error("http", req, MockFile(), self.code, name, msg)
def wasLastResponseDelayed():
    """
    Returns True if the last web request resulted in a time-delay
    """

    # 99.9999999997440% of all non time-based SQL injection affected
    # response times should be inside +-7*stdev([normal response times])
    # Math reference: http://www.answers.com/topic/standard-deviation

    deviation = stdev(kb.responseTimes.get(kb.responseTimeMode, []))
    threadData = getCurrentThreadData()

    if not (deviation and not conf.direct):
        # No usable statistics: compare the last duration against the
        # configured delay directly.
        return (threadData.lastQueryDuration - conf.timeSec) >= 0

    samples = kb.responseTimes[kb.responseTimeMode]
    if len(samples) < MIN_TIME_RESPONSES:
        logger.warn("time-based standard deviation method used on a model "
                    "with less than %d response times" % MIN_TIME_RESPONSES)

    lowerStdLimit = average(samples) + TIME_STDEV_COEFF * deviation
    retVal = threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit)

    if not kb.testMode and retVal:
        if kb.adjustTimeDelay is None:
            msg = ("do you want sqlmap to try to optimize value(s) "
                   "for DBMS delay responses (option '--time-sec')? [Y/n] ")
            choice = readInput(msg, default='Y')
            if choice.upper() == 'N':
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.DISABLE
            else:
                kb.adjustTimeDelay = ADJUST_TIME_DELAY.YES
        if kb.adjustTimeDelay is ADJUST_TIME_DELAY.YES:
            adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)

    return retVal
def __call__(self, environ, start_response):
    """WSGI entry point that delegates to the currently stored application.

    ``self._app`` and ``self._error`` are read while holding ``self._lock``.
    When an application is set the request is delegated to it; otherwise an
    empty response with the stored error status is sent.
    """
    with self._lock:
        app, error = self._app, self._error

    if not app:
        start_response('%d %s' % (error, httplib.responses[error]), [])
        return []

    return app(environ, start_response)
def _error_response(self, environ, start_response, status, body=None):
    """Send an error response with the given status.

    With a non-empty ``body`` the response is declared as ``text/html`` with
    a matching ``Content-Length`` and ``body`` itself is returned. Without
    one, no headers are sent and an empty list is returned.
    """
    status_line = '%d %s' % (status, httplib.responses[status])

    if not body:
        start_response(status_line, [])
        return []

    start_response(status_line, [
        ('Content-Type', 'text/html'),
        ('Content-Length', str(len(body))),
    ])
    return body
def _redirect_302_path_info(self, updated_path_info, environ, start_response):
    """Redirect to an updated path.

    Respond to the current request with a 302 Found status with an updated
    path but preserving the rest of the request.

    Notes:
    - WSGI does not make the fragment available so we are not able to
      preserve it. Luckily prod does not preserve the fragment so it works
      out.

    Args:
      updated_path_info: the new HTTP path to redirect to.
      environ: WSGI environ object.
      start_response: WSGI start response callable.

    Returns:
      WSGI-compatible iterable object representing the body of the response.
    """
    url_parts = (
        environ['wsgi.url_scheme'],
        environ['HTTP_HOST'],
        urllib.quote(updated_path_info),
        self._quote_querystring(environ['QUERY_STRING']),
        None,  # no fragment, see the note above
    )
    correct_url = urlparse.urlunsplit(url_parts)

    content_type = 'text/html; charset=utf-8'
    output = _REDIRECT_HTML % {
        'content-type': content_type,
        'status': httplib.FOUND,
        'correct-url': correct_url
    }

    status_line = '%d %s' % (httplib.FOUND, httplib.responses[httplib.FOUND])
    response_headers = [
        ('Content-Type', content_type),
        ('Location', correct_url),
        ('Content-Length', str(len(output))),
    ]
    start_response(status_line, response_headers)
    return output