We extracted the following 50 code examples from open-source Python projects to illustrate how to use flask.g.get().
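In short, flask.g.get(name, default=None) reads an attribute from the request-scoped application-context object g, returning the default instead of raising AttributeError when the attribute was never set. A minimal sketch before the extracted examples (the app and view names here are hypothetical, not from any of the projects below):

from flask import Flask, g

app = Flask(__name__)  # hypothetical app used only for this sketch

@app.before_request
def load_user():
    # store a value on the request-scoped object `g`
    g.current_user = "alice"

@app.route("/")
def index():
    # g.get() returns None (or the supplied default) when the attribute
    # is missing, unlike g.current_user, which would raise AttributeError
    user = g.get("current_user", "anonymous")
    theme = g.get("theme", "light")  # never set above, so the default is used
    return "{} / {}".format(user, theme)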
def _get_config(
    value, config_name, default=None,
    required=True, message='CSRF is not configured.'
):
    """Find config value based on provided value, Flask config, and default value.

    :param value: already provided config value
    :param config_name: Flask ``config`` key
    :param default: default value if not provided or configured
    :param required: whether the value must not be ``None``
    :param message: error message if required config is not found
    :raises KeyError: if required config is not found
    """
    if value is None:
        value = current_app.config.get(config_name, default)

    if required and value is None:
        raise KeyError(message)

    return value
def _get_csrf_token(self):
    # find the ``csrf_token`` field in the submitted form
    # if the form had a prefix, the name will be
    # ``{prefix}-csrf_token``
    field_name = current_app.config['WTF_CSRF_FIELD_NAME']

    for key in request.form:
        if key.endswith(field_name):
            csrf_token = request.form[key]

            if csrf_token:
                return csrf_token

    for header_name in current_app.config['WTF_CSRF_HEADERS']:
        csrf_token = request.headers.get(header_name)

        if csrf_token:
            return csrf_token

    return None
@contextmanager
def logging_levels():
    """
    Context manager to conditionally set logging levels.

    Supports setting per-request debug logging using the `X-Request-Debug` header.

    """
    enabled = strtobool(request.headers.get("x-request-debug", "false"))
    level = None
    try:
        if enabled:
            level = getLogger().getEffectiveLevel()
            getLogger().setLevel(DEBUG)
        yield
    finally:
        if enabled:
            getLogger().setLevel(level)
def post_process_request_body(self, dct):
    if g.get("hide_body") or not self.request_body:
        return

    for name, new_name in g.get("show_request_fields", {}).items():
        try:
            value = self.request_body.pop(name)
            self.request_body[new_name] = value
        except KeyError:
            pass

    for field in g.get("hide_request_fields", []):
        try:
            del self.request_body[field]
        except KeyError:
            pass

    dct.update(
        request_body=self.request_body,
    )
def post_process_response_body(self, dct):
    if g.get("hide_body") or not self.response_body:
        return

    for name, new_name in g.get("show_response_fields", {}).items():
        try:
            value = self.response_body.pop(name)
            self.response_body[new_name] = value
        except KeyError:
            pass

    for field in g.get("hide_response_fields", []):
        try:
            del self.response_body[field]
        except KeyError:
            pass

    dct.update(
        response_body=self.response_body,
    )
def name_for_changeset(self):
    address = self.address
    n = self.name
    if not address:
        return self.name

    if isinstance(address, list):
        d = {a['type']: a['name'] for a in address}
    elif isinstance(address, dict):
        d = address

    if d.get('country_code') == 'us':
        state = d.get('state')
        if state and n != state:
            return n + ', ' + state

    country = d.get('country')
    if country and self.name != country:
        return '{} ({})'.format(self.name, country)
    return self.name
def close_changeset(osm_type, osm_id):
    Place.get_or_abort(osm_type, osm_id)

    osm_backend, auth = get_backend_and_auth()

    changeset_id = request.form['changeset_id']
    update_count = request.form['update_count']

    if really_save:
        osm_backend.request(osm_api_base + '/changeset/{}/close'.format(changeset_id),
                            method='PUT',
                            auth=auth,
                            headers=user_agent_headers())

        change = Changeset.query.get(changeset_id)
        change.update_count = update_count

        database.session.commit()

        # mail.announce_change(change)

    return Response('done', mimetype='text/plain')
def search_results():
    q = request.args.get('q') or ''
    if not q:
        return render_template('results_page.html', results=[], q=q)

    m = re_qid.match(q.strip())
    if m:
        return redirect(url_for('item_page', wikidata_id=m.group(1)[1:]))

    try:
        results = nominatim.lookup(q)
    except nominatim.SearchError:
        message = 'nominatim API search error'
        return render_template('error_page.html', message=message)

    update_search_results(results)

    for hit in results:
        add_hit_place_detail(hit)

    return render_template('results_page.html', results=results, q=q)
def criteria_page():
    entity_types = matcher.load_entity_types()
    taginfo = get_taginfo(entity_types)

    for t in entity_types:
        t.setdefault('name', t['cats'][0].replace(' by country', ''))

        for tag in t['tags']:
            if '=' not in tag:
                continue
            image = taginfo.get(tag, {}).get('image')
            if image:
                t['image'] = image
                break

    entity_types.sort(key=lambda t: t['name'].lower())

    cat_counts = {cat.name: cat.page_count for cat in Category.query}

    return render_template('criteria.html',
                           entity_types=entity_types,
                           cat_counts=cat_counts,
                           taginfo=taginfo)
def saved_places():
    abort(404)
    if 'filter' in request.args:
        arg_filter = request.args['filter'].strip().replace(' ', '_')
        if arg_filter:
            return redirect(url_for('saved_with_filter', name_filter=arg_filter))
        else:
            return redirect(url_for('saved_places'))

    sort = request.args.get('sort') or 'name'
    name_filter = g.get('filter') or None

    if name_filter:
        place_tbody = render_template('place_tbody.html',
                                      existing=get_existing(sort, name_filter))
    else:
        place_tbody = get_place_tbody(sort)

    return render_template('saved.html', place_tbody=place_tbody, sort_link=sort_link)
def tracer_decorator(span_name, pay_load=None):
    def _decorator(func):
        @wraps(func)
        def create_span(*args, **kwargs):
            span = g.get("tracer_span")
            if not span:
                new_span = opentracing.tracer.start_span(func.__name__)
            else:
                new_span = opentracing.tracer.start_span(func.__name__, child_of=span)
            new_span.log_event(span_name, payload=pay_load)
            g.tracer_span = new_span
            result = func(*args, **kwargs)
            if span:
                g.tracer_span = span
            new_span.finish()
            return result
        return create_span
    return _decorator
def make_error_page(app, name, code, sentry=None, data=None, exception=None):
    ''' creates the error page dictionary for web errors '''
    shortname = name.lower().replace(' ', '_')
    error = {}
    error['title'] = 'Marvin | {0}'.format(name)
    error['page'] = request.url
    error['event_id'] = g.get('sentry_event_id', None)
    error['data'] = data
    error['name'] = name
    error['code'] = code
    error['message'] = exception.description if exception and hasattr(exception, 'description') else None
    if app.config['USE_SENTRY'] and sentry:
        error['public_dsn'] = sentry.client.get_public_dsn('https')
    app.logger.error('{0} Exception {1}'.format(name, error))
    return render_template('errors/{0}.html'.format(shortname), **error), code


# ----------------
# Error Handling
# ----------------
def _execute_for_all_tables(self, app, bind, operation, skip_tables=False):
    app = self.get_app(app)

    if bind == '__all__':
        binds = [None] + list(app.config.get('SQLALCHEMY_BINDS') or ())
    elif isinstance(bind, string_types) or bind is None:
        binds = [bind]
    else:
        binds = bind

    for bind in binds:
        extra = {}
        if not skip_tables:
            tables = self.get_tables_for_bind(bind)
            extra['tables'] = tables
        op = getattr(self.Model.metadata, operation)
        op(bind=self.get_engine(app, bind), **extra)
def after_request(param):
    response_param = {'charset': param.charset,
                      'content_length': param.content_length,
                      'content_type': param.content_type,
                      'content_encoding': param.content_encoding,
                      'mimetype': param.mimetype,
                      'response': g.result if hasattr(g, 'result') else None,
                      'status': param.status,
                      'status_code': param.status_code}

    g.response_time = datetime.now()
    time_consuming = str(g.response_time - g.request_time)

    log_info = {'api_method': g.get('api_method'),
                'api_version': g.get('api_version'),
                'request_param': g.get('request_param'),
                'request_form': g.get('request_form'),
                'querystring': g.get('request_param', {}).get('query_string'),
                'request_json': g.get('request_json'),
                'response_param': response_param,
                'request_raw_data': g.request_raw_data,
                'request_time': g.get('request_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
                'response_time': g.get('response_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
                'time_consuming': time_consuming}

    if param.status_code >= 400:
        from app.tasks import send_mail_async
        # send_mail_async.delay(current_app.config['ADMIN_EMAIL'], 'Web Api Request Error', 'api_error', **log_info)
        current_app.logger.error(log_info)
    else:
        current_app.logger.debug(log_info)

    return param
def new_revision(self, *fields):
    """Save a new revision of the document"""

    # Ensure this document is a draft
    if not self._id:
        assert g.get('draft'), \
            'Only draft documents can be assigned new revisions'
    else:
        with self.draft_context():
            assert self.count(Q._id == self._id) == 1, \
                'Only draft documents can be assigned new revisions'

    # Set the revision
    if len(fields) > 0:
        # `fields` is a tuple (from *args), so build a new list rather than
        # calling `append` on it
        fields = list(fields) + ['revision']

    self.revision = datetime.now()

    # Update the document
    self.upsert(*fields)
def get_collection(cls):
    """Return a reference to the database collection for the class"""

    # By default the collection returned will be the published collection,
    # however if the `draft` flag has been set against the global context
    # (e.g `g`) then the collection returned will contain draft documents.

    if g.get('draft'):
        return getattr(
            cls.get_db(),
            '{collection}_draft'.format(collection=cls._collection)
        )

    return getattr(cls.get_db(), cls._collection)

# Contexts
def generate_csrf(secret_key=None, token_key=None):
    """Generate a CSRF token. The token is cached for a request, so multiple
    calls to this function will generate the same token.

    During testing, it might be useful to access the signed token in
    ``g.csrf_token`` and the raw token in ``session['csrf_token']``.

    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    if field_name not in g:
        if field_name not in session:
            session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

        s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
        setattr(g, field_name, s.dumps(session[field_name]))

    return g.get(field_name)
def validate_csrf_token(self, form, field):
    if g.get('csrf_valid', False):
        # already validated by CSRFProtect
        return

    try:
        validate_csrf(
            field.data,
            self.meta.csrf_secret,
            self.meta.csrf_time_limit,
            self.meta.csrf_field_name
        )
    except ValidationError as e:
        logger.info(e.args[0])
        raise
def should_skip_logging(func):
    """
    Should we skip logging for this handler?

    """
    disabled = strtobool(request.headers.get("x-request-nolog", "false"))
    return disabled or getattr(func, SKIP_LOGGING, False)
def get_address_key(self, key):
    if isinstance(self.address, dict):
        return self.address.get(key)
    for line in self.address or []:
        if line['type'] == key:
            return line['name']
def update_from_nominatim(self, hit):
    keys = ('display_name', 'place_rank', 'category', 'type',
            'icon', 'extratags', 'namedetails')
    for n in keys:
        setattr(self, n, hit.get(n))
    self.address = [dict(name=n, type=t) for t, n in hit['address'].items()]
def name_for_change_comment(self):
    n = self.name
    if self.address:
        if isinstance(self.address, list):
            address = {a['type']: a['name'] for a in self.address}
        elif isinstance(self.address, dict):
            address = self.address
        if address.get('country_code') == 'us':
            state = address.get('state')
            if state and n != state:
                return n + ', ' + state
    return 'the ' + n if ' of ' in n else n
def get_or_add_place(cls, hit):
    place = cls.query.filter_by(osm_type=hit['osm_type'],
                                osm_id=hit['osm_id']).one_or_none()
    if place and place.place_id != hit['place_id']:
        place.update_from_nominatim(hit)
    elif not place:
        place = Place.query.get(hit['place_id'])
        if place:
            place.update_from_nominatim(hit)
        else:
            place = cls.from_nominatim(hit)
            session.add(place)
    session.commit()
    return place
def candidates_url(self, **kwargs):
    if g.get('filter'):
        kwargs['name_filter'] = g.filter
        endpoint = 'candidates_with_filter'
    else:
        endpoint = 'candidates'

    return url_for(endpoint,
                   osm_type=self.osm_type,
                   osm_id=self.osm_id,
                   **kwargs)
def filter_urls():
    name_filter = g.get('filter')
    try:
        if name_filter:
            url = url_for('saved_with_filter',
                          name_filter=name_filter.replace(' ', '_'))
        else:
            url = url_for('saved_places')
    except RuntimeError:
        return {}  # maybe we don't care
    return dict(url_for_saved=url)
def logout():
    next_url = request.args.get('next') or url_for('index')
    logout_user()
    flash('you are logged out')
    return redirect(next_url)
def export_osm(osm_type, osm_id, name):
    place = Place.get_or_abort(osm_type, osm_id)
    items = place.items_with_candidates()

    items = list(matcher.filter_candidates_more(items, bad=get_bad(items)))

    if not any('candidate' in match for _, match in items):
        abort(404)

    items = [(item, match['candidate'])
             for item, match in items
             if 'candidate' in match]

    lookup = {}
    for item, osm in items:
        lookup[(osm.osm_type, osm.osm_id)] = item

    filename = cache_filename('{}_{}_overpass_export.xml'.format(osm_type, osm_id))
    if os.path.exists(filename):
        overpass_xml = open(filename, 'rb').read()
    else:
        overpass_xml = overpass.items_as_xml(items)
        with open(filename, 'wb') as f:
            f.write(overpass_xml)

    root = etree.fromstring(overpass_xml)

    for e in root:
        if e.tag not in {'way', 'node', 'relation'}:
            continue
        for f in 'uid', 'user', 'timestamp', 'changeset':
            del e.attrib[f]
        pair = (e.tag, int(e.attrib['id']))
        item = lookup.get(pair)
        if not item:
            continue
        e.attrib['action'] = 'modify'
        tag = etree.Element('tag', k='wikidata', v=item.qid)
        e.append(tag)

    xml = etree.tostring(root, pretty_print=True)

    return Response(xml, mimetype='text/xml')
def add_tags(osm_type, osm_id):
    place = Place.get_or_abort(osm_type, osm_id)

    include = request.form.getlist('include')
    items = Item.query.filter(Item.item_id.in_([i[1:] for i in include])).all()

    table = [(item, match['candidate'])
             for item, match in matcher.filter_candidates_more(items, bad=get_bad(items))
             if 'candidate' in match]

    items = [{'row_id': '{:s}-{:s}-{:d}'.format(i.qid, c.osm_type, c.osm_id),
              'qid': i.qid,
              'osm_type': c.osm_type,
              'osm_id': c.osm_id,
              'description': '{} {}: adding wikidata={}'.format(c.osm_type, c.osm_id, i.qid),
              'post_tag_url': url_for('.post_tag',
                                      item_id=i.item_id,
                                      osm_id=c.osm_id,
                                      osm_type=c.osm_type)} for i, c in table]

    if False and request.form.get('confirm') == 'yes':
        update_count = do_add_tags(place, table)
        flash('{:,d} wikidata tags added to OpenStreetMap'.format(update_count))
        return redirect(place.candidates_url())

    return render_template('add_tags.html',
                           place=place,
                           osm_id=osm_id,
                           items=items,
                           table=table)
def index():
    q = request.args.get('q')
    if q:
        return redirect(url_for('search_results', q=q))

    if 'filter' in request.args:
        arg_filter = request.args['filter'].strip().replace(' ', '_')
        if arg_filter:
            return redirect(url_for('saved_with_filter', name_filter=arg_filter))
        else:
            return redirect(url_for('saved_places'))

    return render_template('index.html', place_cards=get_place_cards())
def tag_list():
    q = get_tag_list(request.args.get('sort'))
    return render_template('tag_list.html', q=q)
def item_page(wikidata_id):
    item = Item.query.get(wikidata_id)
    try:
        return build_item_page(wikidata_id, item)
    except wikidata.QueryError:
        return render_template('error_page.html',
                               message="query.wikidata.org isn't working")
def delete_place(place_id):
    place = Place.query.get(place_id)
    place.clean_up()

    flash('{} deleted'.format(place.display_name))
    to_next = request.args.get('next', 'space')
    return redirect(url_for(to_next))
def current_span():
    return g.get("tracer_span", None)
def form_is_valid():
    return not g.get('hasfailures', False)