我们从 Python 开源项目中提取了以下 44 个代码示例，用于说明如何使用 flask.request.host 属性。
def protect(self):
    """Run the CSRF checks for the current request.

    Requests whose method is not listed in ``WTF_CSRF_METHODS`` are left
    alone. Otherwise the token returned by ``self._get_csrf_token()`` is
    validated and, for secure requests when ``WTF_CSRF_SSL_STRICT`` is
    set, the referrer is compared with ``https://<request.host>/``.

    Returns:
        ``None`` when the request is skipped or accepted, otherwise
        whatever ``self._error_response(reason)`` returns.
    """
    settings = self._app.config
    if request.method not in settings['WTF_CSRF_METHODS']:
        return

    if not validate_csrf(self._get_csrf_token()):
        return self._error_response('CSRF token missing or incorrect.')

    if request.is_secure and settings['WTF_CSRF_SSL_STRICT']:
        referrer = request.referrer
        if not referrer:
            return self._error_response(
                'Referrer checking failed - no Referrer.')

        expected_origin = 'https://%s/' % request.host
        if not same_origin(referrer, expected_origin):
            return self._error_response(
                'Referrer checking failed - origin does not match.')

    # Flag the request so later code can tell that the check passed.
    request.csrf_valid = True
def protect(self):
    """Run the CSRF checks for the current request.

    Nothing happens for methods outside ``WTF_CSRF_METHODS``. Otherwise
    the token from ``self._get_csrf_token()`` is validated; a
    ``ValidationError`` is logged and handed to ``self._error_response``.
    For secure requests with ``WTF_CSRF_SSL_STRICT`` set, the referrer must
    be present and match ``https://<request.host>/``.

    NOTE(review): the return value of ``self._error_response`` is discarded
    everywhere, so it presumably raises/aborts -- confirm.
    """
    if request.method not in current_app.config['WTF_CSRF_METHODS']:
        return

    try:
        validate_csrf(self._get_csrf_token())
    except ValidationError as err:
        message = err.args[0]
        logger.info(message)
        self._error_response(message)

    strict_ssl = (request.is_secure
                  and current_app.config['WTF_CSRF_SSL_STRICT'])
    if strict_ssl:
        if not request.referrer:
            self._error_response('The referrer header is missing.')

        expected_origin = 'https://{0}/'.format(request.host)
        if not same_origin(request.referrer, expected_origin):
            self._error_response('The referrer does not match the host.')

    # Record on ``g`` that this request passed the CSRF checks.
    g.csrf_valid = True
def __init__(self, **kwargs):
    """Set up the generic credentials and extend the auth instructions.

    The API key is read from the ``VULTR_API_KEY`` environment variable.
    The request host, then ``<APP_NAME>.nanoapp.io``, are resolved in turn;
    the first address obtained is appended to ``self.auth_instructions`` as
    a ``/32`` hint. Nothing is appended when neither name resolves.

    ``kwargs`` is accepted but not used here.
    """
    self.generic_credentials = {'key': os.getenv('VULTR_API_KEY', '')}

    candidates = [request.host, os.getenv('APP_NAME', '') + '.nanoapp.io']
    for hostname in candidates:
        try:
            ip = socket.gethostbyname(hostname) or None
        except socket.gaierror:
            ip = None
        if ip:
            break

    if ip:
        hint = (' (If you need to be more specific about '
                'the access controls, you can use %s/32, but keep in mind that '
                'this address may change at any point in the future, and you will '
                'need to update your Vultr account accordingly to continue '
                'deploying.)') % (ip)
    else:
        hint = ''
    self.auth_instructions += hint

