我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask.session.get()。
def get(self):
    """Log the current user out and redirect.

    When a ``save_id`` cookie is present, the three login cookies are
    expired and the client is redirected to the ``.exit`` endpoint.
    Otherwise the login-related session keys are dropped and the client is
    redirected to the ``.login`` endpoint.
    """
    if request.cookies.get('save_id'):
        response = make_response(redirect(url_for('.exit')))
        # expires=0 gives each cookie an expiry at the epoch, i.e. in the past.
        for cookie_name in ('user_name', 'login_time', 'save_id'):
            response.set_cookie(cookie_name, expires=0)
        return response

    # Only keys that currently hold a truthy value are removed.
    for key in ('name', 'show_name', 'user_id'):
        if session.get(key):
            session.pop(key)
    return redirect(url_for('.login'))

# (Mis-encoded comment in the source; it mentions config.json and
# is_register being false.)
def login():
    """
    Login page.

    On POST the submitted ``username`` and ``password`` form fields are
    compared with the ``USERNAME`` and ``PASSWORD`` config values.  On a
    match ``session["logged_in"]`` is set and the client is redirected to
    ``home`` when the session holds a project, otherwise to ``projects``.
    In every other case ``login.html`` is rendered with the error, if any.
    """
    g.selected_tab = "login"
    error = None
    if request.method == "POST":
        # The password field is only read once the username has matched.
        credentials_ok = (
            request.form["username"] == app.config["USERNAME"]
            and request.form["password"] == app.config["PASSWORD"]
        )
        if credentials_ok:
            session["logged_in"] = True
            flash("You were logged in")
            target = "home" if session.get("project") else "projects"
            return redirect(url_for(target))
        # One message for both failures, so the response does not reveal
        # which field was wrong.
        error = "Invalid login information"
    return render_template("login.html", error=error)
# getAuthorizedFiles: for each entry of fileConfigs, run the SQL file selected by the
# entry's 'folder' ('data' -> get_file_auth.sql, 'static' -> static_file_map.sql) against
# <ROOT_PATH>/<ARES_USERS_LOCATION>/<report_name>/db/admin.db, parameterised with the
# session's TEAM, the entry's filename, current_user.email and the folder.
# Each returned row is indexed as (ALIAS, DISK_NAME).  reportObj.files[DISK_NAME] is set to
#   - the result of fileConfig['parser'] applied to the opened file, when a parser is configured,
#   - the plain path, when the entry's 'type' is 'pandas',
#   - the opened file object otherwise.
# The return value maps disk names to "<parser module basename>.<parser name>".
# NOTE(review): the original indentation was lost in this listing, so the scope of the final
# `if not ajax:` and `if fnct == 'params' and not ajax:` statements (parser branch vs. per-file
# loop) cannot be determined from here -- confirm against the upstream project.
# NOTE(review): the SQL file opened with open(...).read() and the per-file open() calls are
# never explicitly closed in this block.
def getAuthorizedFiles(fileConfigs, reportObj, report_name, userDirectory, fnct=None, ajax=False): ALIAS, DISK_NAME = 0, 1 SQL_CONFIG = os.path.join(current_app.config['ROOT_PATH'], config.ARES_SQLITE_FILES_LOCATION) sqlFileDict = {'data': 'get_file_auth.sql', 'static': 'static_file_map.sql'} fileNameToParser = {} for fileConfig in fileConfigs: queryFileAuthPrm = {'team': session['TEAM'], 'file_cod': fileConfig['filename'], 'username': current_user.email, 'type': fileConfig.get('folder')} files = executeSelectQuery(os.path.join(current_app.config['ROOT_PATH'], config.ARES_USERS_LOCATION, report_name, 'db', 'admin.db'), open(os.path.join(SQL_CONFIG, sqlFileDict.get(fileConfig.get('folder')))).read(), params=queryFileAuthPrm) for file in files: if fileConfig.get('parser', None): reportObj.files[file[DISK_NAME]] = fileConfig['parser'](open(os.path.join(userDirectory, fileConfig['folder'], file[DISK_NAME]))) elif fileConfig.get('type') == 'pandas': reportObj.files[file[DISK_NAME]] = os.path.join(userDirectory, fileConfig['folder'], file[DISK_NAME]) else: reportObj.files[file[DISK_NAME]] = open(os.path.join(userDirectory, fileConfig['folder'], file[DISK_NAME])) if not ajax: fileNameToParser[file[DISK_NAME]] = "%s.%s" % (fileConfig['parser'].__module__.split(".")[-1], fileConfig['parser'].__name__) if fnct == 'params' and not ajax: reportObj.fileMap.setdefault(file[ALIAS], []).append(file[DISK_NAME]) return fileNameToParser
def index():
    """Handle the name form on the index page.

    On a valid submission the user is looked up by username.  An unknown
    user is added to the database session, ``session['known']`` is set to
    False and, when ``FLASKY_ADMIN`` is configured, a notification mail is
    sent; a known user sets ``session['known']`` to True.  The name is then
    stored in the session, the form field is cleared and the client is
    redirected back to ``index``.  Otherwise ``index.html`` is rendered with
    the form and the name/known values stored in the session.
    """
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first()
        if user is None:
            user = User(username=form.name.data)
            db.session.add(user)
            session['known'] = False
            if app.config['FLASKY_ADMIN']:
                send_email(app.config['FLASKY_ADMIN'], 'New User',
                           'mail/new_user', user=user)
        else:
            session['known'] = True
        session['name'] = form.name.data
        # Fixed: the listing read ``from.name.data``, which is a syntax
        # error (``from`` is a keyword); the form field is what is cleared.
        form.name.data = ''
        return redirect(url_for('index'))
    return render_template('index.html', form=form,
                           name=session.get('name'),
                           known=session.get('known', False))
