我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用flask.session.query()。
def assignResults(user, assign_id, sort_id):
    """Render the submissions posted for one assignment.

    Admins see every post for the assignment: ``sort_id == 0`` orders them by
    the author's last and first name, any other value orders them newest
    first.  Non-admins only see their own posts, newest first.

    :param user: the logged-in user; ``admin`` and ``id`` are read
    :param assign_id: id of the assignment whose posts are listed
    :param sort_id: sort-mode selector, also passed through to the template
    :return: the rendered ``assignResults.html`` page
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if user.admin:
        if sort_id == 0:
            posts = session.query(Post).join(Post.user).filter(
                Post.assignment_id == assign_id).order_by(
                User.l_name, User.f_name)
        else:
            # Only sort_id == 1 used to be handled here, which left ``posts``
            # unbound (UnboundLocalError) for any other value.  Newest-first
            # is now the fallback ordering.
            posts = session.query(Post).filter(
                Post.assignment_id == assign_id).order_by(Post.created.desc())
    else:
        posts = session.query(Post).filter(and_(
            Post.assignment_id == assign_id,
            Post.user_id == user.id)).order_by(desc(Post.created)).all()
    return render_template('assignResults.html', user=user, posts=posts,
                           assign=assign, sort_id=sort_id)
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On GET, a visitor that ``check_for_user`` already recognises is redirected
    to ``main``; everyone else gets the form.  On POST, the SHA-512 of the
    submitted password plus the stored salt is compared with the stored hash.
    A match returns whatever ``setCookie`` returns; otherwise the form is
    re-rendered with an error message.
    """
    if request.method == 'GET':
        if check_for_user():
            return redirect(url_for('main'))
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    account = session.query(User).filter(User.username == username).first()
    if account:
        digest = hashlib.sha512(password + account.salt).hexdigest()
        if account.password == digest:
            return setCookie(account)
    return render_template('login.html', username=username,
                           error='Invalid username and/or password')
