我们从Python开源项目中，提取了以下46个代码示例，用于说明如何使用six.moves.http_client.NOT_FOUND。
def test_get_run_by_id(self):
    """A run is 404 before it exists and 200 once it has been created."""
    # An id that was never created must not resolve.
    missing = self.app.get('/runs/1234')
    self.assertEqual(http_client.NOT_FOUND, missing.status_code)

    # Create a run from the fixture playbook.
    playbook = 'tests/fixtures/playbooks/hello_world.yml'
    payload = json.dumps(dict(playbook_path=playbook,
                              inventory_file='localhosti,',
                              options={'connection': 'local'}))
    created = self.app.post('/runs',
                            data=payload,
                            content_type='application/json')
    run_id = json.loads(created.data)['id']
    self.assertEqual(http_client.OK, created.status_code)

    # The freshly created run is now retrievable by its id.
    fetched = self.app.get('/runs/{}'.format(run_id))
    self.assertEqual(http_client.OK, fetched.status_code)
    self._wait_for_run_complete(run_id)
def test_get_state(self):
    """Check the state endpoint for a known and an unknown instance."""
    self.impl.get.return_value = {
        'name': 'foo.bar#0000000001',
        'host': 'baz1',
        'state': 'running',
    }

    found = self.client.get('/state/foo.bar#0000000001')
    raw_body = b''.join(found.response)
    expected = {
        'name': 'foo.bar#0000000001',
        'oom': None,
        'signal': None,
        'expires': None,
        'when': None,
        'host': 'baz1',
        'state': 'running',
        'exitcode': None,
    }
    self.assertEqual(json.loads(raw_body.decode()), expected)
    self.assertEqual(found.status_code, http_client.OK)
    self.impl.get.assert_called_with('foo.bar#0000000001')

    # When the implementation returns nothing the API answers 404.
    self.impl.get.return_value = None
    missing = self.client.get('/state/foo.bar#0000000002')
    self.assertEqual(missing.status_code, http_client.NOT_FOUND)
def _handle_error(url, response):
    """Raise the exception that corresponds to the response status code.

    Returns None when ``response.status_code`` has no registered handler.

    The exception is built lazily: only the one matching the status code
    is instantiated, so responses without a handler no longer construct
    (and discard) every exception object on each call.

    Args:
        url: URL that was requested; used in the error message.
        response: object exposing a ``status_code`` attribute.

    Raises:
        NotFoundError: on 404.
        AlreadyExistsError: on 302.
        ValidationError: on 424.
        NotAuthorizedError: on 401.
        BadRequestError: on 400.
    """
    # Map each handled status code to a factory so nothing is built
    # until we know which error (if any) applies.
    factories = {
        http_client.NOT_FOUND: lambda: NotFoundError(
            'Resource not found: {}'.format(url)
        ),
        http_client.FOUND: lambda: AlreadyExistsError(
            'Resource already exists: {}'.format(url)
        ),
        http_client.FAILED_DEPENDENCY: lambda: ValidationError(response),
        http_client.UNAUTHORIZED: lambda: NotAuthorizedError(response),
        http_client.BAD_REQUEST: lambda: BadRequestError(response),
    }

    make_error = factories.get(response.status_code)
    if make_error is not None:
        raise make_error()
def init(api, _cors, impl):
    """Register the nodeinfo redirect resource on *api*.

    ``impl.get(hostname)`` is used to look up the host:port that requests
    for that hostname are redirected to.
    """
    namespace = webutils.namespace(
        api, __name__, 'Local nodeinfo redirect API.'
    )

    # NOTE: the class and method docstrings below are kept verbatim; REST
    # frameworks commonly surface them in generated API documentation.
    @namespace.route('/<hostname>/<path:path>')
    class _NodeRedirect(restplus.Resource):
        """Redirects to local nodeinfo endpoint."""

        def get(self, hostname, path):
            """Returns list of local instances."""
            hostport = impl.get(hostname)
            if hostport:
                encoded_path = utils.encode_uri_parts(path)
                target = 'http://%s/%s' % (hostport, encoded_path)
                return flask.redirect(target, code=http_client.FOUND)

            return 'Host not found.', http_client.NOT_FOUND