def login():
    """Check the submitted ``user``/``pwd`` form fields against the user DB.

    ``db.login(user)`` supplies the value the submitted password is compared
    with (presumably the stored password, or None for an unknown user --
    verify against UserDb).  On success the session is made permanent with
    a 30 minute lifetime, the user name is stored in it and ``index.html``
    is rendered.  Otherwise ``login.html`` is rendered with an info message.
    """
    db = UserDb(app.config['LOCAL_DB'])
    form = request.form
    user = form.get('user')
    pwd = form.get('pwd')
    password = db.login(user)
    del db

    # Fixed: the "no stored password" case is checked first.  Previously a
    # request without a ``pwd`` field for an unknown user compared
    # None == None and was treated as a successful login.
    if password is None:
        return render_template('login.html', info="??????!")

    # NOTE(review): this is a plain equality check on the password value.
    if pwd == password:
        session.permanent = True
        # Lifetime applied to permanent sessions.
        app.permanent_session_lifetime = timedelta(minutes=30)
        session.update(dict(user=user))
        return render_template('index.html')

    return render_template('login.html', info="?????!")
def run_selected_case():
    """Run the test cases named in the JSON request body and return the
    rendered result page.

    The cases are fetched with ``get_case_by_name``, executed with the
    configured ``THREAD_NUM``, a report is written, and ``testResult.html``
    is rendered with the counters from the run.  The response is JSON of
    the form ``{"html": <rendered page>}``.
    """
    payload = request.get_json()
    started_at = handle.now_str()

    # Look the cases up by the name sent by the client.
    mongo = MongoClient()
    cases = mongo.get_case_by_name(payload.get('case_name'))

    runner = apiFunc()
    outcome = runner.run_tests(cases, app.config['THREAD_NUM'])
    report_file = runner.writeReport(outcome)

    context = dict(
        failNum=outcome['failed_num'],
        ignored=outcome['ignored'],
        successNum=outcome['success_num'],
        total=outcome['total'],
        start=started_at,
        end=handle.now_str(),
        cost="{:.2}?".format(handle.delta(started_at, handle.now_str())),
        fileName=report_file,
    )
    page = render_template('testResult.html', **context)
    return jsonify(dict(html=page))
def authorize_view(self):
    """Flask view that starts the authorization flow.

    Builds a flow from the request's query arguments and redirects the
    user to the authorize URL produced by the flow's first step.
    """
    params = request.args.to_dict()
    # to_dict() keeps a single value per key, while scopes arrive as a
    # repeated query argument, so they are read with getlist().
    params['scopes'] = request.args.getlist('scopes')

    return_url = params.pop('return_url', None)
    if return_url is None:
        # Fall back to the referring page, then to the site root.
        return_url = request.referrer or '/'

    flow = self._make_flow(return_url=return_url, **params)
    return redirect(flow.step1_get_authorize_url())
def post(self):
    """Return one page of the messages addressed to the logged-in user.

    The ``page`` form field is 1-based; 0, a missing value or a negative
    value all select the first page.  Each page holds five messages,
    newest (highest id) first.

    Returns a JSON object with ``status``, the serialised messages in
    ``data``, their count in ``len``, the requested page in ``now`` and the
    total number of pages in ``all``; or a JSON error object when
    ``check_login()`` returns None or -1.
    """
    check_user = check_login()
    if check_user is None:
        return jsonify({'status': 'error', 'msg': 'no authority'})
    if check_user == -1:
        return jsonify({'status': 'error', 'msg': 'user is valid'})

    page_size = 5
    user_id = session.get('user_id')
    inbox = db.session.query(Message).filter(Message.to_id == user_id)
    counts = inbox.count()

    page = int(request.form.get('page', 0))
    # Fixed: page 0 used to produce an offset of -5, i.e. a negative slice.
    # The offset is now clamped at zero.
    offset = max(page - 1, 0) * page_size
    msgs = inbox.order_by(Message.id.desc())[offset:offset + page_size]

    # Ceiling division: a partially filled last page still counts.
    all_page = (counts + page_size - 1) // page_size
    return jsonify({'status': 'ok',
                    'data': [msg.to_json() for msg in msgs],
                    'len': len(msgs),
                    'now': page,
                    'all': all_page})
def oauth_authorized():  # pragma: no cover
    """Authorize facebook login.

    Handles the provider's callback: a missing response or an
    ``OAuthException`` flashes a message and redirects to ``next`` (or the
    home page); otherwise the token is stored in the session, the user's
    data is fetched from ``/me`` and login is delegated to
    ``manage_user_login``.
    """
    resp = facebook.oauth.authorized_response()
    next_url = request.args.get('next') or url_for('home.home')

    if resp is None:
        flash(u'You denied the request to sign in.', 'error')
        reason = (u'Reason: ' + request.args['error_reason']
                  + ' ' + request.args['error_description'])
        flash(reason, 'error')
        return redirect(next_url)

    if isinstance(resp, OAuthException):
        flash('Access denied: %s' % resp.message)
        current_app.logger.error(resp)
        return redirect(next_url)

    # The oauth_token has to be in the session before the USER fields can
    # be requested from the provider.
    access_token = resp['access_token']
    session['oauth_token'] = (resp['access_token'], '')
    user_data = facebook.oauth.get('/me').data
    user = manage_user(access_token, user_data)
    return manage_user_login(user, user_data, next_url)
def manage_user_login(user, user_data, next_url):
    """Manage user login.

    With a user: log them in, then redirect to the profile update page when
    their e-mail address equals their name, to the newsletter prompt when
    they have not been prompted yet and the newsletter is initialised, or to
    ``next_url`` otherwise.

    Without a user: look one up by the e-mail in ``user_data``, flash the
    sign-up hint when found, and redirect to the password reset page
    (local accounts) or the sign-in page (everything else).
    """
    if user is not None:
        login_user(user, remember=True)
        flash("Welcome back %s" % user.fullname, 'success')
        request_email = (user.email_addr == user.name)
        if request_email:
            flash("Please update your e-mail address in your profile page")
            return redirect(url_for('account.update_profile',
                                    name=user.name))
        should_prompt = (not request_email
                         and user.newsletter_prompted is False
                         and newsletter.is_initialized())
        if should_prompt:
            return redirect(url_for('account.newsletter_subscribe',
                                    next=next_url))
        return redirect(next_url)

    # Give a hint for the user
    user = user_repo.get_by(email_addr=user_data.get('email'))
    if user is None:
        return redirect(url_for('account.signin'))

    msg, method = get_user_signup_method(user)
    flash(msg, 'info')
    target = 'account.forgot_password' if method == 'local' else 'account.signin'
    return redirect(url_for(target))