# Internal overrides for provider retrieval
def index():
    """Render ``otm.jinja`` with a nonce-based Content-Security-Policy.

    A fresh 16-character nonce (lowercase ASCII letters and digits) is
    passed to the template and whitelisted for inline styles and scripts.
    The only connection target allowed is the ``/ws`` endpoint on the
    request host, using ``wss`` for secure requests and ``ws`` otherwise.

    Returns:
        The ``Response`` with ``Content-Security-Policy`` and
        ``X-Frame-Options`` headers set.
    """
    # A CSP nonce must be unpredictable, so draw it from the OS entropy
    # source rather than the default Mersenne Twister generator.
    # ``string.ascii_lowercase`` is used because ``string.lowercase`` is
    # locale-dependent on Python 2 and does not exist on Python 3.
    alphabet = string.ascii_lowercase + string.digits
    rng = random.SystemRandom()
    nonce = ''.join(rng.choice(alphabet) for _ in range(16))

    r = Response(render_template("otm.jinja", nonce=nonce))
    r.headers['Content-Security-Policy'] = ';'.join((
        "default-src 'none'",
        "style-src 'nonce-%s'" % nonce,
        "script-src 'nonce-%s'" % nonce,
        "connect-src %s://%s/ws" % (
            "wss" if request.is_secure else "ws",
            request.host,
        ),
    ))
    r.headers['X-Frame-Options'] = 'DENY'
    return r
def dl_skelenox():
    """
        Generate a Zip file which contains both the Skelenox script
        and the associated config file.

        The archive holds ``skelenox.py`` (read from the current working
        directory) and a generated ``skelsettings.json``; it is returned
        as an ``application/octet-stream`` attachment named
        ``skelenox.zip``.
    """
    # Use the part before the colon only when the host is exactly
    # "name:port"; any other shape is used unchanged.
    host_parts = request.host.split(":")
    if len(host_parts) == 2:
        ip_addr = host_parts[0]
    else:
        ip_addr = request.host

    skel_config = {
        "username": g.user.nickname,
        "edit_flag": True,
        "initial_sync": True,
        "poli_server": ip_addr,
        "poli_port": app.config['SERVER_PORT'],
        "poli_remote_path": app.config['API_PATH'] + "/",
        "debug_http": app.config['HTTP_DEBUG'],
        "poli_apikey": g.user.api_key,
        "save_timeout": 10 * 60,
        "sync_frequency": 1.0 * 100,
        "debug_level": "info",
        "notepad_font_name": "Courier New",
        "notepad_font_size": 9,
    }
    skel_json = json.dumps(skel_config, sort_keys=True, indent=4)

    zipout = io.BytesIO()
    # Leaving the ``with`` block closes the archive, which finalizes the
    # bytes held in ``zipout``.
    with ZipFile(zipout, "w") as archive:
        archive.write("skelenox.py")
        archive.writestr("skelsettings.json", skel_json)

    response = make_response(zipout.getvalue())
    response.headers["Content-type"] = "application/octet-stream"
    response.headers["Content-Disposition"] = (
        "attachment; filename=skelenox.zip")
    return response
def logout():
    '''
    Manually override the logout URL to avoid completely signing the user
    out of all Google accounts.

    When ``SERVER_SOFTWARE`` starts with ``Google App Engine/`` the user is
    redirected to ``_ah/logout`` with a ``continue`` URL pointing at the
    request host; otherwise to ``users.create_logout_url('/')``.
    '''
    server_software = os.getenv('SERVER_SOFTWARE', '')
    if not server_software.startswith('Google App Engine/'):
        return redirect(users.create_logout_url('/'))

    target = '_ah/logout?continue=https://' + request.host + '/'
    return redirect(target)
def pushit(path):
    """Dispatch a request path to the landing page, the API, or a push.

    * ``''`` renders ``index.html``.
    * ``'api'`` returns the JSON from ``listArchives_server(handlers)``.
    * ``'ajax-loader.gif'`` renders the template of that name.
    * Anything else is read as ``<archive id>/<URI>`` and passed to
      ``push`` together with the query-string arguments.

    Any exception raised while pushing, or an empty result list, yields
    ``bad_request()``.
    """
    if path == '':
        return render_template('index.html')

    if path == 'api':
        listing = jsonify(listArchives_server(handlers))
        listing.status_code = 200
        return listing

    if path == "ajax-loader.gif":
        return render_template('ajax-loader.gif')

    try:
        # Forward every query argument (e.g. an API key) to ``push``.
        push_args = {key: request.args[key] for key in request.args.keys()}

        # A path without a slash fails here and is answered with
        # bad_request() below.
        arc_id, uri = str(path).split('/', 1)

        if 'herokuapp.com' in request.host:
            push_args['from_heroku'] = True

        payload = {"results": push(uri, arc_id, push_args)}
        if len(payload["results"]) == 0:
            return bad_request()

        reply = jsonify(payload)
        reply.status_code = 200
        return reply
    except Exception:
        # Best effort: every failure is reported as a bad request.
        pass
    return bad_request()
