def _error(error, code): """Return an error as json""" _error = { "status": str(code), "title": errors[code], "detail": error.body } traceback.print_exception(type(error), error, error.traceback) response.headers["Content-Type"] = "application/vnd.api+json" return response_object(errors=[_error])
def get_resources(): """Return the resources of the authenticated user. If authentication failed, this aborts the execution with 401 Unauthorized. """ #pprint(dict(request.headers)) header = request.environ.get('HTTP_AUTHORIZATION','') if header: print("Authorization:", header) basic = request.auth if basic: username, password = basic if passwords.get(username) != password: abort(401, BASIC_ERROR) else: api_key = get_api_key() if api_key is not None: username = api_keys.get(api_key) if username is None: abort(401, API_KEY_ERROR) else: username = None return _resources[username]
def assertRedirect(self, target, result, query=None, status=303, **args): env = {'SERVER_PROTOCOL':'HTTP/1.1'} for key in list(args): if key.startswith('wsgi'): args[key.replace('_', '.', 1)] = args[key] del args[key] env.update(args) request.bind(env) bottle.response.bind() try: bottle.redirect(target, **(query or {})) except bottle.HTTPResponse: r = _e() self.assertEqual(status, r.status_code) self.assertTrue(r.headers) self.assertEqual(result, r.headers['Location'])
def echo(): try: body = request.body.read().decode('utf-8') except: body = None result = { 'method': request.method, 'headers': dict(request.headers), 'body': body, 'files': [ {'key': key, 'name': request.files[key].raw_filename} for key in request.files ] } return result
def __call__(self, *args, **kwargs): headers = {} if self._access_token: headers['Authorization'] = 'Token ' + self._access_token resp = requests.post( self._url, json=kwargs, headers=headers ) if resp.ok: data = resp.json() if data.get('status') == 'failed': raise Error(resp.status_code, data.get('retcode')) return data.get('data') raise Error(resp.status_code)
def apply(self, callback, route): def wrapped(*args, **kwargs): if not self.tracer or not self.tracer.enabled: return callback(*args, **kwargs) resource = "%s %s" % (request.method, request.route.rule) # Propagate headers such as x-datadog-trace-id. if self.distributed_tracing: propagator = HTTPPropagator() context = propagator.extract(request.headers) if context.trace_id: self.tracer.context_provider.activate(context) with self.tracer.trace("bottle.request", service=self.service, resource=resource) as s: code = 0 try: return callback(*args, **kwargs) except Exception: # bottle doesn't always translate unhandled exceptions, so # we mark it here. code = 500 raise finally: s.set_tag(http.STATUS_CODE, code or response.status_code) s.set_tag(http.URL, request.path) s.set_tag(http.METHOD, request.method) return wrapped
def get_api_key(): """Return the api key or None.""" header = request.headers.get('Authorization') if not header: return try: method, data = header.split(None, 1) if method.lower() != 'api-key': return return touni(base64.b64decode(tob(data[4:]))) except (ValueError, TypeError): abort(401, HEADER_ERROR)
def get_endpoint_url(): """Return the url that this server is reachable at.""" return "http://" + request.headers["Host"] + BASE
def test_jsonapi_header(): """Make sure that the content type is set accordingly. http://jsonapi.org/format/#content-negotiation-clients """ content_type = request.content_type content_type_expected = "application/vnd.api+json" if content_type != content_type_expected and content_type.startswith(content_type_expected): abort(415, "The Content-Type header must be \"{}\", not \"{}\".".format( content_type_expected, content_type)) accepts = request.headers.get("Accept", "*/*").split(",") expected_accept = ["*/*", "application/*", "application/vnd.api+json"] if not any([accept in expected_accept for accept in accepts]): abort(406, "The Accept header must one of \"{}\", not \"{}\".".format( expected_accept, ",".join(accepts)))
def test_header_access(self): """ Environ: Request objects decode headers """ e = {} wsgiref.util.setup_testing_defaults(e) e['HTTP_SOME_HEADER'] = 'some value' request = BaseRequest(e) request['HTTP_SOME_OTHER_HEADER'] = 'some other value' self.assertTrue('Some-Header' in request.headers) self.assertTrue(request.headers['Some-Header'] == 'some value') self.assertTrue(request.headers['Some-Other-Header'] == 'some other value')
def test_header_access_special(self): e = {} wsgiref.util.setup_testing_defaults(e) request = BaseRequest(e) request['CONTENT_TYPE'] = 'test' request['CONTENT_LENGTH'] = '123' self.assertEqual(request.headers['Content-Type'], 'test') self.assertEqual(request.headers['Content-Length'], '123')
def test_content_type(self): rs = BaseResponse() rs.content_type = 'test/some' self.assertEquals('test/some', rs.headers.get('Content-Type'))
def test_set_header(self): response = BaseResponse() response['x-test'] = 'foo' headers = [value for name, value in response.headerlist if name.title() == 'X-Test'] self.assertEqual(['foo'], headers) self.assertEqual('foo', response['x-test']) response['X-Test'] = 'bar' headers = [value for name, value in response.headerlist if name.title() == 'X-Test'] self.assertEqual(['bar'], headers) self.assertEqual('bar', response['x-test'])
def test_append_header(self): response = BaseResponse() response.set_header('x-test', 'foo') headers = [value for name, value in response.headerlist if name.title() == 'X-Test'] self.assertEqual(['foo'], headers) self.assertEqual('foo', response['x-test']) response.add_header('X-Test', 'bar') headers = [value for name, value in response.headerlist if name.title() == 'X-Test'] self.assertEqual(['foo', 'bar'], headers) self.assertEqual('bar', response['x-test'])
def test_host_http_proxy(self): # Trust proxy headers over original header. self.assertRedirect('./test.html', 'http://example.com/test.html', HTTP_X_FORWARDED_HOST='example.com', HTTP_HOST='')
def setUp(self): self.env = {} self.headers = bottle.WSGIHeaderDict(self.env)
def test_native(self): self.env['HTTP_TEST_HEADER'] = 'foobar' self.assertEqual(self.headers['Test-header'], 'foobar')
def test_bytes(self): self.env['HTTP_TEST_HEADER'] = tob('foobar') self.assertEqual(self.headers['Test-Header'], 'foobar')
def test_dict(self): for key in 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split(): self.assertTrue(key not in self.headers) self.assertEqual(self.headers.get(key), None) self.assertEqual(self.headers.get(key, 5), 5) self.assertRaises(KeyError, lambda x: self.headers[x], key) self.env['HTTP_FOO_BAR'] = 'test' for key in 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split(): self.assertTrue(key in self.headers) self.assertEqual(self.headers.get(key), 'test') self.assertEqual(self.headers.get(key, 5), 'test')
def index(): auth = AWS4Auth(config.aws_service_credentials.access_key, config.aws_service_credentials.secret_key, config.aws_service_region, config.aws_service_name) endpoint = config.aws_service_protocol + "://" + config.aws_service_endpoint + request.path + "?" + request.query_string proxy_req_header = {} for header in request.headers: if header.lower() in PROXY_REQ_HEADERS_WHITELIST: proxy_req_header[header] = request.headers[header] if request.method == "HEAD": requests_response = requests.head(endpoint, auth=auth, headers=proxy_req_header) elif request.method == "GET": requests_response = requests.get(endpoint, auth=auth, headers=proxy_req_header) elif request.method == "POST": proxy_req_body = request.body.getvalue() requests_response = requests.post( endpoint, auth=auth, data=proxy_req_body, headers=proxy_req_header) elif request.method == "PUT": proxy_req_body = request.body.getvalue() requests_response = requests.put( endpoint, auth=auth, data=proxy_req_body, headers=proxy_req_header) else: logging.error("Method not allowed") response.body = requests_response.content for header in requests_response.headers: if not header.lower() in PROXY_RESP_HEADERS_BLACKLIST: response.add_header(header, requests_response.headers[header]) return response
def _handle(self): if self._secret: # check signature if 'X-Signature' not in request.headers: abort(401) sec = self._secret if isinstance(sec, str): sec = sec.encode('utf-8') sig = hmac.new(sec, request.body.read(), 'sha1').hexdigest() if request.headers['X-Signature'] != 'sha1=' + sig: abort(403) post_type = request.json.get('post_type') if post_type not in ('message', 'event', 'request'): abort(400) handler_key = None for pk_pair in (('message', 'message_type'), ('event', 'event'), ('request', 'request_type')): if post_type == pk_pair[0]: handler_key = request.json.get(pk_pair[1]) if not handler_key: abort(400) else: break if not handler_key: abort(400) for group in self._groups: handler = self._handlers[group][post_type].get(handler_key) if not handler: handler = self._handlers[group][post_type].get('*') # try wildcard if handler: assert callable(handler) result = handler(request.json) if 'pass' in result: continue else: return result return ''