def catalog_1(self, *tokens):
    """Serve the catalog file named by the first path token.

    Responds 403 when no name is given and 404 when the repository
    cannot provide the file.
    """
    if not tokens:
        raise cherrypy.HTTPError(http_client.FORBIDDEN,
                                 _("Directory listing not allowed."))
    name = tokens[0]

    try:
        fpath = self.repo.catalog_1(name, pub=self._get_req_pub())
    except srepo.RepositoryError as e:
        # Any repository error is reported to the client as a 404; the
        # real failure is logged and echoed in the response message.
        reason = str(e)
        cherrypy.log("Request failed: {0}".format(reason))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, reason)

    self.__set_response_expires("catalog", 86400, 86400)
    return serve_file(fpath, "text/plain; charset=utf-8")
def file_0(self, *tokens):
    """Serve the file whose hash name is the first path token.

    A missing token is passed to the repository as ``None``. Every
    repository failure is reported to the client as a 404.
    """
    fhash = tokens[0] if tokens else None

    try:
        fpath = self.repo.file(fhash, pub=self._get_req_pub())
    except srepo.RepositoryFileNotFoundError as e:
        # A plain "not found" is not logged.
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
    except srepo.RepositoryError as e:
        # Other repository errors are also surfaced as a 404, but the
        # real failure is logged and included in the response.
        reason = str(e)
        cherrypy.log("Request failed: {0}".format(reason))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, reason)

    one_year = 86400*365
    self.__set_response_expires("file", one_year, one_year)
    return serve_file(fpath, "application/data")
def test_face_root(self):
    """Files outside the content web root are refused; files inside work."""
    depot_url = self.dc.get_depot_url()

    # /usr/share/lib/pkg/web/ is the content web root, so a path that
    # climbs out of it is expected to produce a 404. Any other HTTP
    # error is unexpected and is re-raised.
    # NOTE(review): a successful response here is silently accepted.
    escape_url = "{0}/../../../../bin/pkg".format(depot_url)
    try:
        urlopen(escape_url)
    except HTTPError as e:
        if e.code != http_client.NOT_FOUND:
            raise

    # A file inside the web root must be served with non-empty content.
    robots = urlopen("{0}/robots.txt".format(depot_url))
    self.assertTrue(len(robots.read()))
    robots.close()
def _refresh(self, http_request):
    """Fetch a new access token for ``self.scope`` from the metadata URI.

    Storage is bypassed; the token is requested directly and stored on
    ``self.access_token``.

    Args:
        http_request: callable with the same signature as
            httplib2.Http.request, used to perform the request.

    Raises:
        HttpAccessTokenRefreshError: when the response is not OK or its
            body cannot be parsed.
    """
    scope_query = '?scope=%s' % urllib.parse.quote(self.scope, '')
    uri = META.replace('{?scope}', scope_query)
    response, content = http_request(
        uri, headers={'Metadata-Flavor': 'Google'})
    content = _from_bytes(content)

    if response.status != http_client.OK:
        if response.status == http_client.NOT_FOUND:
            # Give the caller a hint about the usual cause of a 404.
            content += (' This can occur if a VM was created'
                        ' with no service account or scopes.')
        raise HttpAccessTokenRefreshError(content, status=response.status)

    try:
        token_content = json.loads(content)
    except Exception as e:
        raise HttpAccessTokenRefreshError(str(e), status=response.status)
    self.access_token = token_content['access_token']
def test_get_member_for_invalid_id(self):
    """Requesting an unknown member propagates ResourceNotFoundError."""
    # GIVEN a collection with one valid member and a connection that
    # answers 404 for anything it is asked.
    self.test_resource_collection.members_identities = ('1',)
    not_found_response = mock.Mock(status_code=http_client.NOT_FOUND)
    self.conn.get.side_effect = exceptions.ResourceNotFoundError(
        method='GET',
        url='http://foo.bar:8000/redfish/v1/Fakes/2',
        response=not_found_response)

    # WHEN an unknown member is requested THEN the error surfaces and
    # exactly one request was made.
    self.assertRaises(exceptions.ResourceNotFoundError,
                      self.test_resource_collection.get_member,
                      '2')
    self.conn.get.assert_called_once_with(path='Fakes/2')
