我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用bottle.response.set_header()。
def download():
    """Bundle the requested track files into a zip archive and send it.

    The ``tracks`` query parameter is read as a ``|``-separated list of
    names.  In each name ``-track.csv`` is replaced by ``*`` and the result
    is globbed under ``config.TRACKDIR``; every match is stored in the
    archive under its path relative to ``config.TRACKDIR``.

    Returns:
        The temporary file object holding the archive, rewound to offset 0.
    """
    requested = request.query.tracks.split('|')

    # The archive is assembled in a spooled temporary file rather than at a
    # named path on disk.
    spool = tempfile.SpooledTemporaryFile()
    with zipfile.ZipFile(spool, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for rel_name in requested:
            pattern = rel_name.replace("-track.csv", "*")
            for match in config.TRACKDIR.glob(pattern):
                arcname = str(match.relative_to(config.TRACKDIR))
                bundle.write(str(match), arcname)
    spool.seek(0)

    # Force a download: supply a filename and a MIME type.
    response.set_header('Content-Disposition', 'attachment; filename="data.zip"')
    response.set_header('Content-Type', 'application/zip')

    # No explicit cleanup: the temporary file is left for the garbage
    # collector to dispose of once the response has been sent.
    return spool
def get_files(task_id):
    """Zip the ``files`` directory of an analysis task and return the archive.

    Args:
        task_id: Task identifier as a string; must consist of digits only.

    Returns:
        The bytes of the freshly written ``files.zip``, or an ``HTTPError``
        with code 404 when the ID is not numeric or the task has no
        ``files`` directory.
    """
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    task_dir = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id)
    files_path = os.path.join(task_dir, "files")
    zip_file = os.path.join(task_dir, "files.zip")

    # Check before building the archive: with the directory missing, writing
    # the zip first would leave a useless empty files.zip behind (or fail
    # outright when the task directory itself does not exist).
    if not os.path.exists(files_path):
        return HTTPError(code=404, output="Files not found")

    root_len = len(os.path.abspath(files_path))
    with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(files_path):
            # Directory path relative to files_path (keeps its leading separator).
            archive_root = os.path.abspath(root)[root_len:]
            for name in files:
                archive.write(os.path.join(root, name),
                              os.path.join(archive_root, name),
                              zipfile.ZIP_DEFLATED)

    response.content_type = "application/zip"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id))

    # Context manager closes the handle deterministically instead of leaking it.
    with open(zip_file, "rb") as handle:
        return handle.read()
def page(data=None):
    """Greet the visitor named in the encrypted ``secret_data`` cookie.

    On every call a fresh cookie value is produced by encrypting a JSON
    document (username ``Guest`` plus ``FLAG``) with ``KEY`` and a random
    8-character IV.  That fresh value is sent back via ``Set-Cookie`` on
    every path except a successful greeting.

    Args:
        data: Unused; kept for interface compatibility.

    Returns:
        A short text/HTML string describing the outcome.
    """
    # NOTE(review): ``random`` is not a cryptographic RNG; confirm whether a
    # predictable IV matters for this service.
    iv = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
    fresh = cipher(json.dumps({'username': 'Guest', 'flag': FLAG}), KEY, iv)

    secret_data = request.get_cookie("secret_data")
    if not secret_data:
        response.set_header('Set-Cookie', 'secret_data=%s' % fresh)
        return '<a href="/">Enter website</a>'

    # The nesting is deliberate: a failure inside the PaddingError handler is
    # still caught by the outer handler.
    try:
        try:
            # Default to '' so a request without a User-Agent header is not
            # misreported as a bad secret value.
            if "libwww-perl" in request.headers.get('User-Agent', ''):
                # Simple anti-PadBuster check.
                response.set_header('Set-Cookie', 'secret_data=%s' % fresh)
                return "Attack detected."
            # Keep the decoded document in its own name: the handlers below
            # must send the fresh ciphertext, not the parsed JSON.
            payload = json.loads(uncipher(secret_data, KEY))
            print(payload)
            return "Hello %s." % payload['username']
        except PaddingError:
            response.set_header('Set-Cookie', 'secret_data=%s' % fresh)
            return "Padding error."
    except Exception:
        response.set_header('Set-Cookie', 'secret_data=%s' % fresh)
        return "Secret value error."