def start(port=SERVER_PORT, host=SERVER_IP):
    """Store the bind address in the module globals and run ``app``.

    Args:
        port: Port to listen on; stored in ``SERVER_PORT``.
        host: Address to bind to; stored in ``SERVER_IP``.

    The app is run threaded, with debug on and the reloader off.
    """
    global SERVER_IP, SERVER_PORT
    SERVER_IP, SERVER_PORT = host, port

    run_options = {
        'host': host,
        'port': port,
        'threaded': True,
        'debug': True,
        'use_reloader': False,
    }
    app.run(**run_options)
def prepare_auth_request(request):
    """Translate a request object into the dict used for SAML auth.

    Args:
        request: Object exposing ``url``, ``host``, ``path``, ``args``,
            ``form`` and ``query_string``.

    Returns:
        A dict with ``https`` fixed to ``'on'``, the host, the port parsed
        from ``request.url`` (``None`` when the URL has no explicit port),
        the path, copies of the query and form data, and the raw query
        string.
    """
    parsed_url = urlparse(request.url)

    auth_request = {"https": 'on'}
    auth_request['http_host'] = request.host
    auth_request['server_port'] = parsed_url.port
    auth_request['script_name'] = request.path
    auth_request['get_data'] = request.args.copy()
    auth_request['post_data'] = request.form.copy()
    # Add 'lowercase_urlencoding': True if using ADFS as IdP, see
    # https://github.com/onelogin/python-saml/pull/144
    auth_request['query_string'] = request.query_string
    return auth_request
def index():
    """Serve the upload page, or store an uploaded file on POST.

    For a POST the ``file`` upload and the optional ``w``/``h`` form values
    are passed to ``create``. If the result's ``r`` entry is truthy its
    ``error`` entry is returned; otherwise a JSON description of the stored
    file is returned, with the request host substituted into its URLs.
    """
    if request.method != 'POST':
        # Nothing has been bound at this point, so the template is
        # rendered without any extra context.
        return render_template('index.html')

    upload = request.files['file']
    width = request.form.get('w')
    height = request.form.get('h')
    if not upload:
        return abort(400)

    outcome = create(upload, width=width, height=height)
    if outcome['r']:
        return outcome['error']

    paste = outcome['paste_file']
    host = request.host
    description = {
        'url_d': paste.url_d % host,
        'url_i': paste.url_i % host,
        'url_s': paste.url_s % host,
        'url_p': paste.url_p % host,
        'filename': paste.filename,
        'size': humanize_bytes(paste.size),
        'uploadtime': paste.uploadtime,
        'type': paste.type,
        'quoteurl': paste.quoteurl.replace('%25s', host),
    }
    return jsonify(description)
def get_url(self, subtype, is_symlink=False):
    """Return the ``http://`` URL of this file on the request host.

    Args:
        subtype: Path segment placed between the host and the identifier.
        is_symlink: Use ``self.symlink`` as the identifier instead of
            ``self.filehash``.
    """
    if is_symlink:
        identifier = self.symlink
    else:
        identifier = self.filehash

    template = 'http://{host}/{subtype}/{hash_or_link}'
    return template.format(
        host=request.host, subtype=subtype, hash_or_link=identifier)