def test_not_found_error(self):
    """A 404 without a JSON body raises ResourceNotFoundError."""
    fake_response = self.request.return_value
    fake_response.status_code = http_client.NOT_FOUND
    fake_response.json.side_effect = ValueError('no json')

    with self.assertRaisesRegex(
            exceptions.ResourceNotFoundError,
            'Resource http://foo.bar not found') as caught:
        self.conn._op('GET', 'http://foo.bar')

    # The raised exception carries the original status code.
    self.assertEqual(http_client.NOT_FOUND, caught.exception.status_code)
def raise_for_response(method, url, response):
    """Raise the error class matching ``response.status_code``.

    Status codes below 400 are treated as success and return None.
    Otherwise the first matching rule selects the exception type:
    404, 400, 401/403, any 5xx, and finally a generic HTTPError.
    """
    code = response.status_code
    if code < http_client.BAD_REQUEST:
        return

    # Pick the exception type first, then raise it in one place.
    if code == http_client.NOT_FOUND:
        error_cls = ResourceNotFoundError
    elif code == http_client.BAD_REQUEST:
        error_cls = BadRequestError
    elif code in (http_client.UNAUTHORIZED, http_client.FORBIDDEN):
        error_cls = AccessError
    elif code >= http_client.INTERNAL_SERVER_ERROR:
        error_cls = ServerSideError
    else:
        error_cls = HTTPError

    raise error_cls(method, url, response)
def test_get_failure(self):
    """A 404 from the metadata server raises HTTPException."""
    fake_http = request_mock(
        http_client.NOT_FOUND, 'text/html', '<p>Error</p>')

    with self.assertRaises(http_client.HTTPException):
        _metadata.get(fake_http, PATH)

    # Exactly one GET with the expected URL and headers was issued.
    self.assertEqual(fake_http.requests, 1)
    self.assertEqual(fake_http.uri, EXPECTED_URL)
    self.assertEqual(fake_http.method, 'GET')
    self.assertIsNone(fake_http.body)
    self.assertEqual(fake_http.headers, _metadata.METADATA_HEADERS)
def _environment_check_gce_helper(self, status_ok=True, server_software=''):
    """Exercise the GAE/GCE environment checks against a mocked HTTP object.

    Args:
        status_ok: when true the mocked response reports OK and carries
            ``client._GCE_HEADERS``; otherwise it reports NOT_FOUND.
        server_software: value placed under ``client._SERVER_SOFTWARE``
            in the patched ``os.environ``.
    """
    # Build the headers the mocked metadata response will return.
    if status_ok:
        headers = {'status': http_client.OK}
        headers.update(client._GCE_HEADERS)
    else:
        headers = {'status': http_client.NOT_FOUND}
    http = http_mock.HttpMock(headers=headers)
    with mock.patch('oauth2client.client.os') as os_module:
        os_module.environ = {client._SERVER_SOFTWARE: server_software}
        with mock.patch('oauth2client.transport.get_http_object',
                        return_value=http) as new_http:
            # An empty server-software value is expected to mean
            # "not in GAE".
            if server_software == '':
                self.assertFalse(client._in_gae_environment())
            else:
                self.assertTrue(client._in_gae_environment())
            # GCE is only expected when the response was OK and the
            # server-software value is empty.
            if status_ok and server_software == '':
                self.assertTrue(client._in_gce_environment())
            else:
                self.assertFalse(client._in_gce_environment())
            # Verify mocks.
            if server_software == '':
                new_http.assert_called_once_with(
                    timeout=client.GCE_METADATA_TIMEOUT)
                self.assertEqual(http.requests, 1)
                self.assertEqual(http.uri, client._GCE_METADATA_URI)
                self.assertEqual(http.method, 'GET')
                self.assertIsNone(http.body)
                self.assertEqual(http.headers, client._GCE_HEADERS)
            else:
                # With a non-empty server-software value no HTTP object
                # is requested and no request is made.
                new_http.assert_not_called()
                self.assertEqual(http.requests, 0)