def info():
    """Return service information.

    Returns:
        A dict with the keys ``sums``, ``test_errors``, ``test_report``,
        ``test_report_stamp`` and ``stats``.  The report text and its
        timestamp fall back to ``''`` when they cannot be obtained.
    """
    # Best-effort lookups: a missing or unreadable report must not break the
    # endpoint.  ``except Exception`` (rather than a bare ``except``) keeps
    # SystemExit and KeyboardInterrupt propagating.
    # NOTE(review): the stamp is taken from DOCTEST_OUTPUT while the text is
    # read from a hard-coded path; confirm both refer to the same file.
    try:
        test_report_stamp = time.ctime(os.path.getctime(DOCTEST_OUTPUT))
    except Exception:
        test_report_stamp = ''

    try:
        with open('/tmp/dac_test_results') as fh:
            test_report = fh.read()
    except Exception:
        test_report = ''

    resp = {
        'sums': mk_md5sums(),
        'test_errors': test_errors(),
        'test_report': test_report,
        'test_report_stamp': test_report_stamp,
        'stats': stats,
    }
    response.set_header('Content-Type', 'application/json')
    return resp
def fetch_static(filename):
    """Serve a file from the ``static`` directory with a 600-second cache lifetime.

    Args:
        filename: Name of the requested file, resolved against ``static``.

    Returns:
        The response object produced by ``static_file``.
    """
    served = static_file(filename, root='static')
    # static_file() builds its own response object.  When a route returns
    # such an object, Bottle sends that object's headers and discards any set
    # on the global ``response``, so the header has to go on the returned
    # object itself.  Error responses (404/403) are left uncached.
    if served.status_code < 400:
        served.set_header('Cache-Control', 'max-age=600')
    return served
def badge(number):
    """Render the SVG badge template that matches the size of ``number``.

    Args:
        number: Value handed to the template.  Below 10 selects the "1"
            badge, below 100 the "10" badge, anything else the "100" badge.

    Returns:
        The rendered template.
    """
    response.set_header('Cache-Control', 'public, max-age={}'.format(SECONDS_TO_RESCRAPE))
    response.set_header('Access-Control-Allow-Origin', '*')
    response.content_type = "image/svg+xml;charset=utf-8"

    # Choose the template first, then render once.
    if number < 10:
        svg_name = "First Timers-1-blue.svg"
    elif number < 100:
        svg_name = "First Timers-10-blue.svg"
    else:
        svg_name = "First Timers-100-blue.svg"
    return template(svg_name, number=number)
def get_preview_image():
    """Render a label preview and return it as raw PNG or base64 text.

    The ``return_format`` query parameter (default ``png``) selects the
    encoding: ``base64`` produces base64-encoded PNG data served as
    ``text/plain``; any other value produces PNG bytes served as
    ``image/png``.
    """
    label_image = create_label_im(**get_label_context(request))
    wants_base64 = request.query.get('return_format', 'png') == 'base64'

    if not wants_base64:
        response.set_header('Content-type', 'image/png')
        return image_to_png_bytes(label_image)

    import base64
    response.set_header('Content-type', 'text/plain')
    return base64.b64encode(image_to_png_bytes(label_image))
def predict():
    """Get the current DAC prediction.

    Builds an ``EntityLinker`` with ``debug`` and ``candidates`` enabled,
    links the ``url`` / ``ne`` query parameters (``ne`` is UTF-8 encoded
    first) and returns the first entry of the result's ``linkedNEs`` list.
    """
    entity_linker = dac.EntityLinker(debug=True, candidates=True)
    query = request.query
    linked = entity_linker.link(query.url, query.ne.encode('utf-8'))
    first_entity = linked['linkedNEs'][0]

    response.set_header('Content-Type', 'application/json')
    return first_entity
