我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用bottle.response.content_type()。
def post_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') ## DELETE: api/datasource/crud/batch/:entitySetName #@delete('/api/datasource/crud/batch/<entitySetName>') #def delete_batch_entityset(entitySetName): # try: # result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService) # response.content_type = "application/json; charset=utf-8" # return json.dumps(result, cls=CustomEncoder) # except dalUtils.StatusCodeError as err: # response.status = err.value # except: # abort(400, 'Bad Request') # DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def apply(self, callback, route): def wrapper(*a, **ka): try: rv = callback(*a, **ka) except HTTPResponse as resp: rv = resp if isinstance(rv, dict): json_response = dumps(rv) response.content_type = 'application/json' return json_response elif isinstance(rv, HTTPResponse) and isinstance(rv.body, dict): rv.body = dumps(rv.body) rv.content_type = 'application/json' return rv return wrapper
def task_screenshots(task=0, screenshot=None): folder_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task), "shots") if os.path.exists(folder_path): if screenshot: screenshot_name = "{0}.jpg".format(screenshot) screenshot_path = os.path.join(folder_path, screenshot_name) if os.path.exists(screenshot_path): # TODO: Add content disposition. response.content_type = "image/jpeg" return open(screenshot_path, "rb").read() else: return HTTPError(404, screenshot_path) else: zip_data = StringIO() with ZipFile(zip_data, "w", ZIP_STORED) as zip_file: for shot_name in os.listdir(folder_path): zip_file.write(os.path.join(folder_path, shot_name), shot_name) # TODO: Add content disposition. response.content_type = "application/zip" return zip_data.getvalue() else: return HTTPError(404, folder_path)
def get_files(task_id): if not task_id.isdigit(): return HTTPError(code=404, output="The specified ID is invalid") files_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files") zip_file = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files.zip") with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive: root_len = len(os.path.abspath(files_path)) for root, dirs, files in os.walk(files_path): archive_root = os.path.abspath(root)[root_len:] for f in files: fullpath = os.path.join(root, f) archive_name = os.path.join(archive_root, f) archive.write(fullpath, archive_name, zipfile.ZIP_DEFLATED) if not os.path.exists(files_path): return HTTPError(code=404, output="Files not found") response.content_type = "application/zip" response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id)) return open(zip_file, "rb").read()
def translate(self): """ Processes a translation request. """ translation_request = request_provider(self._style, request) logging.debug("REQUEST - " + repr(translation_request)) translations = self._translator.translate( translation_request.segments, translation_request.settings ) response_data = { 'status': TranslationResponse.STATUS_OK, 'segments': [translation.target_words for translation in translations], 'word_alignments': [translation.get_alignment_json(as_string=False) for translation in translations] if translation_request.settings.get_alignment else None, 'word_probabilities': [translation.target_probs for translation in translations] if translation_request.settings.get_word_probs else None, } translation_response = response_provider(self._style, **response_data) logging.debug("RESPONSE - " + repr(translation_response)) response.content_type = translation_response.get_content_type() return repr(translation_response)
def post_index(): event_type = request.get_header('X-GitHub-Event') if not is_request_from_github(): abort(403, "Forbidden for IP %s, it's not GitHub's address" % remote_ip()) if request.content_type.split(';')[0] != 'application/json': abort(415, "Expected application/json, but got %s" % request.content_type) if event_type == 'ping': return handle_ping() elif event_type == 'push': return handle_push() else: abort(400, "Unsupported event type: %s" % event_type)
def prometheus_metrics(): if not test_restricted(request['REMOTE_ADDR']): return '' dhcpstat = {'shared-networks': []} dhcp6stat = {'shared-networks': []} try: dhcpstat = json.loads(exec_command([args.binary, '-c', args.dhcp4_config, '-l', args.dhcp4_leases, '-f', 'j'])) except: pass try: dhcp6stat = json.loads(exec_command([args.binary, '-c', args.dhcp6_config, '-l', args.dhcp6_leases, '-f', 'j'])) except: pass data = [] for shared_network in dhcpstat['shared-networks']: data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['used'])) data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['free'])) defined_leases = float(shared_network['defined']) leases_used_percentage = 0 if defined_leases > 0: leases_used_percentage = float(shared_network['used'])/defined_leases data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],leases_used_percentage)) for shared_network in dhcp6stat['shared-networks']: data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['used'])) data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['free'])) defined_leases = float(shared_network['defined']) leases_used_percentage = 0 if defined_leases > 0: leases_used_percentage = float(shared_network['used'])/defined_leases data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],leases_used_percentage)) response.content_type = 'text/plain' return '%s\n' % ('\n'.join(data))
def badge(number): response.set_header('Cache-Control', 'public, max-age={}'.format(SECONDS_TO_RESCRAPE)) response.set_header('Access-Control-Allow-Origin','*') response.content_type = "image/svg+xml;charset=utf-8" if number < 10: return template("First Timers-1-blue.svg", number=number) if number < 100: return template("First Timers-10-blue.svg", number=number) return template("First Timers-100-blue.svg", number=number)
def query_response(result, max_results=100000): response.content_type = 'application/json; charset=utf8' result_len = len(result) if result_len > max_results: abort(403, 'Too many rows (%s)' % result_len) else: return [row.to_dict() for row in result]
def get_json_groups(): """Get all driver families (JSON)""" response.content_type = 'application/json' families = collection.get_families() return json.dumps(sorted(families.keys()))
def get_json_drivers(): """Get all drivers (JSON)""" response.content_type = 'application/json' return json.dumps([ob.__dict__ for ob in collection.drivers])
def get_search(): try: q = request.query["q"] except KeyError: return [] else: results = graph.run( "MATCH (movie:Movie) " "WHERE movie.title =~ {title} " "RETURN movie", {"title": "(?i).*" + q + ".*"}) response.content_type = "application/json" return json.dumps([{"movie": dict(row["movie"])} for row in results])
def get_metadata(): try: response.content_type = "application/json; charset=utf-8" metadataClient = databaseInfo.getMetadataClient() return json.dumps(metadataClient, cls=MetadataEncoder, indent=2) except: abort(500, 'Internal server error') # GET: api/datasource/crud/:entitySetName?skip=20&top=10
def get_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
def get_single_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def get_many_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1} #@patch('/api/datasource/crud/<entitySetName>')
def post_entityset(entitySetName): # test1 = json.loads(request.body.read()) try: result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
def delete_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PUT: api/datasource/crud/batch/:entitySetName
def put_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/batch/:entitySetName #@patch('/api/datasource/crud/batch/<entitySetName>')
def patch_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/batch/:entitySetName
def setup(self, app): def default_error_handler(res): if res.content_type == "application/json": return res.body res.content_type = "application/json" return dumps({'message': str(res.exception if res.exception else res.body)}) app.default_error_handler = default_error_handler
def test_jsonapi_header(): """Make sure that the content type is set accordingly. http://jsonapi.org/format/#content-negotiation-clients """ content_type = request.content_type content_type_expected = "application/vnd.api+json" if content_type != content_type_expected and content_type.startswith(content_type_expected): abort(415, "The Content-Type header must be \"{}\", not \"{}\".".format( content_type_expected, content_type)) accepts = request.headers.get("Accept", "*/*").split(",") expected_accept = ["*/*", "application/*", "application/vnd.api+json"] if not any([accept in expected_accept for accept in accepts]): abort(406, "The Accept header must one of \"{}\", not \"{}\".".format( expected_accept, ",".join(accepts)))
def get_resource_ids(): """Return the list of current ids.""" test_jsonapi_header() resources = get_resources() response.content_type = 'application/vnd.api+json' return response_object({"data": [{"type": "id", "id": _id} for _id in resources], "links": {"self": get_location_url("ids")}})
def account_status(account_id): result = controller.db.iter_resources(account_id) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder)
def info(account_id, resource_id): request_data = request.query if resource_id.startswith('sg-') and 'parent_id' not in request_data: abort(400, "Missing required parameter parent_id") result = controller.info( account_id, resource_id, request_data.get('parent_id', resource_id)) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder) # this set to post to restrict permissions, perhaps another url space.
def delta(account_id): request_data = request.json for rp in ('region',): if not request_data or rp not in request_data: abort(400, "Missing required parameter %s" % rp) result = controller.get_account_delta( account_id, request_data['region'], api_url()) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder)
def error(e): response.content_type = "application/json" return json.dumps({ "status": e.status, "url": repr(request.url), "exception": repr(e.exception), # "traceback": e.traceback and e.traceback.split('\n') or '', "body": repr(e.body) }, indent=2)
def post_update(context, update): """ Updates a Cisco Spark room :param context: button state and configuration :type context: ``dict`` :param update: content of the update to be posted there :type update: ``str`` or ``dict`` If the update is a simple string, it is sent as such to Cisco Spark. Else if it a dictionary, then it is encoded as MIME Multipart. """ logging.info("Posting update to Cisco Spark room") url = 'https://api.ciscospark.com/v1/messages' headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']} if isinstance(update, dict): update['roomId'] = context['spark']['id'] payload = MultipartEncoder(fields=update) headers['Content-Type'] = payload.content_type else: payload = {'roomId': context['spark']['id'], 'text': update } response = requests.post(url=url, headers=headers, data=payload) if response.status_code != 200: logging.info(response.json()) raise Exception("Received error code {}".format(response.status_code)) logging.info('- done, check the room with Cisco Spark client software') # # handle Twilio API #
def header_json(): response.content_type = 'application/json'
def jsonize(data): """Converts data dict to JSON. @param data: data dict @return: JSON formatted data """ response.content_type = "application/json; charset=UTF-8" return json.dumps(data, sort_keys=False, indent=4)
def files_get(sha256): file_path = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256) if os.path.exists(file_path): response.content_type = "application/octet-stream; charset=UTF-8" return open(file_path, "rb").read() else: return HTTPError(404, "File not found")
def pcap_get(task_id): file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%d" % task_id, "dump.pcap") if os.path.exists(file_path): response.content_type = "application/octet-stream; charset=UTF-8" try: return open(file_path, "rb").read() except: return HTTPError(500, "An error occurred while reading PCAP") else: return HTTPError(404, "File not found")
def get_pcap(task_id): if not task_id.isdigit(): return HTTPError(code=404, output="The specified ID is invalid") pcap_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcap") if not os.path.exists(pcap_path): return HTTPError(code=404, output="PCAP not found") response.content_type = "application/vnd.tcpdump.pcap" response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_{0}.pcap".format(task_id)) return open(pcap_path, "rb").read()
def status(self): """ Reports on the status of this translation server. """ response_data = { 'status': self._status, 'models': self._models, 'version': pkg_resources.require("nematus")[0].version, 'service': 'nematus', } response.content_type = "application/json" return json.dumps(response_data)
def create_routes(host, port): """ Define registry routes """ @route("/runtimes/", method=['OPTIONS', 'GET']) def get_registry(): """ Get data about rill runtime """ from rill.runtime import Runtime response.set_header('Access-Control-Allow-Origin', '*') response.set_header('Access-Control-Allow-Methods', 'GET, OPTIONS') response.set_header('Allow', 'GET, OPTIONS') response.set_header( 'Access-Control-Allow-Headers', 'Content-Type, Authorization' ) if request.method == 'OPTIONS': return 'GET,OPTIONS' response.content_type = 'application/json' runtime = Runtime() runtime_meta = runtime.get_runtime_meta() runtime_meta['address'] = address = 'ws://{}:{}'.format(host, port) runtime_meta['protocol'] = 'websocket' runtime_meta['id'] = 'rill_' + urlparse(address).netloc runtime_meta['seen'] = str(datetime.now()) return json.dumps([runtime_meta])
def json_api(func): def func_wrapper(*args, **kwargs): rs = func(*args, **kwargs) try: ds = json.dumps(rs) response.content_type = 'application/json; charset=utf-8' return ds except: return rs return func_wrapper # decorator for admin APIs
def default_error_handler(resp: BaseResponse): response.content_type = 'application/problem+json' msg = "%s, caused by: %s" % (resp.body, resp.exception) \ if getattr(resp, 'exception', None) else resp.body LOG.error(msg) if resp.status_code >= 500 and getattr(resp, 'traceback', None): LOG.debug(resp.traceback) return json.dumps({'title': resp.body, 'status': resp.status_code})
def web_inbound_call(button=None): """ Handles an inbound phone call This function is called from twilio cloud back-end """ if button is None: button = settings['server']['default'] logging.info("Receiving inbound call for button '{}'".format(button)) try: button = decode_token(settings, button, action='call') context = load_button(settings, button) update, twilio_action = get_push_details(context) response.content_type = 'text/xml' behaviour = twilio.twiml.Response() say = None if 'call' in twilio_action: for line in twilio_action['call']: if line.keys()[0] == 'say': say = line['say'] break if say is None: say = "What's up Doc?" behaviour.say(say) return str(behaviour) except Exception as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to handle inbound call for '{}'".format(button)) raise else: logging.error(str(feedback)) response.status = 400 return 'Invalid request' # # the collection of buttons that we manage #
def tasks_report(task_id, report_format="json"): formats = { "json": "report.json", "html": "report.html", "htmlsumary": "summary-report.html", "pdf": "report.pdf", "maec": "report.maec-4.1.xml", "metadata": "report.metadata.xml", } bz_formats = { "all": {"type": "-", "files": ["memory.dmp"]}, "dropped": {"type": "+", "files": ["files"]}, "dist" : {"type": "+", "files": ["shots", "reports"]}, "dist2": {"type": "-", "files": ["shots", "reports", "binary"]}, } tar_formats = { "bz2": "w:bz2", "gz": "w:gz", "tar": "w", } if report_format.lower() in formats: report_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%d" % task_id, "reports", formats[report_format.lower()]) elif report_format.lower() in bz_formats: bzf = bz_formats[report_format.lower()] srcdir = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%d" % task_id) s = StringIO() # By default go for bz2 encoded tar files (for legacy reasons.) tarmode = tar_formats.get(request.GET.get("tar"), "w:bz2") tar = tarfile.open(fileobj=s, mode=tarmode) for filedir in os.listdir(srcdir): if bzf["type"] == "-" and filedir not in bzf["files"]: tar.add(os.path.join(srcdir, filedir), arcname=filedir) if bzf["type"] == "+" and filedir in bzf["files"]: tar.add(os.path.join(srcdir, filedir), arcname=filedir) if report_format.lower() == "dist" and FULL_DB: buf = results_db.analysis.find_one({"info.id": task_id}) tarinfo = tarfile.TarInfo("mongo.json") buf_dumped = json_util.dumps(buf) tarinfo.size = len(buf_dumped) buf = StringIO(buf_dumped) tar.addfile(tarinfo, buf) tar.close() response.content_type = "application/x-tar; charset=UTF-8" return s.getvalue() else: return HTTPError(400, "Invalid report format") if os.path.exists(report_path): return open(report_path, "rb").read() else: return HTTPError(404, "Report not found")