def check_url(url):
    """Abort with 404 unless *url* starts with a supported PODM resource.

    Any URL that matches no other route is sent to the PODMProxy
    resource, so unsupported paths are filtered out here. Only the first
    path segment is inspected.
    """
    supported = ("Chassis", "Services", "Managers", "Systems",
                 "EventService", "Nodes", "EthernetSwitches")
    first_segment = url.split("/")[0]
    if first_segment in supported:
        return
    flask.abort(http_client.NOT_FOUND)
def _do_switch(dpid, func, ret_func):
    """Call ``func`` with the parsed dpid and wrap its result.

    A KeyError raised by ``func`` is turned into a 404 response;
    otherwise the result is passed through ``ret_func``.
    """
    datapath_id = dpid_lib.str_to_dpid(dpid)
    try:
        result = func(datapath_id)
    except KeyError:
        message = 'no dpid is found %s' % dpid_lib.dpid_to_str(datapath_id)
        return Response(status=http_client.NOT_FOUND, body=message)
    else:
        return ret_func(result)
def _do_key(dpid, key, func, ret_func):
    """Call ``func`` with the parsed dpid and *key* and wrap its result.

    A KeyError raised by ``func`` is turned into a 404 response;
    otherwise the result is passed through ``ret_func``.
    """
    datapath_id = dpid_lib.str_to_dpid(dpid)
    try:
        result = func(datapath_id, key)
    except KeyError:
        message = 'no dpid/key is found %s %s' % (
            dpid_lib.dpid_to_str(datapath_id), key)
        return Response(status=http_client.NOT_FOUND, body=message)
    else:
        return ret_func(result)
def test_get_404(self, resp_mock):
    """restclient.get raises NotFoundError on a 404 response."""
    fake_response = resp_mock.return_value
    fake_response.status_code = http_client.NOT_FOUND

    with self.assertRaises(restclient.NotFoundError):
        restclient.get('http://foo.com', '/')
def test_app_log_failure(self):
    """Both log endpoints answer 404 when the log lookup fails."""
    self.impl.log.get.side_effect = get_log_failure

    log_urls = (
        '/local-app/proid.app/uniq/service/service_name',
        '/local-app/proid.app/uniq/sys/component',
    )
    for url in log_urls:
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, http_client.NOT_FOUND)
def test_arch_get(self):
    """Archive endpoint serves a file, and answers 404 on lookup errors."""
    # Happy path: the implementation points at an existing file.
    self.impl.archive.get.return_value = __file__
    served = self.client.get('/archive/<app>/<uniq>/app')
    self.assertEqual(served.status_code, http_client.OK)

    # Once the lookup fails, both archive kinds answer 404.
    self.impl.archive.get.side_effect = err
    for url in ('/archive/<app>/<uniq>/app', '/archive/<app>/<uniq>/sys'):
        failed = self.client.get(url)
        self.assertEqual(failed.status_code, http_client.NOT_FOUND)
def test_redir_no_path_no_lb(self, *_args):
    """A redirect request with no path and no LB answers 404."""
    response = self.app.get('/redir/lb/type1/cell1')
    status = response.status_code
    self.assertEqual(status, http_client.NOT_FOUND)
def test_update_not_found(self):
    """Patching a server that does not exist yields a JSON 404 error."""
    unknown_uuid = uuidutils.generate_uuid()
    patch = [{'path': '/metadata/a', 'value': 'b', 'op': 'add'}]

    response = self.patch_json('/servers/%s' % unknown_uuid,
                               patch,
                               headers=self.headers,
                               expect_errors=True)

    self.assertEqual(http_client.NOT_FOUND, response.status_int)
    self.assertEqual('application/json', response.content_type)
    self.assertTrue(response.json['error_message'])
def get_all(self, flavor_uuid):
    """Return the marshalled access list of a private flavor.

    Public flavors are open to all projects and have no access list, so
    requesting one raises a client-side error with a 404 status.
    """
    flavor = objects.Flavor.get(pecan.request.context, flavor_uuid)

    if not flavor.is_public:
        # Private flavor: visible to the listed projects only.
        return _marshall_flavor_access(flavor)

    msg = _("Access list not available for public flavors.")
    raise wsme.exc.ClientSideError(msg, status_code=http_client.NOT_FOUND)