def deleteAssign(user, assign_id):
    """Confirm (GET) or carry out (other methods) deletion of an assignment.

    A GET for an existing assignment renders the confirmation page.  Any other
    method deletes the assignment's tests, its posts and the assignment
    itself in one commit.  Every remaining path redirects to ``main``.

    :param user: the logged-in user, passed through to the template
    :param assign_id: id of the assignment to delete
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if assign:
        if request.method == 'GET':
            return render_template('deleteAssign.html', user=user,
                                   assign=assign)
        tests = session.query(Test).filter(
            Test.assignment_id == assign_id).all()
        posts = session.query(Post).filter(
            Post.assignment_id == assign_id).all()
        # Remove dependent rows (tests first, then posts) before the parent.
        for row in tests + posts:
            session.delete(row)
        session.delete(assign)
        session.commit()
    return redirect(url_for('main'))
def roster(user):
    """List all accounts (GET) or delete the account named in the form.

    GET renders ``roster.html`` with non-admin and admin users as separate
    lists, each ordered by last name.  Any other method looks up the submitted
    ``username``, deletes that user if found, and redirects back to the roster.

    :param user: the logged-in user, passed through to the template
    """
    if request.method != 'GET':
        username = request.form['username']
        target = session.query(User).filter(User.username == username).first()
        if target:
            session.delete(target)
            session.commit()
        return redirect(url_for('roster'))

    by_last_name = User.l_name
    users = session.query(User).filter(
        User.admin == False).order_by(by_last_name).all()  # noqa
    admin = session.query(User).filter(
        User.admin).order_by(by_last_name).all()
    return render_template('roster.html', user=user, users=users, admin=admin)
def login():
    """Serve the login page (GET) or authenticate JSON credentials.

    For non-GET requests the JSON body's ``username`` and ``password`` are
    checked against the stored hash.  On success the user is logged in and the
    response carries the user's column values plus ``isAdmin``; on failure the
    response is ``{"success": false}``.

    :return: rendered ``login.html`` or a JSON response
    """
    if request.method == 'GET':
        return render_template('login.html')

    # A missing or non-JSON body is treated as empty credentials instead of
    # raising AttributeError on ``None``.
    payload = request.json or {}
    username = payload.get('username', '')
    password = payload.get('password', '')

    session = Session()
    try:
        registered_user = (session.query(User)
                           .filter(User.username == username)
                           .first())
        authenticated = (
            registered_user is not None and
            check_password_hash(registered_user.password, password))
        if authenticated:
            login_user(registered_user)
            flash('Logged in successfully')
    finally:
        # Always hand the scoped session back, even if the lookup fails.
        Session.remove()

    if not authenticated:
        return jsonify(success=False)

    user = current_user.__dict__.copy()
    user.pop('_sa_instance_state', None)
    # Never send the password hash to the client.
    user.pop('password', None)
    return jsonify(success=True, isAdmin=current_user.is_admin(), **user)
def get_shakemaps():
    """Return every ShakeMap, most recently received first, as a JSON string.

    Each row is serialised from a copy of the instance ``__dict__`` with the
    SQLAlchemy state entry removed.
    """
    session = Session()
    newest_first = (session.query(ShakeMap)
                    .order_by(ShakeMap.recieve_timestamp.desc()))
    rows = []
    for shakemap in newest_first.all():
        row = shakemap.__dict__.copy()
        row.pop('_sa_instance_state', None)
        rows.append(row)
    payload = json.dumps(rows, cls=AlchemyEncoder)
    Session.remove()
    return payload
def get_shakemap(shakemap_id):
    """Return all versions of one ShakeMap, highest version first, as JSON.

    :param shakemap_id: value matched against ``ShakeMap.shakemap_id``
    :return: JSON array string (``[]`` when nothing matches)
    """
    session = Session()
    versions = (session.query(ShakeMap)
                .filter(ShakeMap.shakemap_id == shakemap_id)
                .order_by(ShakeMap.shakemap_version.desc()))
    rows = []
    for shakemap in versions.all():
        row = shakemap.__dict__.copy()
        row.pop('_sa_instance_state', None)
        rows.append(row)
    payload = json.dumps(rows, cls=AlchemyEncoder)
    Session.remove()
    return payload
def shakemap_overlay(shakemap_id):
    """Serve the intensity overlay image of the newest version of a ShakeMap.

    :param shakemap_id: value matched against ``ShakeMap.shakemap_id``
    :return: the ``ii_overlay.png`` file for the highest stored version, or
        the static ``sc_logo.png`` when no such ShakeMap exists
    """
    session = Session()
    try:
        shakemap = (session.query(ShakeMap)
                    .filter(ShakeMap.shakemap_id == shakemap_id)
                    .order_by(desc(ShakeMap.shakemap_version))
                    .first())
        # Read the version while the session is still open.
        version = None if shakemap is None else shakemap.shakemap_version
    finally:
        Session.remove()

    if shakemap is None:
        # send_static_file() already returns a complete response.  It used to
        # be passed on to send_file(), which expects a path or file object.
        return app.send_static_file('sc_logo.png')

    img = os.path.join(app.config['EARTHQUAKES'],
                       shakemap_id,
                       shakemap_id + '-' + str(version),
                       'ii_overlay.png')
    # The overlay is a PNG; it was previously labelled image/gif.
    return send_file(img, mimetype='image/png')
def get_notification(event_id):
    """Return the notifications of an event and of its ShakeMaps as JSON.

    Each entry is a copy of the notification's ``__dict__`` (without the
    SQLAlchemy state) plus a ``group_name`` key taken from its group.

    :param event_id: value matched against ``Event.event_id``
    :return: JSON array string (``[]`` when the event does not exist)
    """
    session = Session()
    try:
        event = (session.query(Event)
                 .filter(Event.event_id == event_id)
                 .first())
        dicts = []
        if event is not None:
            # Copy first: ``+=`` on the relationship collection itself would
            # attach the ShakeMap notifications to the event.
            nots = list(event.notifications)
            for sm in event.shakemaps:
                nots.extend(sm.notifications)
            for obj in nots:
                dict_ = obj.__dict__.copy()
                dict_.pop('_sa_instance_state', None)
                dict_['group_name'] = obj.group.name
                dicts.append(dict_)
        return json.dumps(dicts, cls=AlchemyEncoder)
    finally:
        # Release the scoped session even when serialisation fails.
        Session.remove()
def notification_html(notification_type, name):
    """Build a preview of a notification email as HTML.

    ``new_event`` previews use the last two rows of the ``Event`` query; any
    other type uses the last row of the ``ShakeMap`` query.  The optional
    ``config`` query argument is JSON-decoded and handed to the builder.

    :param notification_type: ``'new_event'`` or anything else for inspection
    :param name: passed through to the builder as ``name``
    :return: the HTML produced by ``NotificationBuilder``
    """
    config = json.loads(request.args.get('config', 'null'))
    session = Session()
    builder = NotificationBuilder()
    if notification_type != 'new_event':
        # most recent shakemap
        # NOTE(review): raises IndexError when the table is empty -- confirm
        # whether that can happen here.
        latest_shakemap = session.query(ShakeMap).all()[-1]
        html = builder.build_insp_html(latest_shakemap, name=name, web=True,
                                       config=config)
    else:
        # two most recent events
        recent_events = session.query(Event).all()[-2:]
        html = builder.build_new_event_html(events=recent_events, name=name,
                                            web=True, config=config)
    Session.remove()
    return html
def get_group_info(group_id):
    """Return the notification settings of one group as a JSON object string.

    :param group_id: value matched against ``Group.shakecast_id``
    :return: JSON object with the keys ``inspection``, ``new_event``,
        ``heartbeat``, ``scenario``, ``facilities``, ``users`` and
        ``template``; ``{}`` when the group does not exist
    """
    session = Session()
    try:
        group = (session.query(Group)
                 .filter(Group.shakecast_id == group_id)
                 .first())
        # Start from an empty mapping so an unknown group yields "{}" instead
        # of failing on an unbound local.
        group_specs = {}
        if group is not None:
            group_specs = {
                'inspection': group.get_alert_levels(),
                'new_event': group.get_min_mag(),
                'heartbeat': group.has_spec('heartbeat'),
                'scenario': group.get_scenario_alert_levels(),
                'facilities': get_facility_info(group_name=group.name),
                'users': group.users,
                'template': group.template,
            }
        return json.dumps(group_specs, cls=AlchemyEncoder)
    finally:
        Session.remove()
def modify_user_info(args):
    """Update the logged-in user's profile from a mapping of new values.

    The update only happens when the user found by ``args["email"]`` is the
    one whose email is stored in the session.  Empty values and the
    ``portrait`` key are skipped; a few known keys are converted to ``float``
    or ``int`` before being assigned.

    :param args: mapping of attribute name to new value; must support ``get``
    :return: ``{"result": "success", ...}`` or ``{"result": "error", ...}``
    """
    float_items = ["height", "weight", "bust", "Waist", "BMI"]
    int_items = ["vip", "step_number"]

    email = args.get("email")
    user = db.session.query(User).filter_by(email=email).first()
    is_owner = user and ("email" in session) and session["email"] == user.email
    if not is_owner:
        return {"result": "error", "message": "please login"}

    # NOTE(review): every key in ``args`` is written to the model, not just
    # the listed ones -- confirm callers restrict what ``args`` can contain.
    for key in args:
        value = args[key]
        if not value or key == "portrait":
            continue
        if key in float_items:
            value = float(value)
        elif key in int_items:
            value = int(value)
        setattr(user, key, value)
    db.session.commit()
    return {"result": "success", "message": "modify user info success"}
def index():
    """Render the index page with two distinct, randomly chosen posts.

    :return: the rendered ``index.html`` page; responds with 404 when fewer
        than two posts exist
    """
    # Local imports: the names this block needs beyond what it already used.
    from random import sample

    from flask import abort

    # Pick from the ids that actually exist.  The previous approach drew
    # random numbers in 1..count, which looped forever with a single post,
    # raised ValueError with none, and returned None rows when ids had gaps.
    # It also built a new engine on every request against a hard-coded
    # development database just to count rows.
    post_ids = [row[0] for row in Post.query.with_entities(Post.id).all()]
    if len(post_ids) < 2:
        abort(404)
    first_id, second_id = sample(post_ids, 2)
    pic1 = Post.query.get(first_id)
    pic2 = Post.query.get(second_id)

    pic1_filename = url_for('static', filename=pic1.filename)
    pic2_filename = url_for('static', filename=pic2.filename)
    return render_template('index.html', pic1=pic1, pic2=pic2,
                           pic1_filename=pic1_filename,
                           pic2_filename=pic2_filename)
def check_for_user():
    """Return the user identified by the ``user_id`` cookie, if it is valid.

    The cookie has the form ``<username>|<hex digest>``, where the digest is
    the SHA-512 of the username concatenated with ``hash_salt``.

    :return: the matching ``User`` row, or ``None`` when the cookie is
        missing, malformed, has a wrong digest, or names an unknown user
    """
    cookie_value = request.cookies.get('user_id')
    if not cookie_value:
        return None
    params = cookie_value.split('|')
    # The cookie is client-controlled: without a separator there is no
    # digest to check, so reject it instead of raising IndexError.
    if len(params) < 2:
        return None
    username, digest = params[0], params[1]
    if hashlib.sha512(username + hash_salt).hexdigest() != digest:
        return None
    user = session.query(User).filter(User.username == username).first()
    return user if user else None
def main():
    """Render the landing page listing all assignments, newest first.

    The current user (possibly ``None``) comes from ``check_for_user``; when
    present, the user's first name is printed.
    """
    user = check_for_user()
    if user:
        print(user.f_name)
    newest_first = desc(Assignment.created)
    assign = session.query(Assignment).order_by(newest_first).all()
    return render_template('main.html', user=user, assign=assign)
def assignResultsReview(user, post_id):
    """Show one submission (GET) or delete it (any other method).

    Only the post's author or an admin may access it.

    :param user: the logged-in user; ``id`` and ``admin`` are read
    :param post_id: id of the post to review or delete
    :return: the rendered review page, or a redirect to the assignment view
        after deletion; aborts with 404 for an unknown post and 403 when the
        user may not access it
    """
    post = session.query(Post).filter(post_id == Post.id).first()
    if post is None:
        # An unknown id used to fail with AttributeError on ``None``.
        return abort(404)
    assign = session.query(Assignment).filter(
        Assignment.id == post.assignment_id).first()
    if post.user_id != user.id and not user.admin:
        return abort(403)
    if request.method == 'GET':
        return render_template('assignReviewResults.html', assign=assign,
                               post=post, user=user)
    session.delete(post)
    session.commit()
    return redirect(url_for('assignView', assign_id=assign.id))
def postFeedback(user, post_id):
    """Store the submitted feedback text on a post.

    :param user: the logged-in user (not used by this function)
    :param post_id: id of the post that receives the notes
    :return: the saved notes, or a 404 response when the post does not exist
    """
    post = session.query(Post).filter(Post.id == post_id).first()
    notes = request.form['data']
    print('notes', notes)
    if not post:
        print('no post')
        # Falling off the end returned None, which Flask rejects as an
        # invalid response; answer with an explicit 404 instead.
        return 'Post not found', 404
    post.notes = notes
    session.commit()
    return notes
def allResults(user):
    """Render the results overview with every assignment, newest first.

    :param user: the logged-in user, passed through to the template
    """
    assignments = (session.query(Assignment)
                   .order_by(desc(Assignment.created))
                   .all())
    return render_template('allResults.html', user=user, assign=assignments)
def editAssign(user, assign_id):
    """Show the edit form for an assignment (GET) or save the submitted edit.

    Saving requires both a title and a description.  When either is empty the
    form is re-rendered with the submitted values and an error message.

    :param user: the logged-in user, passed through to the template
    :param assign_id: id of the assignment being edited
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()

    if request.method == 'GET':
        params = {'title': assign.name, 'desc': assign.desc}
        return render_template('admin.html', user=user, params=params)

    form = request.form
    title = form['title']
    descrip = form['desc']
    assign_type = form['assign_type']
    include_tf = form.get('include_testfiles')

    if not (title and descrip):
        params = {
            'title': title,
            'desc': descrip,
            'error': 'Please fill in both fields before continuing.',
        }
        return render_template('admin.html', user=user, params=params)

    assign.name = title
    assign.desc = descrip
    assign.int_type = assign_type
    # The checkbox value is only present when it was ticked.
    assign.include_tf = True if include_tf else False
    session.commit()
    return redirect(url_for('assignView', assign_id=assign_id))