def redirectAll(session, now):
    """Return the endpoint name the client should be on, or None.

    ``index`` for a session whose name is not a known player, ``room_play``
    for a player who is part of the current game, ``lobby`` otherwise.  A
    closed game is replaced with a fresh ``Game`` before the membership
    check.  None is returned when the target equals ``now``.
    """
    global game

    if pdb.exist(session.get('name')):
        player = pdb.players[session['name']]
        if game.state == game.GST_ROOM_CLOSED:
            game = Game()
        goto = 'room_play' if game.checkPlayer(player.username) else 'lobby'
    else:
        goto = 'index'

    return None if goto == now else goto

#
# API for Host
#
def test_real():
    """Set up a fixed eight-player game and enter it as the host.

    Registers the players, creates a game in the ``GST_WAIT_JOIN`` state
    hosted by ``asdf``, adds the other seven players, stores ``asdf`` in the
    session and redirects to ``room_play``.

    NOTE(review): the non-ASCII player names were mis-encoded in this
    listing, so several of them now collide; they are kept as found.
    """
    global game
    n = 8
    cfg = GameConfig(2, 2, True, True, True, True, 'all')

    # create players
    accounts = (
        ('asdf', 'asdf'),
        ('???', 'ywl'),
        ('???', 'dsw'),
        ('??', 'xr'),
        ('??', 'ls'),
        ('???', 'usc'),
        ('??', 'ay'),
        ('?', 'h'),
    )
    for first, second in accounts:
        pdb.addPlayer(first, second)

    # start game
    game = Game()
    game.setConfig(cfg)
    game.state = Game.GST_WAIT_JOIN
    game.setHost(pdb.get('asdf'))
    for guest in ('???', '???', '??', '??', '???', '??', '?'):
        game.addPlayer(pdb.get(guest))

    # goto host
    session['name'] = 'asdf'
    return redirect(url_for('room_play'))
def get_oauth_token():
    """Return the ``(access_token, '')`` pair for the current session.

    Returns None when the session holds no ``rc_token``.  When the stored
    token has passed its ``expires_at`` time it is refreshed against the
    Recurse Center token endpoint and the session entry is replaced; the new
    expiry is set 600 seconds before the lifetime reported by the server.
    """
    token = session.get('rc_token')
    # Fixed: without a stored token the subscript below raised TypeError.
    if token is None:
        return None

    if time() <= token['expires_at']:
        return (token['access_token'], '')

    payload = {
        'grant_type': 'refresh_token',
        'client_id': rc.consumer_key,
        'client_secret': rc.consumer_secret,
        'redirect_uri': 'ietf:wg:oauth:2.0:oob',
        'refresh_token': token['refresh_token'],
    }
    resp = requests.post('https://www.recurse.com/oauth/token', data=payload)
    refreshed = resp.json()
    session['rc_token'] = {
        'access_token': refreshed['access_token'],
        'refresh_token': refreshed['refresh_token'],
        # Expire ten minutes early so the token is refreshed before it lapses.
        'expires_at': refreshed['expires_in'] + time() - 600,
    }
    return (refreshed['access_token'], '')
def logout():
    """Redirect the browser to the auth0 logout endpoint.

    The ``returnTo`` target is ``/signout.html`` on the configured
    ``SERVER_NAME``, using https when the ``ENVIRONMENT`` environment
    variable is ``Production`` and http otherwise.

    NOTE(review): nothing in this function touches the flask session
    itself -- confirm whether that happens elsewhere.
    """
    logger.info("User called logout route.")

    in_production = os.environ.get('ENVIRONMENT') == 'Production'
    proto = "https" if in_production else "http"

    return_url = "{proto}://{server_name}/signout.html".format(
        proto=proto,
        server_name=app.config['SERVER_NAME'],
    )
    logout_url = "https://{auth0_domain}/v2/logout?returnTo={return_url}".format(
        auth0_domain=oidc_config.OIDC_DOMAIN,
        return_url=return_url,
    )
    return redirect(logout_url, code=302)
def task_status(task_id):
    """Report the state and progress of a ``refresh_chapters_task`` run.

    Returns JSON with ``state`` and ``progress``; ``result`` is added when
    the task's info carries one, and ``status`` holds the stringified info
    (the raised exception) for a failed task.  The ``task_id`` session entry
    is dropped once the task has succeeded or failed.
    """
    result = refresh_chapters_task.AsyncResult(task_id)

    if result.state == 'PENDING':
        response = {
            'state': result.state,
            'progress': 0,
        }
    elif result.state != 'FAILURE':
        # Guard against a task that reports no info at all.
        info = result.info or {}
        response = {
            'state': result.state,
            'progress': info.get('progress', 0),
        }
        if result.state == 'SUCCESS':
            # A default avoids KeyError when the finished task is polled
            # again after the entry has already been removed.
            session.pop('task_id', None)
        if 'result' in info:
            response['result'] = info['result']
    else:
        # something went wrong in the background job
        session.pop('task_id', None)
        response = {
            'state': result.state,
            'progress': 0,
            'status': str(result.info),  # this is the exception raised
        }
    return jsonify(response)