def get_image(tgtbox, boxes_rpc, width=648):
    """Stream JPEG frames from a box as a multipart HTTP response.

    This is a generator function.

    Args:
        tgtbox: Passed to ``_get_box_rpc`` to select the box.
        boxes_rpc: Passed through to ``_get_box_rpc``.
        width: Passed to ``box.start_img_stream``.

    Yields:
        Either one rendered ``error`` template (the box is locked, or
        starting the stream raised ``OSError``), or a sequence of multipart
        chunks, one per frame, until the box becomes locked or
        ``box.get_image()`` returns ``None``.
    """
    box = _get_box_rpc(tgtbox, boxes_rpc)

    if box.lock_exists():
        # In a generator the page has to be yielded, then the function ends.
        yield template('error', errormsg="It looks like an experiment is currently running on this box. Image capture only works while the box is idle.")
        return

    try:
        try:
            box.start_img_stream(width)
        except OSError as err:
            # Same yield-then-stop pattern as above.
            yield template('error', errormsg="Failed to start image stream capture.", exception=err)
            return

        # Stream started; announce the multipart body.
        response.set_header('Content-type', 'multipart/x-mixed-replace; boundary=atlesframe')

        while not box.lock_exists():
            time.sleep(0.5)
            frame = box.get_image()
            if frame is None:
                break
            if len(frame) >= 16:  # anything shorter is a dud frame; skip it
                yield b"\n".join([
                    b"--atlesframe",
                    b"Content-Type: image/jpeg",
                    b"",
                    frame,
                    b"",
                ])
    finally:
        # Runs on every exit path once the initial lock check has passed,
        # including a failed start.
        box.stop_img_stream()
def download_report(task_id):
    """Send the HTML report of an analysis task as a file download.

    Args:
        task_id: Task identifier as a string; must consist of digits only.

    Returns:
        The bytes of ``reports/report.html`` for the task, or an
        ``HTTPError`` with code 404 when the ID is not numeric or the report
        does not exist.
    """
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    report_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "reports", "report.html")
    if not os.path.exists(report_path):
        return HTTPError(code=404, output="Report not found")

    response.content_type = "text/html"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_{0}.html".format(task_id))

    # Context manager closes the handle deterministically instead of leaving
    # it to the garbage collector.
    with open(report_path, "rb") as report:
        return report.read()
def get_pcap(task_id):
    """Send the network capture of an analysis task as a file download.

    Args:
        task_id: Task identifier as a string; must consist of digits only.

    Returns:
        The bytes of the task's ``dump.pcap``, or an ``HTTPError`` with code
        404 when the ID is not numeric or the capture does not exist.
    """
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    pcap_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcap")
    if not os.path.exists(pcap_path):
        return HTTPError(code=404, output="PCAP not found")

    response.content_type = "application/vnd.tcpdump.pcap"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_{0}.pcap".format(task_id))

    # Context manager closes the handle deterministically instead of leaving
    # it to the garbage collector.
    with open(pcap_path, "rb") as capture:
        return capture.read()
def get_images():
    """Return the file named by the ``path`` query parameter as a JPEG.

    Returns:
        The file's bytes with an ``image/jpeg`` content type, or a 404
        ``HTTPResponse`` with a JSON error body when ``path`` is missing or
        does not name an existing file.
    """
    # NOTE(review): ``path`` comes straight from the query string, so any
    # file readable by the server process can be requested.  Confirm this
    # endpoint is never exposed to untrusted clients, or restrict it to a
    # known image directory.
    image_path = request.query.get('path', None)

    # A missing parameter (os.path functions reject None) and a directory
    # (open() cannot read one) are both "not found".
    if not image_path or not os.path.isfile(image_path):
        # Return the response: merely constructing it sends an empty 200.
        return HTTPResponse(status=404, body=json.dumps({'error' : 'image not found'}))

    with open(image_path, 'rb') as image_file:
        data = image_file.read()
    response.set_header('Content-type', 'image/jpeg')
    return data