def testView(user, test_id):
    """Render one test together with the assignment it belongs to.

    Only GET requests produce a page; for any other method the function
    returns ``None``.

    :param user: the logged-in user, passed through to the template
    :param test_id: id of the test to display
    """
    test = session.query(Test).filter(Test.id == test_id).first()
    if request.method != 'GET':
        return None
    assign = session.query(Assignment).filter(
        Assignment.id == test.assignment_id).first()
    return render_template('testView.html', test=test, user=user,
                           assign=assign)
def resetPassword(user):
    """Show the password-reset form (GET) or reset another user's password.

    On non-GET requests the submitted ``username`` is looked up and, if found,
    given a fresh salt and the SHA-512 hash of the submitted password.

    :param user: the logged-in user, passed through to the template
    :return: the rendered ``resetPass.html`` page, with a status message after
        a submission
    """
    if request.method == 'GET':
        return render_template('resetPass.html', user=user)

    username = request.form['username']
    password = request.form['password']
    if not username or not password:
        status_message = 'Both fields are required.'
    else:
        # Keep the looked-up account separate from ``user``: rebinding
        # ``user`` made the template receive None (unknown username) or the
        # account being reset instead of the logged-in user.
        target = session.query(User).filter(
            User.username == username).first()
        if not target:
            status_message = ('User could not be found. '
                              'Please verify their username and try again.')
        else:
            salt = make_salt()
            target.salt = salt
            target.password = hashlib.sha512(password + salt).hexdigest()
            session.commit()
            status_message = 'Users password has been changed.'
    return render_template('resetPass.html', status_message=status_message,
                           user=user)