def get_data(data):
    """Classify a result payload as single- or multi-report.

    The payload is the first truthy value among ``data['results']``,
    ``data['report']`` and ``data['details']``.

    Returns:
        ``("multi", payload)`` when the payload is a list whose length is
        not 1; otherwise ``("single", metrics, report_data, payload)``,
        where ``metrics`` and ``report_data`` are read from the only list
        element's ``details`` mapping or, for a non-list payload, from the
        payload itself.  Missing pieces are reported as None instead of
        raising AttributeError.
    """
    payload = data.get("results") or data.get("report") or data.get("details")

    if isinstance(payload, list):
        if len(payload) != 1:
            return "multi", payload
        details = payload[0].get('details') or {}
    else:
        # A missing payload falls back to an empty mapping.
        details = payload or {}

    return "single", details.get('metrics'), details.get('report_data'), payload
def authenticated(fn):
    """Mark a route as requiring authentication.

    Unauthenticated sessions are redirected to ``login``.  Authenticated
    sessions lacking a name, e-mail or institution are redirected to
    ``profile``, except on the ``/logout`` and ``/profile`` paths, which
    always reach the wrapped view.
    """
    @wraps(fn)
    def decorated_function(*args, **kwargs):
        if not session.get('is_authenticated'):
            return redirect(url_for('login', next=request.url))

        exempt_path = request.path in ('/logout', '/profile')
        profile_complete = all(
            session.get(field) for field in ('name', 'email', 'institution')
        )
        if not exempt_path and not profile_complete:
            return redirect(url_for('profile', next=request.url))

        return fn(*args, **kwargs)

    return decorated_function
def gdisconnect():
    """Revoke the Google access token of the connected user.

    Returns a JSON 401 response when the session holds no credentials and a
    JSON 400 response when Google does not answer the revoke request with
    status 200.  (The excerpt ends after the revoke check; nothing is
    returned here on success.)
    """
    # Only disconnect a connected user.
    credentials = login_session.get('credentials')
    if credentials is None:
        response = make_response(
            json.dumps('Current user not connected.'), 401)
        response.headers['Content-Type'] = 'application/json'
        return response

    access_token = credentials.access_token
    url = 'https://accounts.google.com/o/oauth2/revoke?token=%s' % access_token
    h = httplib2.Http()
    result = h.request(url, 'GET')[0]

    if result['status'] != '200':
        # For whatever reason, the given token was invalid.
        # Fixed: 400 used to be passed to json.dumps() rather than to
        # make_response(), so the status code was never applied.
        response = make_response(
            json.dumps('Failed to revoke token for given user.'), 400)
        response.headers['Content-Type'] = 'application/json'
        return response

    # Disconnect based on provider
def strip_subfolder(url):
    """
    Strip off the subfolder if it exists so we always use the exact same
    share url for saving counts.

    The URL is returned unchanged when no ``SUBFOLDER`` is configured or
    when its path does not start with the configured subfolder.
    """
    subfolder = app.config.get('SUBFOLDER', None)
    if not subfolder:
        return url

    parts = urlparse.urlparse(url)
    if not parts.path.startswith(subfolder):
        return url

    # Only the leading occurrence is removed.
    trimmed_path = parts.path.replace('%s' % (subfolder), '', 1)
    rebuilt = urlparse.ParseResult(parts.scheme, parts.netloc, trimmed_path,
                                   parts.params, parts.query, parts.fragment)
    return rebuilt.geturl()
def repo_sha_from_github(repo, branch=u'master'):
    """
    Get sha from head of given repo

    :param repo: Path to repo (owner/repo_name)
    :param branch: Name of branch to get sha for
    :returns: Sha of branch, or None (after logging) on a non-200 response
    """
    url = 'repos/%s/git/refs/heads/%s' % (repo, branch)
    app.logger.debug('GET: %s', url)

    resp = github.get(url)
    if resp.status == 200:
        return resp.data['object']['sha']

    log_error('Failed reading sha', url, resp, branch=branch)
    return None
def rendered_markdown_from_github(path, branch=u'master', allow_404=False):
    """
    Get rendered markdown file text from github API

    :param path: Path to file (<owner>/<repo>/<dir>/.../<filename.md>)
    :param branch: Name of branch to read file from
    :param allow_404: False to log warning for 404 or True to allow it i.e.
                      when you're just seeing if a file already exists
    :returns: HTML file text, or None on any non-200 response
    """
    url = contents_url_from_path(path)
    headers = {'accept': 'application/vnd.github.html'}
    app.logger.debug('GET: %s, headers: %s, ref: %s', url, headers, branch)

    resp = github.get(url, headers=headers, data={'ref': branch})
    if resp.status == 200:
        return unicode(resp.data, encoding='utf-8')

    # A 404 is only silent when the caller explicitly allowed it.
    tolerated_miss = resp.status == 404 and allow_404
    if not tolerated_miss:
        log_error('Failed reading rendered markdown', url, resp, branch=branch)
    return None
def read_user_from_github(username=None):
    """
    Read user information from github

    :param username: Optional username to search for, if no username given
                     the currently logged in user will be returned (if any)
    :returns: Dict of information from github API call; an empty dict
              (after logging) on a non-200 response
    """
    url = 'user' if username is None else ('users/%s' % (username))
    app.logger.debug('GET: %s', url)

    resp = github.get(url)
    if resp.status == 200:
        return resp.data

    log_error('Failed reading user', url, resp)
    return {}
def read_branch(repo_path, name):
    """
    Read branch and get HEAD sha

    :param repo_path: Path to repo of branch
    :param name: Name of branch to read
    :returns: SHA of HEAD or None if branch is not found
    """
    url = 'repos/%s/git/refs/heads/%s' % (repo_path, name)
    app.logger.debug('GET: %s', url)

    resp = github.get(url)
    if resp.status == 200:
        return resp.data['object']['sha']

    # A 404 just means the branch doesn't exist; anything else is logged.
    if resp.status != 404:
        log_error('Failed reading branch', url, resp)
    return None