#------------------
# MAIN
#------------------
def error400(error):
    """Build a JSON error body that carries the error's own body text.

    Args:
        error: Error object; its ``body`` attribute becomes ``detail``.

    Returns:
        JSON text of the form ``{"detail": ...}``.
    """
    _add_cors_headers(response)
    response.set_header('Content-Type', 'application/json')
    payload = {'detail': error.body}
    return json.dumps(payload)
def error404(error):
    """Build a JSON error body with the fixed detail ``Page not found``.

    Args:
        error: Error object; not inspected.

    Returns:
        JSON text of the form ``{"detail": "Page not found"}``.
    """
    _add_cors_headers(response)
    response.set_header('Content-Type', 'application/json')
    payload = {'detail': 'Page not found'}
    return json.dumps(payload)
def create_routes(host, port):
    """Define registry routes.

    Args:
        host: Host part of the websocket address reported for the runtime.
        port: Port part of the websocket address reported for the runtime.
    """
    # Headers sent on every /runtimes/ response, in this order.
    common_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, OPTIONS'),
        ('Allow', 'GET, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Content-Type, Authorization'),
    )

    @route("/runtimes/", method=['OPTIONS', 'GET'])
    def get_registry():
        """Get data about rill runtime.

        Returns ``'GET,OPTIONS'`` for an OPTIONS request; otherwise a JSON
        list holding one runtime metadata object.
        """
        from rill.runtime import Runtime

        for header_name, header_value in common_headers:
            response.set_header(header_name, header_value)

        if request.method == 'OPTIONS':
            return 'GET,OPTIONS'

        response.content_type = 'application/json'

        meta = Runtime().get_runtime_meta()
        address = 'ws://{}:{}'.format(host, port)
        meta['address'] = address
        meta['protocol'] = 'websocket'
        meta['id'] = 'rill_' + urlparse(address).netloc
        meta['seen'] = str(datetime.now())
        return json.dumps([meta])
def index():
    """Return the entity linker result.

    Reads ``url``, ``ne``, ``model``, ``debug``, ``features``, ``candidates``
    and ``callback`` from the request parameters and aborts with 400 when
    ``url`` is missing.  Any exception raised while linking is reported as a
    result with ``status`` set to ``error``.

    Returns:
        The result as JSON text with sorted keys, wrapped as
        ``callback(...);`` when a ``callback`` parameter is given.
    """
    params = request.params
    url = params.get('url')
    ne = params.get('ne')
    model = params.get('model')
    debug = params.get('debug')
    features = params.get('features')
    candidates = params.get('candidates')
    callback = params.get('callback')

    if not url:
        abort(400, 'No fitting argument ("url=...") given.')

    try:
        linker = dac.EntityLinker(model=model, debug=debug,
                                  features=features, candidates=candidates)
        result = linker.link(url, ne)
    except Exception as e:
        # Must be a dict: string keys are assigned here and ``status`` is
        # looked up right below, both of which raise TypeError on a list.
        result = {
            'status': 'error',
            'message': str(e),
            'hostname': hostname,
        }

    if result['status'] == 'ok':
        result['linkedNEs'] = array_to_utf(result['linkedNEs'])
        result['hostname'] = hostname

    result = json.dumps(result, sort_keys=True)

    if callback:
        # NOTE(review): a JSONP reply is still labelled application/json
        # below; confirm whether clients rely on that.
        result = unicode(callback) + u'(' + result + u');'

    response.set_header('Content-Type', 'application/json')
    return result
def _recv_msg(self):
    """Route for sending message to backend.

    Decodes the ``data`` request parameter (base64, then URL-unquoted, then
    parsed as JSON) and calls every callback stored in ``self._callback``
    under the message's ``event`` with the message's ``data``.

    Returns:
        The JSON text ``{"status": "success"}``.
    """
    # Get message data
    message = json.loads(unquote(b64decode(req.params.data)))

    # Trigger the callbacks registered for this event, if there are any
    for handler in self._callback.get(message["event"]) or ():
        handler(message["data"])

    # Message received
    res.set_header(b"content-type", b"application/json")
    return json.dumps({"status": "success"})