def _refresh(self, http_request):
    """Fetch a new access token from the metadata URI.

    Storage is bypassed; the token is requested directly and stored on
    ``self.access_token``.

    Args:
        http_request: callable with the same signature as
            httplib2.Http.request, used to perform the request.

    Raises:
        HttpAccessTokenRefreshError: when the response is not OK or its
            body cannot be parsed.
    """
    response, content = http_request(
        META, headers={'Metadata-Flavor': 'Google'})
    content = _from_bytes(content)

    if response.status != http_client.OK:
        if response.status == http_client.NOT_FOUND:
            # Give the caller a hint about the usual cause of a 404.
            content += (' This can occur if a VM was created'
                        ' with no service account or scopes.')
        raise HttpAccessTokenRefreshError(content, status=response.status)

    try:
        token_content = json.loads(content)
    except Exception as e:
        raise HttpAccessTokenRefreshError(str(e), status=response.status)
    self.access_token = token_content['access_token']
def test_get_failure():
    """A 404 from the metadata server raises TransportError."""
    request = make_request('Metadata error', status=http_client.NOT_FOUND)

    with pytest.raises(exceptions.TransportError) as excinfo:
        _metadata.get(request, PATH)

    # The error message carries the response body.
    assert excinfo.match(r'Metadata error')

    expected_url = _metadata._METADATA_ROOT + PATH
    request.assert_called_once_with(
        method='GET',
        url=expected_url,
        headers=_metadata._METADATA_HEADERS)
def test_default_error_code(self):
    """The class-level ``code`` is used when none is passed in."""
    class FakeMasakariException(exception.MasakariException):
        code = http.NOT_FOUND

    raised_kwargs = FakeMasakariException().kwargs
    self.assertEqual(http.NOT_FOUND, raised_kwargs['code'])
def test_error_code_from_kwarg(self):
    """A ``code`` keyword argument overrides the class-level default."""
    class FakeMasakariException(exception.MasakariException):
        code = http.INTERNAL_SERVER_ERROR

    raised_kwargs = FakeMasakariException(code=http.NOT_FOUND).kwargs
    self.assertEqual(raised_kwargs['code'], http.NOT_FOUND)
def test_extensions_expected_error(self):
    """An error listed as expected passes through the decorator as-is."""
    @extensions.expected_errors(http.NOT_FOUND)
    def fake_func():
        raise webob.exc.HTTPNotFound()

    with self.assertRaises(webob.exc.HTTPNotFound):
        fake_func()
def test_extensions_expected_error_from_list(self):
    """An error within a tuple of expected codes passes through as-is."""
    allowed_codes = (http.NOT_FOUND, http.FORBIDDEN)

    @extensions.expected_errors(allowed_codes)
    def fake_func():
        raise webob.exc.HTTPNotFound()

    with self.assertRaises(webob.exc.HTTPNotFound):
        fake_func()
def test_extensions_unexpected_error(self):
    """An error not listed as expected is converted to a 500."""
    @extensions.expected_errors(http.NOT_FOUND)
    def fake_func():
        raise webob.exc.HTTPConflict()

    with self.assertRaises(webob.exc.HTTPInternalServerError):
        fake_func()
def test_extensions_unexpected_policy_not_authorized_error(self):
    """PolicyNotAuthorized propagates even though it is not listed."""
    @extensions.expected_errors(http.NOT_FOUND)
    def fake_func():
        raise exception.PolicyNotAuthorized(action="foo")

    with self.assertRaises(exception.PolicyNotAuthorized):
        fake_func()
def test_get_version_1_versions_invalid(self, mock_get_client):
    """An unknown path under /v1/versions answers 404."""
    request = webob.Request.blank('/v1/versions/1234/foo')
    request.accept = "application/json"

    response = request.get_response(self.wsgi_app)

    self.assertEqual(http.NOT_FOUND, response.status_int)