def check_rate_limit():
    """
    Get rate limit data

    :returns: None in case of an error or raw rate limit request data
    """
    url = '/rate_limit'
    app.logger.debug('GET: %s', url)

    resp = github.get(url)
    if resp.status == 200:
        return resp.data

    log_error('Failed checking rate limit', url, resp)
    return None
def getHttpParams(request):
    """
    Get the HTTP parameters of a request

    Query-string values are collected first (with any ``amp;`` removed from
    the key), then form values, which overwrite a query value of the same
    name.  ``DIRECTORY`` and ``CONFIG`` entries are added, followed by one
    entry per name in AUTHORIZED_SOURCES read from the session (None when
    absent).

    NOTE(review): the original docstring said results are upper-cased, but
    no key is upper-cased in this function.
    """
    httpParams = {}

    for name, value in request.args.items():
        # TODO Find a way to not have this stupid hack
        httpParams[name.replace("amp;", "")] = value

    for name, value in request.form.items():
        httpParams[name] = value

    httpParams['DIRECTORY'] = os.path.join(current_app.config['ROOT_PATH'],
                                           config.ARES_USERS_LOCATION)

    # Special environment configuration
    httpParams['CONFIG'] = {
        'WRK': config.WORK_PATH,
        'COMPANY': config.COMPANY,
    }

    for source in AUTHORIZED_SOURCES:
        httpParams[source] = session.get(source, None)

    return httpParams
def deleteFiles(report_name):
    """ Delete a file in the report environment

    When the form carries a ``SCRIPT`` value, that file is moved from the
    report's folder to the deleted-files folder (created on demand) under
    the name ``<report_name>_<SCRIPT>`` and the deletion is logged.  The
    response is a JSON object with the script and report names.

    NOTE(review): ``SCRIPT`` comes straight from the request and is joined
    into a filesystem path without validation.
    """
    import shutil

    script = request.form.get('SCRIPT')
    if script is not None:
        root_path = current_app.config['ROOT_PATH']
        deletedLocation = os.path.join(root_path,
                                       config.ARES_USERS_DELETED_FOLDERS)
        if not os.path.exists(deletedLocation):
            os.makedirs(deletedLocation)

        # Move the file to the deleted Location
        # This folder should be purged every month
        source_path = os.path.join(root_path, config.ARES_USERS_LOCATION,
                                   report_name, script)
        target_path = os.path.join(deletedLocation,
                                   "%s_%s" % (report_name, script))
        shutil.move(source_path, target_path)
        appendToLog(report_name, 'DELETE', script)

    return json.dumps({'SCRIPT': script, 'ENV': report_name})

#TO BE REMOVED
def configFile(report_name):
    """ Write a Json configuration file

    On POST the ``val`` request parameter is written to
    ``<ROOT_PATH>/<ARES_USERS_LOCATION>/<report_name>/config/<source>/<key>.cfg``,
    creating the directory when needed.  The response body is always the
    JSON encoding of an empty string.

    NOTE(review): ``source`` and ``key`` come from the request and are
    joined into a filesystem path without validation.
    """
    if request.method == 'POST':
        requestParams = getHttpParams(request)
        userDirectory = os.path.join(current_app.config['ROOT_PATH'],
                                     config.ARES_USERS_LOCATION,
                                     report_name, "config",
                                     requestParams['source'])
        if not os.path.exists(userDirectory):
            os.makedirs(userDirectory)

        cfg_path = os.path.join(userDirectory, "%s.cfg" % requestParams['key'])
        # The context manager closes the file even when write() raises.
        with open(cfg_path, "w") as commentFile:
            commentFile.write(requestParams['val'])
    return json.dumps('')

# ---------------------------------------------------------------------------------------------------------
# DOWNLOAD SECTION
#
# The below section will allow
# - To get the full Ares updated package
# - To get the full report updated package
# - To get the last version of a specific script
# ---------------------------------------------------------------------------------------------------------
def createTeam():
    """Register a team for a report environment.

    Only served when the request's ``HTTP_ORIGIN`` equals this host's URL.
    The team named by the ``team`` argument is created when it does not
    exist yet; it is then inserted into the report database's ``team_def``
    table with the requested role (default ``user``) and linked to the
    environment in ``env_auth``.

    Returns ``('"Success"', 200)`` on success and ``('"Forbidden"', 403)``
    for a foreign origin.
    """
    if request.environ.get('HTTP_ORIGIN') == request.host_url[:-1]:
        team_name = request.args['team']
        report_name = request.args['report_name']

        newTeam = Team.query.filter_by(team_name=team_name).first()
        dbEnv = AresSql.SqliteDB(report_name)
        if not newTeam:
            team = Team(team_name, request.args['team_email'])
            db.session.add(team)
            db.session.commit()
            newTeam = Team.query.filter_by(team_name=team_name).first()

        team_id = newTeam.team_id
        role = request.args.get('role', 'user')

        def sql_text(value):
            # Double embedded single quotes so a request value cannot
            # terminate the SQL string literal it is placed in.
            return value.replace("'", "''")

        # NOTE(review): still string-built SQL; prefer bound parameters if
        # dbEnv.modify supports them.
        dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s'); INSERT INTO env_auth (env_id, team_id) SELECT env_def.env_id, %s FROM env_def WHERE env_def.env_name = '%s' ;""" % (team_id, sql_text(team_name), sql_text(role), team_id, sql_text(report_name)))
        return json.dumps('Success'), 200

    # Fixed: 403 used to be passed to json.dumps() instead of being
    # returned as the response status.
    return json.dumps('Forbidden'), 403