def deleteUser(user, user_id):
    """Delete a user and all of that user's posts, then return to the roster.

    Posts are deleted and committed first, the user row afterwards.

    :param user: the logged-in user (not used; the name is reused for the
        account being deleted)
    :param user_id: id of the account to delete
    """
    user = session.query(User).filter(User.id == user_id).first()
    owned_posts = session.query(Post).filter(Post.user_id == user_id).all()
    if owned_posts:
        for post in owned_posts:
            session.delete(post)
        session.commit()
    if user:
        session.delete(user)
        session.commit()
    return redirect(url_for('roster'))
def all(user):
    """Render a page with every user, assignment, post and test.

    :param user: the logged-in user, passed through to the template
    """
    context = {
        'users': session.query(User).all(),
        'assign': session.query(Assignment).all(),
        'posts': session.query(Post).all(),
        'tests': session.query(Test).all(),
        'user': user,
    }
    return render_template('all.html', **context)
def load_user(user_id):
    """Look up a user by id.

    :param user_id: value convertible to ``int``, matched against
        ``User.shakecast_id``
    :return: the matching ``User`` or ``None``
    """
    session = Session()
    wanted_id = int(user_id)
    matches = session.query(User).filter(User.shakecast_id == wanted_id)
    user = matches.first()
    # Original note: expunging the user here might help with Windows
    # threading.
    Session.remove()
    return user
def get_shaking_events(facility_id):
    """Return the events found in a facility's shaking history.

    Each shaking-history entry that has a ShakeMap with an event contributes
    one dict: a copy of the event's ``__dict__`` without the SQLAlchemy
    state, with ``shakemaps`` replaced by the number of ShakeMaps.

    :param facility_id: value matched against ``Facility.shakecast_id``
    :return: JSON response ``{"success": true, "data": [...]}``; ``data`` is
        empty when the facility does not exist
    """
    session = Session()
    try:
        fac = (session.query(Facility)
               .filter(Facility.shakecast_id == facility_id)
               .first())
        eq_dicts = []
        # An unknown facility used to fail with AttributeError on ``None``.
        shaking_history = fac.shaking_history if fac is not None else []
        for fs in shaking_history:
            if fs.shakemap is None:
                continue
            eq = fs.shakemap.event
            if eq is None:
                continue
            eq_dict = eq.__dict__.copy()
            eq_dict['shakemaps'] = len(eq.shakemaps)
            eq_dict.pop('_sa_instance_state', None)
            eq_dicts.append(eq_dict)
    finally:
        # Release the scoped session even if a lazy load fails.
        Session.remove()
    return jsonify(success=True, data=eq_dicts)
def get_shaking_data(facility_id, eq_id):
    """Return the first FacilityShaking row belonging to a ShakeMap.

    :param facility_id: not used by the query
        (NOTE(review): the result is not restricted to this facility --
        confirm whether that is intended)
    :param eq_id: value matched against ``ShakeMap.shakemap_id``
    :return: JSON response ``{"success": true, "data": ...}`` where ``data``
        is the row's column dict or ``null``
    """
    session = Session()
    belongs_to_shakemap = FacilityShaking.shakemap.has(
        ShakeMap.shakemap_id == eq_id)
    shaking = session.query(FacilityShaking).filter(belongs_to_shakemap).first()
    shaking_dict = None
    if shaking:
        shaking_dict = shaking.__dict__.copy()
        shaking_dict.pop('_sa_instance_state', None)
    Session.remove()
    return jsonify(success=True, data=shaking_dict)
def get_affected_facilities(shakemap_id):
    """Return the facilities shaken by the newest version of a ShakeMap.

    :param shakemap_id: value matched against ``ShakeMap.shakemap_id``
    :return: JSON object string with ``alert`` (facility count per alert
        level), ``facilities`` (facility dicts, each with a nested
        ``shaking`` dict) and an empty ``types`` object
    """
    session = Session()
    sms = (session.query(ShakeMap)
           .filter(ShakeMap.shakemap_id == shakemap_id)
           .order_by(ShakeMap.shakemap_version.desc())
           .all())

    alert = {'gray': 0, 'green': 0, 'yellow': 0, 'orange': 0, 'red': 0}
    fac_dicts = []
    if sms:
        newest = sms[0]
        for shaking in newest.facility_shaking:
            # Read the facility before copying the shaking row's __dict__,
            # exactly as before, so both copies see the same loaded state.
            fac_dict = shaking.facility.__dict__.copy()
            shaking_dict = shaking.__dict__.copy()
            fac_dict.pop('_sa_instance_state', None)
            shaking_dict.pop('_sa_instance_state', None)
            fac_dict['shaking'] = shaking_dict
            fac_dicts.append(fac_dict)
            # record number of facs at each alert level
            alert[shaking_dict['alert_level']] += 1

    shaking_data = {'alert': alert, 'facilities': fac_dicts, 'types': {}}
    shaking_json = json.dumps(shaking_data, cls=AlchemyEncoder)
    Session.remove()
    return shaking_json
def event_image(event_id):
    """Serve the image stored for an event.

    :param event_id: value matched against ``Event.event_id``
    :return: ``image.png`` from the event's directory, or the static
        ``sc_logo.png`` when the event does not exist
    """
    session = Session()
    try:
        event = (session.query(Event)
                 .filter(Event.event_id == event_id)
                 .first())
    finally:
        Session.remove()

    if event is None:
        # send_static_file() already returns a complete response.  It used to
        # be passed on to send_file(), which expects a path or file object.
        return app.send_static_file('sc_logo.png')

    img = os.path.join(app.config['EARTHQUAKES'], event_id, 'image.png')
    # The file is a PNG; it was previously labelled image/gif.
    return send_file(img, mimetype='image/png')