def anticsrf(f):
    """Decorator that only runs ``f`` for requests referred by this host.

    The host part of the ``Referer`` header must equal ``request.host``;
    otherwise the client is redirected to the ``NotFound`` endpoint. Any
    exception (including one raised by ``f``) is printed and answered with
    a redirect to the ``Error`` endpoint.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            referrer = request.referrer
            if referrer:
                # Drop the scheme, whatever it is, and keep host[:port].
                # Stripping only 'http://' made every HTTPS referrer fail.
                referrer_host = referrer.split('://', 1)[-1].split('/', 1)[0]
                if referrer_host == request.host:
                    return f(*args, **kwargs)
            return redirect(url_for('NotFound'))
        except Exception as e:
            # ``as`` and ``print(...)`` are valid on Python 2.6+ and 3.
            print(e)
            return redirect(url_for('Error'))
    return wrapper
def prepare_flask_request(request):
    """Translate a request object into the dict used for SAML processing.

    Args:
        request: Object exposing ``host``, ``path``, ``args`` and ``form``.

    Returns:
        A dict with ``https`` fixed to ``'on'``, the host, the path, and
        copies of the query and form data.
    """
    # The URL used to be parsed here, but the result was never used and a
    # malformed URL could raise for no benefit.
    return {
        'https': 'on',
        'http_host': request.host,
        'script_name': request.path,
        'get_data': request.args.copy(),
        'post_data': request.form.copy()
    }
def make_url_dfiles_list(dfiles_list):
    """Rewrite local image paths in ``dfiles_list`` as ``http://`` URLs.

    For each entry, the current working directory is removed from the
    ``orig_fn``, ``thumb_fn`` and ``clean_fn`` values and the result is
    prefixed with ``http://<request.host>``. The entries are modified in
    place.

    Returns:
        The same ``dfiles_list`` object.
    """
    cwd = os.getcwd()
    path_keys = ('orig_fn', 'thumb_fn', 'clean_fn')
    for entry in dfiles_list:
        for key in path_keys:
            entry[key] = 'http://' + request.host + entry[key].replace(cwd, '')
    return dfiles_list
def sms_alert(gif_url, user_id):
    """Send an intruder alert to the phone number of user ``user_id``.

    The message body points the recipient at the request host and carries
    two fixed media URLs.

    NOTE(review): ``gif_url`` is not used by this function -- confirm
    whether it was meant to be sent as the media URL.
    """
    # Look up the recipient's phone number.
    recipient = User.query.filter_by(id=user_id).first()
    phone = recipient.phone

    # Build and send the Twilio message.
    text = "Intruder! Visit\n" + request.host + " \nfor more information"
    client.sms.messages.create(
        to=phone,
        from_=twilio_caller,
        body=text,
        media_url=['https://demo.twilio.com/owl.png',
                   'https://demo.twilio.com/logo.png'],
    )
def get_spec(app):
    """Build the API Swagger specification.

    Starts from ``swagger(app)`` and fills in the version, title, host
    (taken from the current request), JSON media types, the ``https``
    scheme and an OAuth2 ``application`` flow whose token URL is the
    external URL of the ``access_token`` endpoint.
    """
    spec = swagger(app)

    info = spec["info"]
    info["version"] = "0.1"
    info["title"] = "ORCID HUB API"

    spec["host"] = request.host  # e.g. "dev.orcidhub.org.nz"
    spec["consumes"] = ["application/json"]
    spec["produces"] = ["application/json"]
    spec["schemes"] = ["https"]

    oauth_scopes = {
        "write": "allows modifying resources",
        "read": "allows reading resources",
    }
    spec["securityDefinitions"] = {
        "application": {
            "type": "oauth2",
            "tokenUrl": url_for("access_token", _external=True),
            "flow": "application",
            "scopes": oauth_scopes,
        }
    }
    spec["security"] = [{"application": ["read", "write"]}]
    return spec
def init_app(self, app):
    """Register logging hooks on ``app``.

    * before the first request: attach a rotating file handler for
      ``server_log.log`` to the app logger and set its level to ``INFO``;
    * before each request: log the host, method, URL and request values;
    * after each request: log the status and set ``X-Powered-By`` from the
      ``SERVICE_NAME`` config entry;
    * on app-context teardown: log success when no exception was passed.
    """
    @app.before_first_request
    def make_logger():
        file_handler = RotatingFileHandler(
            'server_log.log', maxBytes=100000, backupCount=5)
        file_handler.setFormatter(
            Formatter("[%(asctime)s] %(levelname)s - %(message)s"))

        log = current_app.logger
        log.addHandler(file_handler)
        log.setLevel(INFO)
        log.info('------ Logger Initialized ------')

    @app.before_request
    def before_request():
        log = current_app.logger
        summary = 'Requested from {0} [ {1} {2} ]'.format(
            request.host, request.method, request.url)
        log.info(summary)
        log.info('Request values : {0}'.format(request.values))

    @app.after_request
    def after_request(response):
        current_app.logger.info('Respond : {0}'.format(response.status))
        response.headers['X-Powered-By'] = app.config['SERVICE_NAME']
        return response

    @app.teardown_appcontext
    def teardown_appcontext(exception):
        if exception:
            return
        current_app.logger.info('Teardown appcontext successfully.')
def __init__(self, app_name, listen='0.0.0.0', port=8181):
    """Start a Flask+SocketIO server.

    Args:
        app_name(string): String representing a App Name
        listen (string): host name used by api server instance
        port (int): Port number used by api server instance
    """
    here = os.path.dirname(os.path.abspath(__file__))
    self.flask_dir = os.path.join(here, '../web-ui')
    self.log = logging.getLogger('api_server')
    self.listen, self.port = listen, port

    flask_options = {
        'root_path': self.flask_dir,
        'static_folder': "dist",
        'static_url_path': "/dist",
    }
    self.app = Flask(app_name, **flask_options)
    self.server = SocketIO(self.app, async_mode='threading')
    self._enable_websocket_rooms()

    # Enable cross-origin resource sharing.
    CORS(self.app)

    # Disable trailing slash.
    self.app.url_map.strict_slashes = False

    # Update web-ui if necessary.
    self.update_web_ui(force=False)
def shutdown_api(self):
    """Handle shutdown requests received by Api Server.

    This method must be called by kytos using the method
    stop_api_server, otherwise this request will be ignored.

    Returns:
        ``("", 403)`` when the request is not addressed to
        ``127.0.0.1``/``localhost`` on ``self.port`` or does not come from
        a loopback peer; otherwise the server is stopped and
        ``('Server shutting down...', 200)`` is returned.
    """
    port = str(self.port)
    allowed_host = ['127.0.0.1:' + port, 'localhost:' + port]

    # The Host header is chosen by the client, so on its own it cannot
    # prove that the request is local. Also require the peer address to be
    # a loopback address.
    loopback_peers = ('127.0.0.1', '::1', '::ffff:127.0.0.1')

    if request.host not in allowed_host:
        return "", 403
    if request.remote_addr not in loopback_peers:
        return "", 403

    self.server.stop()

    return 'Server shutting down...', 200
def before_request():
    """Print the host of the current request, prefixed by a fixed label."""
    # NOTE(review): the comments that were here had lost their encoding
    # and could not be recovered.
    host = request.host
    print('Before request :', host)
def before_request():
    """Copy per-request parameters into the module-level globals.

    * ``hostname`` is set to the request host.
    * ``run_ids`` is set to every id from the ``min_run_id`` to the
      ``max_run_id`` form value inclusive, or left empty when either value
      is missing or not an integer.
    * ``master_ip`` / ``master_port`` are taken from the ``master`` form
      value (``ip`` or ``ip:port``) when it is present; with no port only
      ``master_ip`` is reassigned.
    * ``c_type`` is set to the ``type`` form value.
    """
    global hostname, master_ip, master_port, run_ids, c_type

    hostname = request.host

    run_ids = []
    try:
        min_run_id = int(request.form.get('min_run_id'))
        max_run_id = int(request.form.get('max_run_id'))
        run_ids.extend(range(min_run_id, max_run_id + 1))
    except Exception:
        # Best effort: a missing or malformed range leaves run_ids empty.
        # Catching ``Exception`` rather than everything lets SystemExit
        # and KeyboardInterrupt propagate.
        pass

    master = request.form.get('master')
    if master is not None:
        if ":" in master:
            # NOTE(review): a value containing more than one colon raises
            # ValueError here -- confirm whether that can occur.
            (master_ip, master_port) = master.split(":")
        else:
            master_ip = master

    c_type = request.form.get('type')


#####################
# Resource End Points
#####################

##
# System Memory
def get(self):
    """Record an access event and echo it back to the caller.

    The event name (truncated to 120 characters) is stored with the
    caller's address, the organisation that address resolves to through
    the ``IP_RESOLVER`` config mapping, the request host and the current
    time. If an ``Auth-Token`` header is present, the ``app_name`` from
    its payload is added.

    Returns:
        ``CTTVResponse.OK`` wrapping the stored data, with the timestamp
        converted to a string.
    """
    extensions = current_app.extensions
    esstore = extensions['es_access_store']  # looked up but not written to
    mpstore = extensions['mp_access_store']

    args = self.parser.parse_args()
    event = args['event'][:120]
    auth_token = request.headers.get('Auth-Token')

    ip_resolver = current_app.config['IP_RESOLVER']
    ip = RateLimiter.get_remote_addr()
    caller_net = IPNetwork(ip)

    # Start from the default organisation and switch to the first network
    # entry that overlaps the caller's address.
    resolved_org = ip_resolver['default']
    for net, org in ip_resolver.items():
        is_network = isinstance(net, (IPv4Network, IPv6Network))
        if is_network and net.overlaps(caller_net):
            resolved_org = org
            break

    data = {
        'ip': ip,
        'org': resolved_org,
        'host': request.host,
        'timestamp': datetime.now(),
        'event': event,
    }
    if auth_token:
        payload = TokenAuthentication.get_payload_from_token(auth_token)
        data['app_name'] = payload['app_name']

    mpstore.store_event(data)

    # Convert the timestamp to a string only after the event is stored.
    data['timestamp'] = str(data['timestamp'])
    return CTTVResponse.OK(SimpleResult(None, data=data))
def init_app(self, app):
    """Install CSRF protection on ``app``.

    Exposes ``generate_csrf`` to templates as ``csrf_token`` and registers
    a ``before_request`` hook. ``WTF_CSRF_SSL_STRICT`` and
    ``WTF_CSRF_ENABLED`` (both defaulting to ``True``) are read once, when
    this method runs.
    """
    app.jinja_env.globals['csrf_token'] = generate_csrf
    strict = app.config.get('WTF_CSRF_SSL_STRICT', True)
    csrf_enabled = app.config.get('WTF_CSRF_ENABLED', True)

    safe_methods = ('GET', 'HEAD', 'OPTIONS', 'TRACE')
    form_methods = ('POST', 'PUT', 'PATCH')

    @app.before_request
    def _csrf_protect():
        # Many things come from django.middleware.csrf.
        if not csrf_enabled or request.method in safe_methods:
            return

        if self._exempt_views:
            # With exemptions configured, a request that maps to no view,
            # or to an exempt one, is not checked.
            endpoint = request.endpoint
            view = app.view_functions.get(endpoint) if endpoint else None
            if not view:
                return
            dest = '%s.%s' % (view.__module__, view.__name__)
            if dest in self._exempt_views:
                return

        token = None
        if request.method in form_methods:
            # Find the ``csrf_token`` field in the submitted form; if the
            # form had a prefix the name is ``{prefix}-csrf_token``. When
            # several fields match, the last one is used.
            for field in request.form:
                if field.endswith('csrf_token'):
                    token = request.form[field]

        # Fall back to the header named as in Django, then as in Rails.
        token = token or request.headers.get('X-CSRFToken')
        token = token or request.headers.get('X-CSRF-Token')

        if not validate_csrf(token):
            return self._error_response('CSRF token missing or incorrect.')

        if request.is_secure and strict:
            referrer = request.referrer
            if not referrer:
                return self._error_response(
                    'Referrer checking failed - no Referrer.')

            expected_origin = 'https://%s/' % request.host
            if not same_origin(referrer, expected_origin):
                return self._error_response(
                    'Referrer checking failed - origin not match.')

        # Flag the request so later code can tell that the check passed.
        request.csrf_valid = True
def AddPlugin():
    """Register a plugin from an uploaded ``.py`` file or from form fields.

    The request is only processed when the host part of its referrer
    equals ``request.host``.

    * With an uploaded file whose name ends in ``.py``: the file is saved
      under ``file_path`` (a timestamp is appended if the name is taken),
      imported, and the dict from its ``get_plugin_info()`` is inserted
      into the ``Plugin`` collection.
    * Without an uploaded file: a plugin description is built from the
      form fields, written to a ``.json`` file under ``file_path`` and
      inserted into the ``Plugin`` collection without its ``plugin`` entry.

    Returns:
        ``'success'`` when the database insert returned a truthy value,
        otherwise ``'fail'``.

    NOTE(review): the source of this function had lost its indentation;
    attaching the ``else`` branch to ``if f:`` is the reading assumed here
    -- confirm against the upstream project.
    """
    result = 'fail'
    # Only 'http://' is stripped from the referrer, so an 'https://'
    # referrer never equals request.host.
    if request.referrer and request.referrer.replace('http://', '').split('/')[0] == request.host:
        f = request.files['file']
        if f:
            fname = secure_filename(f.filename)
            if fname.split('.')[-1] == 'py':
                path = file_path + fname
                # Avoid overwriting an existing plugin: add a timestamp.
                if os.path.exists(file_path + fname):
                    fname = fname.split('.')[0] + '_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.py'
                    path = file_path + fname
                f.save(path)
                if os.path.exists(path):
                    file_name = fname.split('.')[0]
                    # NOTE(review): importing the uploaded module executes
                    # its top-level code. Presumably file_path is on the
                    # import path -- confirm.
                    module = __import__(file_name)
                    mark_json = module.get_plugin_info()
                    mark_json['filename'] = file_name
                    mark_json['add_time'] = datetime.now()
                    mark_json['count'] = 0
                    if 'source' not in mark_json:
                        mark_json['source'] = 0
                    insert_result = Mongo.coll['Plugin'].insert(mark_json)
                    if insert_result:
                        result = 'success'
        else:
            name = request.form.get('name', '')
            info = request.form.get('info', '')
            author = request.form.get('author', '')
            level = request.form.get('level', '')
            # Note: this local shadows the ``type`` builtin.
            type = request.form.get('vultype', '')
            keyword = request.form.get('keyword', '')
            pluginurl = request.form.get('pluginurl', '')
            methodurl = request.form.get('methodurl', '')
            pdata = request.form.get('pdata', '')
            analyzing = request.form.get('analyzing', '')
            analyzingdata = request.form.get('analyzingdata', '')
            tag = request.form.get('tag', '')
            try:
                query = {'name': name, 'info': info, 'level': level, 'type': type, 'author': author, 'url': pluginurl, 'keyword': keyword, 'source': 0}
                # ``methodurl`` is read as "<method> <url>"; without a
                # space the index below raises and the result stays 'fail'.
                query['plugin'] = {'method': methodurl.split(' ', 1)[0], 'url': methodurl.split(' ', 1)[1], 'analyzing': analyzing, 'analyzingdata': analyzingdata, 'data': pdata, 'tag': tag}
                file_name = secure_filename(name) + '_' + datetime.now().strftime("%Y%m%d%H%M%S") + ".json"
                # The full description, including 'plugin', goes to disk.
                with open(file_path + file_name, 'wb') as wt:
                    wt.writelines(json.dumps(query))
                # The database record omits 'plugin' and gains bookkeeping
                # fields instead.
                query.pop('plugin')
                query['add_time'] = datetime.now()
                query['count'] = 0
                query['filename'] = file_name
                insert_result = Mongo.coll['Plugin'].insert(query)
                if insert_result:
                    result = 'success'
            except:
                # Any failure above is swallowed; the caller only sees
                # 'fail'.
                pass
    return result
# NOTE(review): a comment here had lost its encoding and could not be recovered.