def auth_another(uid=None):
    """Log the current user in as another user.

    The target id is ``uid`` or, when that is falsy, the ``user_id``
    request parameter.  The id of the first impersonating user is kept in
    ``session['auth_by_another']`` across nested impersonations.

    Raises:
        APIError: the id is not an integer, or it is the caller's own id.
        UserNotFound: no such user, or the user is marked deleted.
    """
    data = KubeUtils._get_params()
    original_user = KubeUtils.get_current_user()

    target_id = uid if uid else data['user_id']
    try:
        target_id = int(target_id)
    except (TypeError, ValueError):
        raise APIError("User UID is expected")

    if original_user.id == target_id:
        raise APIError("Logging in as admin is pointless")

    user = User.query.get(target_id)
    if user is None or user.deleted:
        raise UserNotFound('User "{0}" does not exists'.format(target_id))

    # Keep an already recorded impersonator; otherwise record this one.
    impersonator_id = session.get('auth_by_another', original_user.id)
    session['auth_by_another'] = impersonator_id

    user_logged_in_by_another.send((original_user.id, user.id))
    login_user(user, DB=False, impersonator=original_user)
def job():
    """Record job progress sent in the form and redirect to ``index``.

    ``job_id`` together with ``date_path`` stores the date path on the
    search row; ``date_path`` together with ``status`` stores the status on
    the row with that date path.  Each update is committed immediately.
    """
    form = request.form
    job_id = form.get('job_id', None)
    date_path = form.get('date_path', None)
    status = form.get('status', None)

    # A job is starting, we want the date_path
    if job_id and date_path:
        query('UPDATE searches SET date_path = ? WHERE id = ?',
              [date_path, job_id])
        logging.debug('update date_path=%s where id=%s' % (date_path, job_id))
        g.db.commit()

    # A job is in progress, we want the status
    if date_path and status:
        query('UPDATE searches SET status = ? WHERE date_path = ?',
              [status, date_path])
        logging.debug('update status=%s where date_path=%s' % (status, date_path))
        g.db.commit()

    return redirect(url_for('index'))
def sample(date_path):
    """Write a random sample of the collected tweets to ``sample.csv``.

    ``sample_size`` is read from the form; when it is missing or not an
    integer the client is redirected to the summary page without sampling.
    Otherwise ``sample_size`` line positions are drawn at random from the
    ``num_tweets`` recorded in ``summary.json`` and the tweets at those
    positions in ``tweets.json`` are written as CSV rows after a heading
    row.  The client is then redirected to the summary page.
    """
    try:
        sample_size = int(request.form.get('sample_size', None))
    except (TypeError, ValueError):
        # Fixed: int(None) raises TypeError, which was not caught before.
        return redirect(url_for('summary', date_path=date_path))

    with open('data/%s/summary.json' % date_path, 'r') as summary_file:
        summary = json.load(summary_file)
    num_tweets = summary['num_tweets']

    tweet_index = np.arange(num_tweets)
    shuffle(tweet_index)
    # A set gives constant-time membership tests inside the loop below.
    chosen = set(tweet_index[0:sample_size].tolist())

    with open('data/%s/sample.csv' % date_path, 'w') as sample_file:
        writer = csv.writer(sample_file)
        writer.writerow(json2csv.get_headings())
        with open('data/%s/tweets.json' % date_path, 'r') as tweets_file:
            for counter, line in enumerate(tweets_file):
                tweet = json.loads(line)
                if counter in chosen:
                    writer.writerow(json2csv.get_row(tweet))

    return redirect(url_for('summary', date_path=date_path))
def __init__(self, global_config):
    """Configure the module-level Flask ``app`` from ``global_config``.

    Stores the ``https`` section on the instance, installs request logging,
    sets a random secret key and a 60 minute permanent-session lifetime,
    configures the auth client, copies the ``confd``/``call_logd``/
    ``plugind`` sections into ``app.config`` (empty dict when absent) and
    runs the remaining ``_configure_*`` / ``_override_url_for`` steps.
    """
    self.config = global_config['https']

    http_helpers.add_logger(app, logger)
    app.after_request(http_helpers.log_request_hide_token)
    app.secret_key = os.urandom(24)
    app.permanent_session_lifetime = timedelta(minutes=60)

    AuthClient.set_config(global_config['auth'])
    for section in ('confd', 'call_logd', 'plugind'):
        app.config[section] = global_config.get(section, {})

    configure_error_handlers(app)
    self._override_url_for()
    self._configure_jinja()
    self._configure_login()
    self._configure_menu()
    self._configure_session(global_config['session_file_dir'])
    self._configure_babel(global_config['enabled_plugins'])
def _configure_login(self):
    """Attach a LoginManager to ``app`` and register its user loader.

    The loader asks the auth client for the given token and returns a
    ``UserUI`` built from the response's ``token`` and ``auth_id``.  It
    returns None on an HTTP error, on a connection error (logged as a
    warning) or when the response carries no token.
    """
    manager = LoginManager()
    manager.init_app(app)

    @manager.user_loader
    def load_token(token):
        try:
            response = AuthClient().token.get(token)
        except HTTPError:
            return None
        except requests.ConnectionError:
            logger.warning('Wazo authentication server connection error')
            return None

        confirmed_token = response.get('token')
        if confirmed_token:
            return UserUI(confirmed_token, response.get('auth_id'))
        return None
def gdisconnect():
    """Revoke the Google access token stored in the session.

    The session's ``credentials`` value is used directly as the access
    token.  Returns a JSON 401 response when it is missing and a JSON 400
    response when Google does not answer the revoke request with status
    200.  (The excerpt ends after the revoke check; nothing is returned
    here on success.)
    """
    credentials = login_session.get('credentials')
    if credentials is None:
        # Call form works as both a Python 2 statement and a Python 3 call.
        print('Access Token is None')
        response = make_response(
            json.dumps('Current user not connected.'), 401)
        response.headers['Content-Type'] = 'application/json'
        return response

    access_token = credentials
    url = ('https://accounts.google.com/o/oauth2/revoke?token=%s'
           % access_token)
    h = httplib2.Http()
    result = h.request(url, 'GET')[0]

    if result['status'] != '200':
        # Fixed: 400 used to be passed to json.dumps() rather than to
        # make_response(), so the status code was never applied.
        response = make_response(
            json.dumps('Failed to revoke token for given user.'), 400)
        response.headers['Content-Type'] = 'application/json'
        return response

    # Disconnect based on provider