def get_user_groups(user_id):
    """Return the groups a user belongs to as a JSON array string.

    :param user_id: value matched against ``User.shakecast_id``
    :return: JSON array of group column dicts (``[]`` for an unknown user or
        a user without groups)
    """
    session = Session()
    user = session.query(User).filter(User.shakecast_id == user_id).first()
    has_groups = user is not None and user.groups
    groups = []
    if has_groups:
        for group in user.groups:
            row = group.__dict__.copy()
            row.pop('_sa_instance_state', None)
            groups.append(row)
    groups_json = json.dumps(groups, cls=AlchemyEncoder)
    Session.remove()
    return groups_json
def get_project(ws_id, pj_id: int, session=None) -> Project:
    """
    Returns a project and raises 404, when project not found.

    :param ws_id: Workspace id
    :param pj_id: Project id
    :param session: db session; a session from ``db_session()`` is obtained
        at call time when omitted
    :return: Project model
    :raises NotFound: when the workspace has no project with this id
    """
    # A ``db_session()`` default would be evaluated once, at import time, and
    # that single object reused for every later call.  Resolve it per call.
    if session is None:
        session = db_session()
    project = session.query(Project).join(Workspace) \
        .filter(Workspace.id == ws_id) \
        .filter(Project.id == pj_id).first()
    if not project:
        raise NotFound("Could not find project with id {}".format(pj_id))
    return project
def get_workspace(ws_id: int) -> Workspace:
    """
    Look up a workspace by id.

    :param ws_id: The workspace ID
    :return: The corresponding workspace model
    :raises NotFound: when no workspace has this id
    """
    matches = db_session().query(Workspace).filter(Workspace.id == ws_id)
    workspace = matches.first()
    if workspace:
        return workspace
    raise NotFound("Could not find workspace with id {}".format(ws_id))
def _get_stats_users(session):
    """Return the total number of registered users.

    :param session: database session used for the count query
    :return: ``{'total_users': <int>}``
    :raises Exception: any error from the query is logged and re-raised
    """
    try:
        User = models.User
        num_users = session.query(User).count()
    except Exception as e:
        # Log and re-raise the original error.  The message used to be built
        # but never used, and the error was re-wrapped in a bare Exception,
        # which dropped its type and traceback.
        LOG.error("unknown error when handling request: %s" % e)
        raise
    return {
        'total_users': num_users,
    }
def _get_stats_jobs(session):
    """Count jobs per state.

    :param session: database session used for the grouped count query
    :return: dict mapping each known ``JobState`` to its job count, plus
        ``'TOTAL'`` with the sum over all rows (unknown states included)
    :raises Exception: any error from the query is logged and re-raised
    """
    known_states = (
        JobState.PENDING,
        JobState.SCHEDULING,
        JobState.RUNNING,
        JobState.SUCCESS,
        JobState.ERROR,
        JobState.FAILED,
    )
    jobs_stats = dict.fromkeys(known_states, 0)
    jobs_stats['TOTAL'] = 0
    try:
        Job = models.Job
        per_state = session.query(
            Job.state, func.count(Job.serial_id)).group_by(Job.state)
        for state, count in per_state:
            jobs_stats['TOTAL'] += count
            if state in jobs_stats:
                jobs_stats[state] = count
            else:
                LOG.warning("unkown job type: %s" % state)
    except Exception as e:
        LOG.error("handling request: %s" % e)
        raise e
    return jobs_stats
def _get_stats_tasks(session):
    """Count tasks per state.

    Query errors are logged and swallowed; the counts gathered so far
    (zeros if nothing was read) are returned.

    :param session: database session used for the grouped count query
    :return: dict mapping each known ``TaskState`` to its task count, plus
        ``'TOTAL'`` with the sum over all rows (unknown states included)
    """
    known_states = (
        TaskState.PENDING,
        TaskState.RUNNING,
        TaskState.SUCCESS,
        TaskState.ERROR,
        TaskState.FAILED,
    )
    tasks_stats = dict.fromkeys(known_states, 0)
    tasks_stats['TOTAL'] = 0
    try:
        Task = models.Task
        per_state = session.query(
            Task.state, func.count(Task.serial_id)).group_by(Task.state)
        for state, count in per_state:
            tasks_stats['TOTAL'] += count
            if state in tasks_stats:
                tasks_stats[state] = count
            else:
                LOG.warning("unkown task type: %s" % state)
    except Exception as e:
        LOG.error("handling request: %s" % e)
    return tasks_stats
def get_stats_files(session):
    """Summarise the stored files: how many, total size and total duration.

    A size or duration that cannot be converted to a number is logged and
    counted as 0.  Query errors are logged and swallowed.

    :param session: database session used to read ``File.size`` and
        ``File.duration``
    :return: dict with ``total_files``, ``total_size_bytes`` and
        ``total_duration_seconds``
    """
    total_files = 0
    total_size_bytes = 0
    total_duration_seconds = 0
    try:
        File = models.File
        rows = session.query(File.size, File.duration).all()
        for raw_size, raw_duration in rows:
            try:
                size = int(raw_size)
            except Exception as e:
                # tolerate malformed records
                LOG.warning("%s" % e)
                size = 0
            try:
                duration = float(raw_duration)
            except Exception as e:
                # tolerate malformed records
                LOG.warning("%s" % e)
                duration = 0
            total_files += 1
            total_size_bytes += size
            total_duration_seconds += duration
    except Exception as e:
        LOG.error("handling request: %s" % e)
    return {
        'total_files': total_files,
        'total_size_bytes': total_size_bytes,
        'total_duration_seconds': total_duration_seconds,
    }
def dashboard():
    """Render the dashboard.

    The template receives all repositories ordered by name, the requesting
    user, the public key and the list of all users.
    """
    session = request.db_session
    context = {
        'repositories': session.query(Repository).order_by(Repository.name).all(),
        'user': request.user,
        'public_key': get_public_key(),
        'all_users': session.query(User).all(),
    }
    return render_template('dashboard.html', **context)