def test_override_default_code(self):
    """A code given at construction is reported by ``code``."""
    response_obj = wsgi.ResponseObject({}, code=http.NOT_FOUND)
    reported = response_obj.code
    self.assertEqual(reported, http.NOT_FOUND)
def test_override_modified_code(self):
    """An explicit code wins over a later change of the default code."""
    response_obj = wsgi.ResponseObject({}, code=http.NOT_FOUND)
    # Changing the default afterwards must not affect the explicit code.
    response_obj._default_code = http.ACCEPTED
    reported = response_obj.code
    self.assertEqual(reported, http.NOT_FOUND)
def test_get_failed_not_found(self, mock_novaclient):
    """A nova NotFound is translated into the local NotFound exception."""
    servers_get = mock_novaclient.return_value.servers.get
    servers_get.side_effect = nova_exception.NotFound(http.NOT_FOUND, '404')

    with self.assertRaises(exception.NotFound):
        self.api.get_server(self.ctx, uuidsentinel.fake_server)
def file_2(self, *tokens):
    """Serve the file whose hash name is the first path token.

    HEAD requests are handled here: the compressed attributes of the
    file are added as ``X-Ipkg-Attr-<n>`` response headers. Every other
    method is delegated to ``file_1``.
    """
    if cherrypy.request.method != "HEAD":
        return self.file_1(*tokens)

    fhash = tokens[0] if tokens else None

    try:
        fpath = self.repo.file(fhash, pub=self._get_req_pub())
    except srepo.RepositoryFileNotFoundError as e:
        # A plain "not found" is not logged.
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
    except srepo.RepositoryError as e:
        # Other repository errors are also surfaced as a 404, but the
        # real failure is logged and included in the response.
        reason = str(e)
        cherrypy.log("Request failed: {0}".format(reason))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, reason)

    csize, chashes = misc.compute_compressed_attrs(fhash, file_path=fpath)
    headers = cherrypy.response.headers
    for index, attr in enumerate(chashes):
        header_name = "X-Ipkg-Attr-{0}".format(index)
        headers[header_name] = "{0}={1}".format(attr, chashes[attr])

    # The response expires after one day.
    self.__set_response_expires("file", 86400, 86400)
    return serve_file(fpath, "application/data")
def index_0(self, *tokens):
    """Administrative interface for search indexing.

    The only supported subcommand is ``refresh``, which is queued for
    background execution because it can outlast connection timeouts;
    a later failure of the queued work is therefore not reported to the
    client. Unknown subcommands answer 404, and a full queue answers 503.
    """
    cmd = tokens[0] if tokens else ""

    if cmd != "refresh":
        err = "Unknown index subcommand: {0}".format(cmd)
        cherrypy.log(err)
        raise cherrypy.HTTPError(http_client.NOT_FOUND, err)

    try:
        # Update search indexes in the background.
        self.__bgtask.put(self.repo.refresh_index,
                          pub=self._get_req_pub())
    except queue.Full:
        raise cherrypy.HTTPError(
            http_client.SERVICE_UNAVAILABLE,
            "Another operation is already in progress; try again later.")
def publisher_1(self, *tokens):
    """Return a p5i datastream for the request's publisher.

    When the request names no publisher, all publishers of the
    repository are included. Any failure is reported as a 404.
    """
    prefix = self._get_req_pub()

    if prefix:
        try:
            pubs = [self.repo.get_publisher(prefix)]
        except Exception as e:
            # A publisher that cannot be obtained is reported as "not
            # found" so the client treats the operation as unsupported.
            reason = str(e)
            cherrypy.log("Request failed: {0}".format(reason))
            raise cherrypy.HTTPError(http_client.NOT_FOUND, reason)
    else:
        pubs = self.repo.get_publishers()

    buf = cStringIO()
    try:
        p5i.write(buf, pubs)
    except Exception as e:
        # Any serialisation error is also a 404; the real failure is
        # logged and included in the response.
        reason = str(e)
        cherrypy.log("Request failed: {0}".format(reason))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, reason)
    buf.seek(0)

    one_year = 86400*365
    self.__set_response_expires("publisher", one_year, one_year)
    return buf.getvalue()