def home():
    """Render the system information page, or the login page.

    For a logged-in session the current date, machine name, mycroft-core
    package version, installed skills and the last ten skills-log lines are
    collected on a best-effort basis: any ``Exception`` while collecting is
    printed and the page is rendered with whatever was gathered so far.
    """
    if not session.get('logged_in'):
        return render_template('login.html')

    sys_data = {"current_time": '', "machine_name": ''}
    mycroft_skills = ""
    try:
        sys_data['current_time'] = subprocess.check_output(['date'], shell=True)
        sys_data['machine_name'] = platform.node()
        sys_data['mycroft_version'] = subprocess.check_output(
            ['dpkg -s mycroft-core | grep Version'], shell=True)
        mycroft_skills = sorted(os.listdir('/opt/mycroft/skills'))
        sys_data['skills_log'] = subprocess.check_output(
            ['tail -n 10 /var/log/mycroft-skills.log'], shell=True)
        print(type(sys_data['skills_log']))
        disk_usage_info = disk_usage_list()
    except Exception as ex:
        print(ex)

    # Fixed: this return used to sit in a ``finally`` clause, which also
    # swallowed KeyboardInterrupt and SystemExit.
    # NOTE(review): ``login`` is not defined in this block -- confirm what
    # ``login['username']`` refers to.
    return render_template("index.html",
                           title='Mark 1 - System Information',
                           sys_data=sys_data,
                           name=login['username'],
                           mycroft_skills=mycroft_skills)
# disconnect: log the user out based on the 'provider' stored in the session.
# For 'facebook' it calls fbdisconnect() and removes session['facebook_id']; for 'google' it
# calls gdisconnect() and removes session['gplus_id'].  The name, email, picture, user_id,
# provider and access_token session entries are deleted with `del`, a success message is
# flashed and the client is redirected to 'index'.
# NOTE(review): the original indentation was lost in this listing, so it cannot be told from
# here whether the `del session[...]` statements and the flash() call run only when a provider
# is set -- confirm against the upstream project.  `del` raises KeyError for a missing entry.
def disconnect(): """ User log out. """ provider = session.get('provider') if provider: if provider == 'facebook': fbdisconnect() del session['facebook_id'] if provider == 'google': gdisconnect() del session['gplus_id'] del session['name'] del session['email'] del session['picture'] del session['user_id'] del session['provider'] del session['access_token'] flash('You were successfully logged out.') return redirect(url_for('index')) # Facebook OAuth
def update_auto_fields(self, update=False):
    """
    Update self.id with self.uid
    Update author_original
    Update author_last

    For a new object (``update`` false) ``id`` is set from ``uid`` and the
    session user's id becomes ``author_original_id``; on update it becomes
    ``author_last_id``.  The id is None when the session is empty or holds
    no user.
    """
    from flask import session

    if session:
        cur_user = session.get('user') or {}
    else:
        cur_user = {}

    if update:
        self.author_last_id = cur_user.get('id')
        return

    # Add UID to newly created objects
    self.id = self.uid
    self.author_original_id = cur_user.get('id')
def logout_required(f):
    """Decorator: only let the view run for sessions that are not logged in.

    A session counts as logged in when it holds both a Deezer access token
    and an expiry timestamp later than the current time; such requests get
    ``show_message.html`` with status 403.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        access_token = session.get('deezer_access_token', None)
        expires_at = session.get('deezer_expires_at', None)
        now = int(time.time())

        still_logged_in = (
            access_token is not None
            and expires_at is not None
            and expires_at > now
        )
        if not still_logged_in:
            return f(*args, **kwargs)

        page = render_template(
            'show_message.html',
            message='You need to be logged out to see this page')
        return page, 403

    return decorated_function
def login_required(f):
    """Decorator: require valid OAuth2 credentials in the session.

    Requests without stored credentials, or whose access token has expired,
    are redirected to ``youtube.login``.  Otherwise an authorised YouTube
    v3 client is stored on ``g.youtube`` before the view runs.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        stored = session.get('credentials', None)
        if stored is None:
            return redirect(url_for('youtube.login'))

        credentials = client.OAuth2Credentials.from_json(stored)
        if credentials.access_token_expired:
            return redirect(url_for('youtube.login'))

        authorised_http = credentials.authorize(httplib2.Http())
        g.youtube = build('youtube', 'v3', http=authorised_http)
        return f(*args, **kwargs)

    return decorated_function
def update_transaction(year, month, transaction_id):
    """Store the submitted categories and goal for a transaction.

    The transaction is identified by the first day of ``year``/``month``
    and ``transaction_id``.  Category names not yet present in the
    configuration are appended to ``config['categories']`` (each once) and
    the configuration is saved when anything was added.

    Returns an empty response body.
    """
    date = datetime.date(year, month, 1)
    categories = request.form.getlist('categories')
    goal = request.form.get('goal')
    storage.update_transaction(date, transaction_id,
                               categories=categories, goal=goal)

    known_names = {c['name'] for c in config.get('categories') or []}
    new_names = []
    for category in categories:
        if category not in known_names:
            # Fixed: remember the name so a category repeated within one
            # request is not appended to the configuration twice.
            known_names.add(category)
            new_names.append(category)

    if new_names:
        if not config.get('categories'):
            config['categories'] = []
        config['categories'].extend({'name': name} for name in new_names)
        save_config(config)
    return ''