def edit_repo(repo_id):
    """Create a repository (``repo_id`` is ``None``) or edit an existing one.

    Requires managing privileges.  On POST the form is validated: the name
    must look like ``owner/repo``, a clone URL is required, and the name must
    not belong to another repository.  A valid form is saved and the browser
    is redirected to the repository page; otherwise the form is re-rendered
    with the collected errors.

    :param repo_id: id of the repository to edit, or ``None`` to create one
    """
    session = request.db_session
    if not request.user.can_manage:
        return abort(403)

    repo = session.query(Repository).get(repo_id) if repo_id is not None else None
    errors = []

    if request.method == 'POST':
        form = request.form
        secret = form.get('repo_secret', '')
        clone_url = form.get('repo_clone_url', '')
        repo_name = form.get('repo_name', '').strip()
        ref_whitelist = form.get('repo_ref_whitelist', '')

        if len(repo_name) < 3 or repo_name.count('/') != 1:
            errors.append('Invalid repository name. Format must be owner/repo')
        if not clone_url:
            errors.append('No clone URL specified')

        # The name is taken when it belongs to any repository other than the
        # one being edited.
        other = session.query(Repository).filter_by(name=repo_name).one_or_none()
        if other and (not repo or other.id != repo.id):
            errors.append('Repository {!r} already exists'.format(repo_name))

        if not errors:
            if repo:
                repo.name = repo_name
                repo.clone_url = clone_url
                repo.secret = secret
                repo.ref_whitelist = ref_whitelist
            else:
                repo = Repository(name=repo_name, clone_url=clone_url,
                                  secret=secret, build_count=0,
                                  ref_whitelist=ref_whitelist)
            session.add(repo)
            session.commit()
            return redirect(repo.url())

    return render_template('edit_repo.html', user=request.user, repo=repo,
                           errors=errors)
def download(build_id, data):
    """Stream a build's artifact or log file to the client.

    :param build_id: primary key of the build
    :param data: ``Build.Data_Artifact`` or ``Build.Data_Log``
    :return: the streamed file; aborts with 404 for an unknown data kind,
        unknown build or missing file, and with 403 when the user lacks the
        download permission
    """
    allowed_kinds = (Build.Data_Artifact, Build.Data_Log)
    if data not in allowed_kinds:
        return abort(404)

    build = request.db_session.query(Build).get(build_id)
    if not build:
        return abort(404)
    if not build.check_download_permission(data, request.user):
        return abort(403)
    if not build.exists(data):
        return abort(404)

    if data == Build.Data_Artifact:
        mime = 'application/zip'
    else:
        mime = 'text/plain'
    return utils.stream_file(build.path(data), mime=mime)
def delete():
    """Delete the build, repository or user named in the query string.

    Exactly one of ``build_id``, ``repo_id`` or ``user_id`` is honoured, in
    that order of precedence.  Builds and repositories require managing
    privileges; users may delete themselves, managers may delete anyone.

    :return: a redirect to the dashboard on success, or to the referring page
        when the build refuses deletion; aborts with 403/404 otherwise
    """
    repo_id = request.args.get('repo_id', '')
    build_id = request.args.get('build_id', '')
    user_id = request.args.get('user_id', '')
    session = request.db_session

    delete_target = None
    managed_model = Build if build_id else (Repository if repo_id else None)
    if managed_model is not None:
        delete_target = session.query(managed_model).get(build_id or repo_id)
        if not request.user.can_manage:
            return abort(403)
    elif user_id:
        delete_target = session.query(User).get(user_id)
        is_someone_else = delete_target and delete_target.id != request.user.id
        if is_someone_else and not request.user.can_manage:
            return abort(403)

    if not delete_target:
        return abort(404)

    try:
        session.delete(delete_target)
        session.commit()
    except Build.CanNotDelete as exc:
        session.rollback()
        utils.flash(str(exc))
        return redirect(request.headers.get('Referer', url_for('dashboard')))

    utils.flash('{} deleted'.format(type(delete_target).__name__))
    return redirect(url_for('dashboard'))
def validate_email(email, code):
    """Check a verification code for an email address that is not yet registered.

    :param email: the address being registered
    :param code: the verification code supplied by the user
    :return: ``"illegal"`` when ``not_email`` rejects the address,
        ``"double"`` when a user with this email already exists,
        ``"success"`` when the code matches the stored one (the stored record
        is then marked for deletion), otherwise ``"verification error"``
    """
    if not_email(email):
        return "illegal"
    if db.session.query(User).filter_by(email=email).first():
        return "double"

    auth = db.session.query(Auth).filter_by(email=email).first()
    # No record means no code was ever issued for this address.  That used to
    # fail with AttributeError on ``None``; treat it as a failed check.
    if auth is None or auth.verification_code != code:
        return "verification error"

    # NOTE(review): the delete is not committed here -- presumably the caller
    # commits the session; confirm.
    db.session.delete(auth)
    return "success"
def query_user_info(email, password):
    """Check login credentials.

    :param email: login email
    :param password: login password
    :return: ``{"login_result": "success"}`` when a user with this email
        exists and the password verifies, else ``{"login_result": "fail"}``
    """
    login_user = db.session.query(User).filter_by(email=email).first()
    verified = login_user and login_user.verify_password(password)
    return {"login_result": "success" if verified else "fail"}
def generate_verification_code(email):
    """Create (or replace) the verification code for an email and send it.

    :param email: the address that should receive the code
    :return: ``"double"`` when a user with this email already exists,
        otherwise ``"success"`` after the code was stored and emailed
    """
    if User.query.filter_by(email=email).first():
        return "double"

    # The code gates account creation, so draw it from the OS entropy source
    # rather than the predictable default generator.
    code = str(random.SystemRandom().randint(0, 99999999)).zfill(8)

    if Auth.query.filter_by(email=email).first():
        db.session.query(Auth).filter(Auth.email == email).update(
            {"verification_code": code})
    else:
        auth = Auth()
        auth.email = email
        auth.verification_code = code
        db.session.add(auth)
    db.session.commit()
    send_verification_code_email(email, code)
    return "success"