def status_0(self, *tokens):
    """Return the repository status as a JSON document in bytes.

    The response expires after five minutes. A serialisation failure is
    reported to the client as a 404.
    """
    five_minutes = 5*60
    self.__set_response_expires("versions", five_minutes, five_minutes)

    status = self.repo.get_status()
    try:
        rendered = json.dumps(status, ensure_ascii=False, indent=2,
                              sort_keys=True)
    except Exception:
        raise cherrypy.HTTPError(http_client.NOT_FOUND,
                                 _("Unable to generate statistics."))
    return misc.force_bytes(rendered + "\n")
def search_1(self, *args, **params):
    """Randomly fail with an HTTP error, or defer to the real search_1.

    When ``need_nasty()`` says so, one of 404, 400 or 503 is chosen at
    random, logged and raised; otherwise DepotHTTP.search_1 handles the
    request.
    """
    if not self.need_nasty():
        return DepotHTTP.search_1(self, *args, **params)

    candidates = [http_client.NOT_FOUND,
                  http_client.BAD_REQUEST,
                  http_client.SERVICE_UNAVAILABLE]
    code = random.choice(candidates)
    cherrypy.log("NASTY search_1: HTTP {0:d}".format(code))
    raise cherrypy.HTTPError(code)
def feed(depot, request, response, pub):
    """Produce the update feed for *pub*, or refuse when unavailable.

    Mirrors answer 404; a catalog without update history answers 503.
    """
    repo = depot.repo

    if repo.mirror:
        raise cherrypy.HTTPError(
            http_client.NOT_FOUND,
            "Operation not supported in current server mode.")

    has_updates = repo.get_catalog(pub).updates
    if not has_updates:
        raise cherrypy.HTTPError(
            http_client.SERVICE_UNAVAILABLE,
            "No update history; unable to generate feed.")

    return pkg.server.feed.handle(depot, request, response, pub)
def __handle_error(path, error):
    """Log non-404 errors, then always raise a CherryPy NotFound.

    Every error is reported as a 404 because reverse proxies such as
    Apache do not handle 500 errors in a desirable way.
    """
    is_real_404 = error == http_client.NOT_FOUND
    if not is_real_404:
        cherrypy.log(
            "Error encountered while processing template: {0}\n".format(
                path),
            traceback=True)

    raise cherrypy.NotFound()
def default_error_page(status=http_client.NOT_FOUND, message="oops",
                       traceback=None, version=None):
    """Render the default error page for CherryPy errors.

    The ``Cache-Control`` and ``Pragma`` headers are removed from the
    current response, and an HTML error page is returned. The page always
    reports a 404 status. For internal server errors the original message
    is replaced by the standard 404 reason phrase, and the request path
    and message are printed on the server side instead.

    Args:
        status: HTTP status, either an integer code or a status line
            string such as ``"500 Internal Server Error"``.
        message: text shown on the page for non-500 errors.
        traceback: accepted for signature compatibility; not used.
        version: accepted for signature compatibility; not used.

    Returns:
        The rendered error page as a string.
    """
    response = cherrypy.response
    for key in ('Cache-Control', 'Pragma'):
        if key in response.headers:
            del response.headers[key]

    # ``status`` may be an int (including the default) or a status-line
    # string. Normalise through str() so the prefix check cannot raise
    # AttributeError on an integer status.
    is_server_error = (status == http_client.INTERNAL_SERVER_ERROR or
                       str(status).startswith("500 "))

    if is_server_error:
        # Convert the error to a 404 to obscure implementation details
        # from the client, but print the original error on the server.
        error = cherrypy._cperror._HTTPErrorTemplate % \
            {"status": http_client.NOT_FOUND,
             "message": http_client.responses[http_client.NOT_FOUND],
             "traceback": "",
             "version": cherrypy.__version__}
        print("Path that raised exception was {0}".format(
            cherrypy.request.path_info))
        print(message)
        return error

    error = cherrypy._cperror._HTTPErrorTemplate % \
        {"status": http_client.NOT_FOUND,
         "message": message,
         "traceback": "",
         "version": cherrypy.__version__}
    return error