def update(year, month):
    """Refresh the local data for ``year``/``month`` and redirect to index.

    For a file-based bank adapter the uploaded ``file`` is required (400
    otherwise).  It is saved under ``imports_dir`` with today's date as a
    prefix when that directory is configured, or to a temporary file that
    is removed afterwards.
    """
    date = datetime.date(year, month, 1)
    filename = None
    delete_file = False

    try:
        if bank_adapter.fetch_type == 'file':
            if 'file' not in request.files:
                abort(400)
            file = request.files['file']
            if config.get('imports_dir'):
                filename = os.path.join(
                    config['imports_dir'],
                    '%s-%s' % (datetime.date.today().isoformat(),
                               secure_filename(file.filename)))
                if not os.path.exists(os.path.dirname(filename)):
                    os.makedirs(os.path.dirname(filename))
            else:
                temp_file = NamedTemporaryFile(delete=False)
                filename = temp_file.name
                temp_file.close()
                delete_file = True
            file.save(filename)

        update_local_data(config, date=date, filename=filename,
                          storage=storage)
    finally:
        # Fixed: the temporary file is now removed even when saving or
        # importing raises.
        if delete_file:
            os.unlink(filename)

    return redirect(url_for('index', year=year, month=month))
def process_login():
    """Log in existing users and redirect to homepage.

    An unknown e-mail flashes a registration hint and redirects to
    ``register.html``; a wrong password flashes an error and redirects to
    ``/login``.  On success the user's id and an empty waypoint list are
    stored in the session.

    NOTE(review): the submitted password is compared with ``user.password``
    by plain equality.
    """
    email = request.form.get('email')
    password = request.form.get('password')

    # select the user from the database who has the given email (if any)
    user = User.query.filter(User.email==email).first()

    if not user:
        flash('Please register as a new user.')
        return redirect('register.html')

    # the user is in the database: check that the password is correct
    if password != user.password:
        flash('Your password is incorrect. Please enter your information again or register as a new user.')
        return redirect('/login')

    session['user'] = user.user_id
    session['waypoints'] = []
    flash("You're logged in.")
    return redirect('/')
def process_registration():
    """Add new user to database and log them in.

    A ``User`` is created from the submitted e-mail and password and
    committed; its id and an empty waypoint list are stored in the session
    before redirecting to the homepage.

    NOTE(review): the password is passed to ``User`` exactly as submitted.
    """
    form = request.form
    email = form.get('email')
    password = form.get('password')

    # instantiate a user object with the information provided
    new_user = User(email=email, password=password)

    # add user to db session and commit to db
    db.session.add(new_user)
    db.session.commit()

    # add user to the session; redirect to homepage
    session['user'] = new_user.user_id
    session['waypoints'] = []
    flash("You're logged in.")
    return redirect('/')
def index():
    """Index page: handles login submissions and ``action`` query commands.

    POST: a valid form stores the name in the session; an invalid one
    flashes the first ``name`` error, or the first ``password`` error when
    there is no ``name`` error.
    GET: ``action=login_out`` clears the stored user name, and
    ``action=overview`` / ``action=zhihu_topics`` select the page type;
    each of these redirects back to ``index``.
    Everything else renders ``index.html``.
    """
    user_form = UserForm()

    if request.method == "POST":
        if user_form.validate_on_submit():
            session["username"] = user_form.name.data
        else:
            failed_field = "name" if "name" in user_form.errors else "password"
            flash(user_form.errors[failed_field][0])
    else:
        action = request.args.get("action")
        if action == "login_out":
            flash("?????????")
            session["username"] = None
            return redirect(url_for("index"))
        if action in ("overview", "zhihu_topics"):
            # The action name doubles as the page type.
            session["page_type"] = action
            return redirect(url_for("index"))

    return render_template("index.html",
                           name=session.get("username"),
                           page_type=session.get("page_type", "overview"),
                           form=user_form)
def login(username, password):
    """
    Authenticates a user.

    On success the user's ``uid`` is stored in the session and the session
    is made permanent.

    Raises:
        WebException: unknown user, disabled or unverified account, wrong
            password, a debug-only login period, or a user without a uid.
    """
    # Validate the submitted username and password
    validate(user_login_schema, {
        "username": username,
        "password": password
    })

    user = safe_fail(api.user.get_user, name=username)
    if user is None:
        raise WebException("Incorrect username.")
    if user.get("disabled", False):
        raise WebException("This account has been disabled.")
    if not user["verified"]:
        raise WebException("This account has not been verified yet.")

    if not confirm_password(password, user['password_hash']):
        raise WebException("Incorrect password")

    # NOTE(review): unverified users were already rejected above, so this
    # branch looks unreachable; it is kept as found.
    if not user["verified"]:
        try:
            api.email.send_user_verification_email(username)
            raise WebException("This account is not verified. An additional email has been sent to {}.".format(user["email"]))
        except InternalException as e:
            raise WebException("You have hit the maximum number of verification emails. Please contact support.")

    if debug_disable_general_login and session.get('debugaccount', False):
        raise WebException("Correct credentials! But the game has not started yet...")

    if user['uid'] is None:
        raise WebException("Login Error")

    session['uid'] = user['uid']
    session.permanent = True
def login_required(func):
    """Decorator: render ``login.html`` instead of the view unless the
    session holds a truthy ``user``."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not session.get('user'):
            return render_template("login.html")
        return func(*args, **kwargs)

    return wrapper
def index():
    """Render ``index.html`` with the session's ``user`` (None if absent)."""
    current_user = session.get('user')
    return render_template('index.html', user=current_user)
def case_view():
    """Render ``case_list.html`` with the case list and the session's user.

    The case list is read through a ``MongoClient`` whose reference is
    dropped before rendering.
    """
    mongo = MongoClient()
    cases = mongo.get_case_list()
    del mongo
    return render_template('case_list.html',
                           posts=cases,
                           user=session.get('user'))

# (Mis-encoded comment in the source; it mentions excel.)
def create_case_html():
    """Render ``create_case.html`` with the session's ``user`` (None if absent)."""
    current_user = session.get('user')
    return render_template('create_case.html', user=current_user)
def for_login():
    """Render ``index.html`` for a session with a truthy ``user``, and
    ``login.html`` otherwise."""
    current_user = session.get('user')
    if not current_user:
        return render_template('login.html')
    return render_template('index.html', info=None, user=current_user)