def update_count(winner, loser):
    """Add one vote to the winning post and redirect to the index page.

    A message with both posts' descriptions and vote totals is flashed.

    :param winner: id of the post that receives the vote
    :param loser: id of the other post (only used in the message)
    :return: a redirect to ``main.index``
    """
    Session = sessionmaker()
    engine = create_engine('sqlite:///' +
                           os.path.join(basedir, 'data-dev.sqlite'))
    Session.configure(bind=engine)
    session = Session()
    try:
        curr_usr = session.query(Post).get(winner)
        loser_guy = session.query(Post).get(loser)
        # Increment inside the UPDATE statement so two simultaneous votes are
        # both counted.  Writing back "value read earlier + 1" loses one of
        # them.  The commit expires the loaded rows, so the message below
        # shows the stored totals.
        session.query(Post).filter_by(id=winner).update(
            {'total_votes': Post.total_votes + 1},
            synchronize_session=False)
        session.commit()
        message = "{} has {} votes and {} has {} votes".format(
            curr_usr.description, curr_usr.total_votes,
            loser_guy.description, loser_guy.total_votes)
    finally:
        # The session and engine are created per call; release them again.
        session.close()
        engine.dispose()
    flash(message)
    return redirect(url_for('main.index'))
def signup():
    """Show the sign-up form (GET) or register a new account.

    The submitted form is validated step by step; the first failing check
    re-renders the form with the entered values and a message.  A valid form
    creates a non-admin user with a fresh salt and the SHA-512 hash of the
    password.  The user whose id is 1 is promoted to admin.  Finally the
    browser is redirected to the login page.
    """
    if request.method == 'GET':
        return render_template('signup.html', params={})

    print(request.form.items())
    form = request.form
    params = {
        'f_name': form['f_name'],
        'l_name': form['l_name'],
        'username': form['username'].strip(),
    }
    password = form['password']
    verify = form['verify']
    params['email'] = form['email']

    def reject(message):
        # Re-render the form with what was entered plus the reason.
        params['message'] = message
        return render_template('signup.html', params=params)

    if not (params['f_name'] and params['l_name'] and params['username']):
        return reject('Please enter your first name, last name, '
                      'and a username.')

    name_taken = session.query(User).filter(
        User.username == params['username']).first()
    if name_taken:
        return reject('That username is already in use. '
                      'Please choose a different one.')
    if not password:
        return reject('Please enter a valid password')
    if password != verify:
        return reject('Your passwords did not match. '
                      'Please try again.')
    if not params['email']:
        return reject('Please enter a valid email address.')

    salt = make_salt()
    hashed_password = hashlib.sha512(password + salt).hexdigest()
    user = User(f_name=params['f_name'],
                l_name=params['l_name'],
                email=params['email'],
                username=params['username'],
                password=hashed_password,
                salt=salt,
                admin=False)
    session.add(user)
    session.commit()
    if user.id == 1:
        user.admin = True
        session.commit()
    return redirect(url_for('login'))
def get_fac_data():
    """Return up to 50 facilities matching the ``filter`` query argument.

    ``filter`` is a JSON object.  Recognised keys: ``group``,
    ``facility_type`` (SQL ``LIKE`` patterns), ``latMin``/``latMax``/
    ``lonMin``/``lonMax`` (bounding box that the facility's own box must
    overlap), ``keywords`` (comma-separated; every keyword must match the
    name or description) and ``count`` (number of leading rows to skip).

    :return: JSON response ``{"success": true, "data": [...]}``
    """
    session = Session()
    try:
        # ``or {}`` keeps a literal JSON ``null`` filter from crashing below.
        filter_ = json.loads(request.args.get('filter', '{}')) or {}
        query = session.query(Facility)

        if filter_.get('group', None) is not None:
            query = query.filter(
                Facility.groups.any(Group.name.like(filter_['group'])))
        if filter_.get('latMax', None) is not None:
            query = query.filter(Facility.lat_min < float(filter_['latMax']))
        if filter_.get('latMin', None) is not None:
            query = query.filter(Facility.lat_max > float(filter_['latMin']))
        if filter_.get('lonMax', None) is not None:
            query = query.filter(Facility.lon_min < float(filter_['lonMax']))
        if filter_.get('lonMin', None) is not None:
            query = query.filter(Facility.lon_max > float(filter_['lonMin']))
        if filter_.get('facility_type', None) is not None:
            query = query.filter(
                Facility.facility_type.like(filter_['facility_type']))
        if filter_.get('keywords', None) is not None:
            keys = [key.strip(' ')
                    for key in filter_['keywords'].lower().split(',')]
            key_filter = [
                or_(literal(key).contains(func.lower(Facility.name)),
                    func.lower(Facility.name).contains(key),
                    func.lower(Facility.description).contains(key))
                for key in keys
            ]
            query = query.filter(and_(*key_filter))

        # Page in the database.  Fetching every matching row just to slice
        # out 50 of them in Python grows with the size of the table.
        skip = filter_.get('count', None)
        if skip is not None:
            query = query.offset(max(int(skip), 0))
        facs = query.limit(50).all()

        dicts = []
        for fac in facs:
            dict_ = fac.__dict__.copy()
            dict_.pop('_sa_instance_state', None)
            dicts.append(dict_)
    finally:
        # Release the scoped session even when the filter is malformed.
        Session.remove()
    return jsonify(success=True, data=dicts)