# Client side message subscription
# (Used to push message to client)
def _msg_polling(self):
    """Front-end message polling route.

    This is a generator that never finishes.  It yields a ``retry``
    directive first; afterwards each message popped from the left of
    ``self._msg_queue`` is yielded as a ``data:`` line whose payload is the
    message serialized as JSON, URL-quoted and base64-encoded.
    """
    # Set content type
    res.set_header(b"content-type", b"text/event-stream")

    # Send response header
    yield "retry: 2000\n\n"

    # Polling loop
    while True:
        # Drain the message sending queue
        while self._msg_queue:
            outgoing = self._msg_queue.popleft()
            encoded = b64encode(quote(json.dumps(outgoing)))
            yield "data: " + encoded + "\n\n"
        # NOTE(review): presumably hands control to other tasks between
        # drains; depends on which ``sleep`` the module imports. Confirm.
        sleep(0)

# Server side data
def _bknd_data(self):
    """Return backend data (including configuration).

    Returns:
        ``self.srv_data`` serialized as JSON text.
    """
    res.set_header(b"content-type", b"application/json")
    serialized = json.dumps(self.srv_data)
    return serialized

# [ Static Routes ]
# Index redirection
def _crawler_static(path):
    """Serve Chrome crawler static files.

    Args:
        path: Resource path relative to ``chrome_crawler_frontend/`` inside
            the package named by ``_pkg_name``.

    Returns:
        The resource content.  Aborts with 404 when loading the resource
        raises ``IOError``.
    """
    # Fetch file content
    resource_name = "chrome_crawler_frontend/" + path
    try:
        file_content = resource_string(_pkg_name, resource_name)
    except IOError:
        abort(404, "Not Found")

    # Guess MIME type from file name; leave the header alone when unknown
    mime_type, _encoding = guess_type(path)
    if mime_type:
        res.set_header(b"content-type", mime_type)

    # Return file content
    return file_content

# Custom static files
def create_app(engine):
    """Build the Bottle application that exposes ``engine`` over HTTP.

    Args:
        engine: Object providing ``publish(stream, event)`` and
            ``track(workflow_id, execution_id)``.

    Returns:
        The configured ``Bottle`` application.
    """
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        """Log the error and render it as ``"<status> <message>"``."""
        # An ApiException carries its own status code; anything else uses
        # the status already attached to the error object.
        if isinstance(error.exception, ApiException):
            response.status = error.exception.code
        else:
            response.status = error.status_code
        # NOTE(review): the body returned below is plain text although the
        # content type says JSON; confirm what clients expect.
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(error.exception).__name__,
            'message': repr(error.exception) if error.exception else '',
            'traceback': error.traceback,
            'status_code': response.status
        }
        log.error('Exception, type=%s, message=%s, status_code=%s, traceback=%s'\
            % (resp['type'], resp['message'], resp['status_code'], resp['traceback']))
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        """Return the service name and version."""
        return {'name': 'xFlow', 'version': '0.1' }

    @app.route('/publish', method=['POST'])
    def publish():
        """Validate the posted JSON document and publish its event.

        Raises:
            BadRequest: The document fails ``publish_schema`` validation.
            NotFoundException: The engine reports that the stream does not
                exist.
        """
        data = json.loads(request.body.read())
        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)

        stream = data['stream']
        event = json.dumps(data['event'])

        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))
        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        """Return the engine's tracking information for one execution.

        Raises:
            NotFoundException: The engine reports a missing stream, workflow
                or log.
        """
        # Both paths leave the function (return or raise), so nothing may
        # follow the try statement.
        try:
            return engine.track(workflow_id, execution_id)
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))

    return app