def get_users():
    """List users (GET) or queue an import of the users in the JSON body.

    GET returns up to 50 users whose ``shakecast_id`` is greater than the
    ``last_id`` query argument, optionally restricted to a group via
    ``filter`` (a Python-literal dict with a ``group`` LIKE pattern).
    Password values are removed from the output.

    Any other method reads ``users`` from the JSON body, drops empty
    ``password`` entries, sends an ``import_user_dicts`` command through
    ``ui.send`` and echoes the users back.

    :return: a JSON string
    """
    if request.method == 'GET':
        session = Session()
        try:
            filter_ = literal_eval(request.args.get('filter', 'None'))
            query = (session.query(User)
                     .filter(User.shakecast_id >
                             request.args.get('last_id', 0)))
            if filter_ and filter_.get('group', None):
                query = query.filter(
                    User.groups.any(Group.name.like(filter_['group'])))

            user_dicts = []
            for user in query.limit(50).all():
                user_dict = user.__dict__.copy()
                user_dict.pop('_sa_instance_state', None)
                user_dict.pop('password', None)
                user_dicts.append(user_dict)
            user_json = json.dumps(user_dicts, cls=AlchemyEncoder)
        finally:
            Session.remove()
    else:
        # The default used to be the *string* 'null', which was then iterated
        # character by character before the ``is not None`` check could run.
        users = request.json.get('users')
        if users is not None:
            for user in users:
                # An empty password means "leave unchanged"; tolerate entries
                # that have no password key at all.
                if user.get('password') == '':
                    user.pop('password')
            # NOTE(review): the command embeds client-supplied data via string
            # formatting; presumably the receiver evaluates it -- confirm that
            # this channel cannot be abused.
            ui.send("{'import_user_dicts': {'func': f.import_user_dicts, \
                    'args_in': {'users': %s, '_user': %s}, \
                    'db_use': True, 'loop': False}}" % (
                        str(users), current_user.shakecast_id))
        user_json = json.dumps(users)
    return user_json
def clone(ws_id: int, url: str, name: str = None):
    """
    Clones a repository by url into given workspace

    :param ws_id: Destination workspace to clone
    :param url: URL of the source repository
    :param name: Optional name of the local repository name, otherwise the
        remote name is taken
    :return: The info dict built by ``create_info_dict``; on a successful
        clone it also carries the new project's ``id``
    :raises NameConflict: when the workspace already has a project with the
        chosen name
    :raises NotImplementedError: when the url is not a GitHub url
    :raises Exception: when registering or scanning the cloned project fails
    """
    workspace = get_workspace(ws_id)
    url_decode = parse.urlparse(url)

    if not is_github(url_decode.netloc):
        # ``NotImplemented`` is a constant, not an exception class; calling
        # it raised an unrelated TypeError.
        raise NotImplementedError(
            "Cloning from other is not implemented yet. Only github is supported for now.")

    # Take the suffix of url as first name candidate
    github_project_name = name
    if github_project_name is None:
        github_project_name = _repo_name_from_url(url_decode)

    dbsession = db_session()
    pj = dbsession.query(Project).join(Workspace) \
        .filter(Workspace.id == workspace.id) \
        .filter(Project.name == github_project_name).first()
    dbsession.commit()

    # Error when the project name in given workspace already exists
    if pj is not None:
        raise NameConflict(
            'A project with name {} already exists'.format(github_project_name))

    project_target_path = os.path.join(workspace.path, PROJECT_REL_PATH,
                                       github_project_name)

    logger.info('Cloning from github repo...')
    # If url in GitHub domain, access by token
    url_with_token = _get_repo_url(url_decode)
    out, err, exitcode = git_command(
        ['clone', url_with_token, project_target_path])

    # Compare by value: ``is 0`` only worked through CPython's small-int cache.
    if exitcode != 0:
        return create_info_dict(err=err, exitcode=exitcode)

    setup_git_user_email(project_target_path)
    # Check if the project is a valid son project
    check_son_validity(project_target_path)

    # Create project and scan it.
    dbsession = db_session()
    try:
        pj = Project(github_project_name, github_project_name, workspace)
        pj.repo_url = url
        sync_project_descriptor(pj)
        dbsession.add(pj)
        scan_project_dir(project_target_path, pj)
        dbsession.commit()
        # Check if the project is valid
        result = create_info_dict(out=out)
        result["id"] = pj.id
        return result
    except Exception as scan_error:
        # Undo the half-registered project and remove the checkout.  Chain
        # the original error so its cause stays visible in the traceback.
        dbsession.rollback()
        shutil.rmtree(project_target_path)
        raise Exception("Scan project failed") from scan_error
def edit_user(user_id):
    """Create a user (``user_id`` is ``None``) or edit an existing one.

    Creating a user requires managing privileges.  Editing is allowed for the
    user themselves and for managers; only managers may change privilege
    flags.  A POST without errors saves the user and redirects to the user's
    page; otherwise the form is rendered.

    :param user_id: id of the user to edit, or ``None`` to create one
    """
    session = request.db_session
    cuser = None
    if user_id is None:
        if not request.user.can_manage:
            return abort(403)
    else:
        cuser = session.query(User).get(user_id)
        if not cuser:
            return abort(404)
        if cuser.id != request.user.id and not request.user.can_manage:
            return abort(403)

    errors = []
    if request.method != 'POST':
        return render_template('edit_user.html', user=request.user,
                               cuser=cuser, errors=errors)

    if not cuser and not request.user.can_manage:
        return abort(403)

    form = request.form
    user_name = form.get('user_name')
    password = form.get('user_password')
    can_manage = form.get('user_can_manage') == 'on'
    can_view_buildlogs = form.get('user_can_view_buildlogs') == 'on'
    can_download_artifacts = form.get('user_can_download_artifacts') == 'on'

    if cuser:
        # Update user settings
        if password:
            cuser.passhash = utils.hash_pw(password)
        # Privileges can only be changed by someone with managing privileges.
        if request.user.can_manage:
            cuser.can_manage = can_manage
            cuser.can_view_buildlogs = can_view_buildlogs
            cuser.can_download_artifacts = can_download_artifacts
    else:
        # Create a new user
        assert request.user.can_manage
        other = session.query(User).filter_by(name=user_name).one_or_none()
        if other:
            errors.append('User {!r} already exists'.format(user_name))
        else:
            cuser = User(name=user_name,
                         passhash=utils.hash_pw(password),
                         can_manage=can_manage,
                         can_view_buildlogs=can_view_buildlogs,
                         can_download_artifacts=can_download_artifacts)

    if errors:
        return render_template('edit_user.html', user=request.user,
                               cuser=cuser, errors=errors)

    session.add(cuser)
    session.commit()
    return redirect(cuser.url())