The following 46 code examples, extracted from open source Python projects, illustrate how to use transaction.commit().
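All of the examples follow the same basic pattern from the transaction package: stage changes through a session or storage that has joined the current transaction, then call transaction.commit() to persist them, or transaction.abort() to roll back. A minimal sketch of that pattern is shown here; save_object, session, and obj are hypothetical stand-ins, not names from any of the projects below.

import transaction

def save_object(session, obj):
    # Stage the change in a session joined to the current transaction.
    session.add(obj)
    try:
        # Commit everything joined to the thread-local transaction.
        transaction.commit()
    except Exception:
        # Roll back on any failure, then re-raise.
        transaction.abort()
        raise

The context-manager form, also used in several examples below, commits on success and aborts if the block raises:

with transaction.manager:
    session.add(obj)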
def test_delete_entity_method_is_working_properly(self): """testing if the delete_entity() method is working properly """ from stalker import db, Entity test_entity = Entity( name='Test Entity' ) db.DBSession.add(test_entity) db.DBSession.commit() test_entity_db = Entity.query\ .filter(Entity.name == test_entity.name).first() self.assertIsNotNone(test_entity_db) from stalker_pyramid.testing import DummyRequest request = DummyRequest() request.matchdict['id'] = test_entity_db.id entity_view = entity.EntityViews(request) entity_view.delete_entity() test_entity_db = Entity.query\ .filter(Entity.name == test_entity.name).first() self.assertIsNone(test_entity_db)
def test_delete_entity_method_is_working_properly(self): """testing if DELETE /api/entities/{id} is working properly """ from stalker import db, Entity test_entity = Entity( name='Test Entity' ) db.DBSession.add(test_entity) db.DBSession.commit() test_entity_db = Entity.query\ .filter(Entity.name == test_entity.name).first() self.assertIsNotNone(test_entity_db) self.test_app.delete( '/api/entities/%s' % test_entity_db.id, status=200 ) test_entity_db = Entity.query\ .filter(Entity.name == test_entity.name).first() self.assertIsNone(test_entity_db)
def tearDown(self):
    """clean up the test
    """
    import datetime
    import transaction
    from stalker import defaults
    from stalker.db.session import DBSession
    from stalker.db.declarative import Base
    from pyramid import testing

    testing.tearDown()

    # clean up test database
    connection = DBSession.connection()
    engine = connection.engine
    connection.close()
    Base.metadata.drop_all(engine)
    transaction.commit()
    DBSession.remove()

    defaults.timing_resolution = datetime.timedelta(hours=1)
def update_links(self):
    """updates the ticket.links attribute
    """
    link_ids = self.get_multi_integer(self.request, 'link_id')
    from stalker import SimpleEntity
    links = SimpleEntity.query.filter(SimpleEntity.id.in_(link_ids)).all()

    if self.request.method == 'PATCH':
        for link in links:
            if link not in self.entity.links:
                self.entity.links.append(link)
    elif self.request.method == 'POST':
        self.entity.links = links

    import transaction
    transaction.commit()

    from pyramid.response import Response
    return Response('Updated links of ticket %s' % self.entity_id)
def delete_links(self):
    """removes items from the ticket.links attribute
    """
    link_ids = self.get_multi_integer(self.request, 'link_id')
    from stalker import SimpleEntity
    links = SimpleEntity.query.filter(SimpleEntity.id.in_(link_ids)).all()

    successfully_deleted_item_ids = []
    for link in links:
        if link in self.entity.links:
            self.entity.links.remove(link)
            successfully_deleted_item_ids.append(link.id)

    import transaction
    transaction.commit()

    from pyramid.response import Response
    return Response(
        'Deleted links [%s] from ticket %s' % (
            ', '.join(map(str, successfully_deleted_item_ids)),
            self.entity_id
        )
    )
def update_related_tickets(self):
    """updates the ticket.related_tickets attribute
    """
    related_ticket_ids = \
        self.get_multi_integer(self.request, 'related_ticket_id')
    related_tickets = \
        Ticket.query.filter(Ticket.id.in_(related_ticket_ids)).all()

    if self.request.method == 'PATCH':
        for rt in related_tickets:
            if rt not in self.entity.related_tickets:
                self.entity.related_tickets.append(rt)
    elif self.request.method == 'POST':
        self.entity.related_tickets = related_tickets

    import transaction
    transaction.commit()
def delete_good(request):
    """deletes the good with data from request
    """
    logger.debug('***delete good method starts ***')
    good_id = request.params.get('id')
    good = Good.query.filter_by(id=good_id).first()

    if not good:
        transaction.abort()
        return Response('There is no good with id: %s' % good_id, 500)

    good_name = good.name
    try:
        DBSession.delete(good)
        transaction.commit()
    except Exception as e:
        # a single abort is enough; the original called it twice
        transaction.abort()
        c = StdErrToHTMLConverter(e)
        return Response(c.html(), 500)

    return Response('Successfully deleted good with name %s' % good_name)
def entity_creator(self, entity_class, param_resolution):
    """Creates SOM class instances by using param_resolution.

    :param entity_class: A SOM Class
    :param param_resolution: A list of dictionaries. See
      :method:``.resolve_param_resolution`` for details.
    """
    args = self.resolve_param_resolution(param_resolution)

    # fix created_by value if skipped
    # and use the logged in user as the creator
    if 'created_by' not in args:
        logged_in_user = self.get_logged_in_user(self.request)
        args['created_by'] = logged_in_user

    from stalker.db.session import DBSession
    new_entity = entity_class(**args)
    DBSession.add(new_entity)
    DBSession.flush()
    transaction.commit()

    return new_entity
def main():
    session = make_session()
    with transaction.manager:
        old_friends = session.query(OldFriend).all()
        for old in old_friends:
            username = old.user
            friend_username = old.friend
            user_id = session.query(User.id).filter(User.username == username).first()[0]
            try:
                friend_id = session.query(User.id).filter(User.username == friend_username).first()[0]
            except:
                print "friend was missing"
                continue
            new_friend = Friend(user_id, friend_id)
            session.add(new_friend)
        #transaction.commit()
def add_result_config():
    session = make_session()
    parser = argparse.ArgumentParser()
    parser.add_argument("league", type=int, help="league id")
    parser.add_argument("match", type=int, help="match id")
    args = parser.parse_args()
    results = results_config
    with transaction.manager:
        for team in results:
            for player in team['players']:
                result_string = "%s,%s" % (team["position"], player["kills"])
                hero_id = session.query(Hero).filter(Hero.league == args.league).filter(Hero.name == player['name']).first()
                if not hero_id:
                    print "Name wrong"
                    return
                session.add(Result(args.league, hero_id.id, args.match, result_string, time.time(), 1, 1))
        transaction.commit()
    return
def add_result():
    session = make_session()
    parser = argparse.ArgumentParser()
    parser.add_argument("league", type=int, help="league id")
    parser.add_argument("match", type=int, help="match")
    parser.add_argument("player", type=str, help="player name")
    parser.add_argument("position", type=int, help="team position")
    parser.add_argument("kills", type=int, help="player kills")
    args = parser.parse_args()
    with transaction.manager:
        result_string = "%s,%s" % (args.position, args.kills)
        hero_id = session.query(Hero).filter(Hero.league == args.league).filter(Hero.name == args.player).first()
        if not hero_id:
            print "Name wrong"
            return
        session.add(Result(args.league, hero_id.id, args.match, result_string, time.time(), 1, 1))
        transaction.commit()
    return
def main(): """ Dont want to delete account as soon as indicated. makes battlecupos and stuff awkward. so delete it at the end of the day. :return: """ session = make_session() with transaction.manager: delete_accounts = session.query(User.username).filter(User.to_delete == True).all() for username in delete_accounts: username = username[0] # loop over leagues and delete from them #session.query(TeamHero).filter(TeamHero.user == username).delete() # any others? session.query() delete(delete_accounts) transaction.commit()
def main():
    session = make_session()
    parser = argparse.ArgumentParser()
    parser.add_argument("league", type=int, help="league id")
    args = parser.parse_args()
    league = session.query(League).filter(League.id == args.league).first()
    with transaction.manager:
        print "Updating hero points"
        update_hero_points(session, league)
        transaction.commit()
    with transaction.manager:
        print "Updating league points"
        update_league_points(session, league)
        transaction.commit()
    with transaction.manager:
        print "Updating user rankings"
        update_user_rankings(session, league)
        transaction.commit()
def merge_artists(artists):
    root = getSite()
    new_artists = []
    from lac.utilities.duplicates_utility import (
        find_duplicates_artist)
    for artist in artists:
        old_artists = find_duplicates_artist(artist)
        published_old_artists = [a for a in old_artists
                                 if 'published' in a.state]
        if old_artists:
            old_artist = published_old_artists[0] if \
                published_old_artists else old_artists[0]
            new_artists.append(old_artist)
        else:
            new_artists.append(artist)
            artist.state = PersistentList(['editable'])
            root.addtoproperty('artists', artist)
            artist.reindex()

    import transaction
    transaction.commit()
    return new_artists
def venue_normalize_text(root, registry):
    from lac.views.filter import find_entities
    from lac.views.widget import redirect_links
    from lac.content.interface import IVenue
    import html_diff_wrapper
    contents = find_entities(interfaces=[IVenue])
    len_entities = str(len(contents))
    for index, venue in enumerate(contents):
        if getattr(venue, 'description', None):
            venue.description = html_diff_wrapper.normalize_text(
                venue.description, {redirect_links})
            venue.hash_venue_data()
            venue.reindex()

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Venues text evolved.')
def artist_normalize_text(root, registry):
    from lac.views.filter import find_entities
    from lac.views.widget import redirect_links
    from lac.content.interface import IArtistInformationSheet
    import html_diff_wrapper
    contents = find_entities(interfaces=[IArtistInformationSheet])
    len_entities = str(len(contents))
    for index, artist in enumerate(contents):
        if getattr(artist, 'biography', None):
            artist.biography = html_diff_wrapper.normalize_text(
                artist.biography, {redirect_links})
            artist.hash_artist_data()
            artist.reindex()

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Artists text evolved.')
def event_normalize_text(root, registry):
    from lac.views.filter import find_entities
    from lac.views.widget import redirect_links
    from lac.content.interface import ICulturalEvent
    import html_diff_wrapper
    contents = find_entities(interfaces=[ICulturalEvent])
    len_entities = str(len(contents))
    for index, event in enumerate(contents):
        if getattr(event, 'details', None):
            try:
                event.details = html_diff_wrapper.normalize_text(
                    event.details, {redirect_links})
                event.reindex()
            except Exception as error:
                log.warning(error)

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Event text evolved.')
def reviw_normalize_text(root, registry):
    from lac.views.filter import find_entities
    from lac.views.widget import redirect_links
    from lac.content.interface import IBaseReview
    import html_diff_wrapper
    contents = find_entities(interfaces=[IBaseReview])
    len_entities = str(len(contents))
    for index, review in enumerate(contents):
        if getattr(review, 'article', None):
            review.article = html_diff_wrapper.normalize_text(
                review.article, {redirect_links})
            review.reindex()

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Review text evolved.')
def filme_schedules_duplicates(root, registry):
    from lac.views.filter import find_entities
    from lac.content.interface import IFilmSchedule
    contents = find_entities(interfaces=[IFilmSchedule])
    len_entities = str(len(contents))
    for index, schedule in enumerate(contents):
        if not schedule.venue:
            root.delfromproperty('schedules', schedule)
        else:
            root.addtoproperty('film_schedules', schedule)

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Film schedules evolved.')
def evolve_services(root, registry):
    from lac.views.filter import find_entities
    from lac.content.interface import IService
    from lac.content.site_folder import SiteFolder
    contents = find_entities(interfaces=[IService])
    len_entities = str(len(contents))
    for index, service in enumerate(contents):
        if service.definition.service_id == 'moderation':
            if not isinstance(getattr(service, 'perimeter', None), SiteFolder):
                subscription = service.subscription
                subscription['subscription_type'] = 'per_unit'
                service.subscription = PersistentDict(subscription)
                service.reindex()

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Services evolved.')
def fix_contributors(root, registry):
    from lac.views.filter import find_entities
    from lac.content.interface import ISearchableEntity
    contents = find_entities(interfaces=[ISearchableEntity])
    len_entities = str(len(contents))
    for index, content in enumerate(contents):
        if hasattr(content, 'contributors'):
            original = getattr(content, 'original', None)
            contributors = content.contributors
            if content.author and content.author not in contributors:
                content.addtoproperty('contributors', content.author)

            contributors = content.contributors
            if original and original.author and \
               original.author not in contributors:
                content.addtoproperty('contributors', original.author)

        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Contributors evolved.')
def _update_zipcode_venue(contents):
    len_entities = str(len(contents))
    for index, venue in enumerate(contents):
        addresses = getattr(venue, 'addresses', [])
        result = []
        for address in addresses:
            zipcodes = address.get('zipcode', [])
            if zipcodes is not None and isinstance(zipcodes, (set, list)):
                zipcodes = list(zipcodes)
                address['zipcode'] = zipcodes[0] if zipcodes else None

            result.append(address)

        venue.addresses = PersistentList(result)
        venue.hash_venue_data()
        venue.reindex()
        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)
def reindex_venues(root, registry):
    from lac.views.filter import find_entities
    from lac.content.interface import IVenue
    contents = find_entities(interfaces=[IVenue])
    len_entities = str(len(contents))
    for index, venue in enumerate(contents):
        venue.reindex()
        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Venues reindexed')
def evolve_alerts(root, registry):
    from lac.views.filter import find_entities
    from lac.content.interface import IAlert
    from substanced.util import get_oid
    contents = find_entities(interfaces=[IAlert])
    len_entities = str(len(contents))
    for index, alert in enumerate(contents):
        alert.users_to_alert = PersistentList(
            [str(get_oid(user, user)) for user in alert.users_to_alert])
        alert.reindex()
        if index % 1000 == 0:
            log.info("**** Commit ****")
            transaction.commit()
            log.info(str(index) + "/" + len_entities)

    log.info('Alerts reindexed')
def ZODBDatabaseConfigurationFactory(key, dbconfig):
    config = dbconfig.get('configuration', {})
    fs = ZODB.FileStorage.FileStorage(dbconfig['path'])
    db = DB(fs)
    try:
        rootobj = db.open().root()
        if not IDatabase.providedBy(rootobj):
            alsoProvides(rootobj, IDatabase)
        transaction.commit()
        rootobj = None
    except:
        pass
    finally:
        db.close()
    # Set request aware database for app
    db = RequestAwareDB(dbconfig['path'], **config)
    return Database(key, db)
def NewtConfigurationFactory(key, dbconfig):
    if not NEWT:
        raise Exception("You must install the newt.db package before you can use "
                        "it as a database adapter.")
    config = dbconfig.get('configuration', {})
    dsn = "dbname={dbname} user={username} host={host} password={password} port={port}".format(**dbconfig['dsn'])  # noqa
    adapter = newt.db.storage(dsn=dsn, **dbconfig['options'])
    db = newt.db.DB(dsn, **dbconfig['options'])
    try:
        conn = db.open()
        rootobj = conn.root()
        if not IDatabase.providedBy(rootobj):
            alsoProvides(rootobj, IDatabase)
        transaction.commit()
    except:
        pass
    finally:
        rootobj = None
        conn.close()
        db.close()
    adapter = newt.db.storage(dsn, **dbconfig['options'])
    db = newt.db._db.NewtDB(RequestAwareDB(adapter, **config))
    return Database(key, db)
def createUser(username, password, fullname, email, role):
    """Create a new L{User}.

    @param username: A C{unicode} username for the user.
    @param password: A C{unicode} password in plain text for the user. The
        password will be hashed before being stored.
    @param fullname: A C{unicode} name for the user.
    @param email: The C{unicode} email address for the user.
    @param role: The L{Role} for the user.
    @return: A C{list} of C{(objectID, username)} 2-tuples for the new
        L{User}s.
    """
    username = username.lower()
    users = UserAPI()
    result = users.create([(username, password, fullname, email)])
    # Set the role with an update to ensure that the 'fluiddb/users/role' tag
    # value is set correctly.
    users.set([(username, None, None, None, role)])
    try:
        transaction.commit()
    except:
        transaction.abort()
        raise
    return result
def updateUser(username, password, fullname, email, role):
    """Updates a L{User}.

    @param username: A C{unicode} username for the user.
    @param password: A C{unicode} password in plain text for the user. The
        password will be hashed before being stored.
    @param fullname: A C{unicode} name for the user.
    @param email: The C{unicode} email address for the user.
    @param role: The L{Role} for the user.
    @return: A C{(objectID, username)} 2-tuple representing the L{User} that
        was updated.
    """
    try:
        result = UserAPI().set([(username, password, fullname, email, role)])
        transaction.commit()
    except:
        transaction.abort()
        raise
    return result
def setVersionTag(version):
    """Updates the fluiddb/version tag.

    @param version: The new version string.
    """
    user = getUser(u'fluiddb')
    objectID = ObjectAPI(user).create(u'fluidinfo')
    releaseDate = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    values = {objectID: {
        u'fluiddb/api-version': {
            'mime-type': 'text/plain',
            'contents': version},
        u'fluiddb/release-date': {
            'mime-type': 'text/plain',
            'contents': releaseDate + '\n'}}}
    TagValueAPI(user).set(values)
    PermissionAPI(user).set([
        (u'fluiddb/api-version', Operation.READ_TAG_VALUE, Policy.OPEN, []),
        (u'fluiddb/release-date', Operation.READ_TAG_VALUE, Policy.OPEN, [])])
    try:
        transaction.commit()
    except:
        transaction.abort()
        raise
def _resetIndex(self, baseURL):
    """Reset the Solr index."""
    url = urljoin(baseURL, '/solr/update?wt=json')
    headers = {'User-Agent': 'FluidDB test suite',
               'Content-Type': 'text/xml'}
    body = '<delete><query>*:*</query></delete>'
    response, content = Http().request(url, 'POST', body, headers)
    if response.status != 200:
        raise RuntimeError(
            "Couldn't clear Solr index! Got HTTP %s return code and "
            'content: %s', (response.status, content))

    url = urljoin(baseURL, '/solr/update?wt=json')
    headers = {'User-Agent': 'FluidDB test suite',
               'Content-Type': 'text/xml'}
    body = '<commit></commit>'
    response, content = Http().request(url, 'POST', body, headers)
    if response.status != 200:
        raise RuntimeError(
            "Couldn't commit Solr index! Got HTTP %s return code and "
            'content: %s', (response.status, content))
def edit_view(self, request):
    client = self.client
    data = request.POST or None
    client.connection._p_activate()
    client_form = Client.Form(data, initial=client.__dict__)
    del client_form.fields['hostname']
    connection_form = RshClientConnection.Form(data, initial=client.connection.__dict__)
    if data and client_form.is_valid() and connection_form.is_valid():
        client._update(client_form.cleaned_data)
        client.connection._update(connection_form.cleaned_data)
        transaction.get().note('Edited client %s' % client.hostname)
        transaction.commit()
        return self.redirect_to()
    return self.render(request, 'core/client/edit.html', {
        'client': client,
        'client_form': client_form,
        'connection_form': connection_form,
    })
def prune_archives(self, archives, repository):
    """Prune list of two tuples (delete, archive), all of which must be in the
    same `Repository`. Return `Statistics`.
    """
    # TODO Maybe commit some stuff here after a while, because this can
    # seriously take some time.
    stats = Statistics()
    with open_repository(repository) as borg_repository:
        manifest, key = Manifest.load(borg_repository)
        with Cache(borg_repository, key, manifest, lock_wait=1) as cache:
            for delete, archive in archives:
                assert archive.repository == repository
                if delete:
                    log.info('Deleting archive %s [%s]', archive.name, archive.id)
                    archive.delete(manifest, stats, cache)
                else:
                    log.info('Skipping archive %s [%s]', archive.name, archive.id)
            manifest.write()
            borg_repository.commit()
            cache.commit()
    transaction.commit()
    log.error(stats.summary.format(label='Deleted data:', stats=stats))
    return stats
def _add_completed_archive(self):
    log.debug('Saving archive metadata to database')
    archive = BorgArchive(self.repository, self._repository_key, self._manifest,
                          self.job.archive_name, cache=self._cache)
    stats = archive.calc_stats(self._cache)
    duration = archive.ts_end - archive.ts
    ao = Archive(
        id=archive.fpr,
        repository=self.job.repository,
        name=archive.name,
        client=self.job.client,
        job=self.job,
        nfiles=stats.nfiles,
        original_size=stats.osize,
        compressed_size=stats.csize,
        deduplicated_size=stats.usize,
        duration=duration,
        timestamp=archive.ts,
        timestamp_end=archive.ts_end,
    )
    self.job.archive = ao
    transaction.get().note('Added completed archive %s for job %s' % (ao.id, self.job.id))
    transaction.commit()
    log.debug('Saved archive metadata')
def setup_schema(command, conf, vars):
    """Place any commands to setup pyjobsweb here"""

    # Load the models

    # <websetup.websetup.schema.before.model.import>
    from pyjobsweb import model
    # <websetup.websetup.schema.after.model.import>

    # <websetup.websetup.schema.before.metadata.create_all>
    print("Creating tables")
    model.metadata.create_all(bind=config['tg.app_globals'].sa_engine)
    # <websetup.websetup.schema.after.metadata.create_all>
    transaction.commit()

    print('Initializing Migrations')
    import alembic.config
    alembic_cfg = alembic.config.Config()
    alembic_cfg.set_main_option("script_location", "migration")
    alembic_cfg.set_main_option("sqlalchemy.url", config['sqlalchemy.url'])
    import alembic.command
    alembic.command.stamp(alembic_cfg, "head")
def process_oils(session_class):
    session = session_class()
    record_ids = [r.adios_oil_id for r in session.query(ImportedRecord)]
    session.close()

    logger.info('Adding Oil objects...')
    for record_id in record_ids:
        # Note: committing our transaction for every record slows the
        #       import job significantly.  But this is necessary if we
        #       want the option of rejecting oil records.
        session = session_class()
        transaction.begin()
        rec = (session.query(ImportedRecord)
               .filter(ImportedRecord.adios_oil_id == record_id)
               .one())
        try:
            add_oil(rec)
            transaction.commit()
        except OilRejected as e:
            logger.warning(repr(e))
            transaction.abort()
def add_oil_object(session, file_columns, row_data):
    file_columns = [slugify_filename(c).lower()
                    for c in file_columns]
    row_dict = dict(zip(file_columns, row_data))

    fix_name(row_dict)
    fix_pour_point(row_dict)
    fix_flash_point(row_dict)
    fix_preferred_oils(row_dict)

    oil = ImportedRecord(**row_dict)

    add_synonyms(session, oil, row_dict)
    add_densities(oil, row_dict)
    add_kinematic_viscosities(oil, row_dict)
    add_dynamic_viscosities(oil, row_dict)
    add_distillation_cuts(oil, row_dict)
    add_toxicity_effective_concentrations(oil, row_dict)
    add_toxicity_lethal_concentrations(oil, row_dict)

    session.add(oil)
    transaction.commit()
def link_crude_medium_oils(session):
    # our category
    top, categories = get_categories_by_names(session, 'Crude',
                                              ('Medium',))

    oils = get_oils_by_api(session, 'Crude',
                           api_min=22.3, api_max=31.1)

    count = 0
    for o in oils:
        o.categories.extend(categories)
        count += 1

    logger.info('{0} oils added to {1} -> {2}.'
                .format(count, top.name, [n.name for n in categories]))
    transaction.commit()
def link_all_other_oils(session):
    '''
       Category Name:
       - Other
       Sample Oils:
       - Catalytic Cracked Slurry Oil
       - Fluid Catalytic Cracker Medium Cycle Oil
       Criteria:
       - Any oils that fell outside all the other Category Criteria
    '''
    _top, categories = get_categories_by_names(session, 'Other',
                                               ('Other',))

    oils = (session.query(Oil)
            .filter(Oil.categories == None)
            .all())

    count = 0
    for o in oils:
        o.categories.extend(categories)
        count += 1

    logger.info('{0} oils added to {1}.'
                .format(count, [n.name for n in categories]))
    transaction.commit()
def __disable_textversions(statement_uid, author_uid):
    """
    Disables the textversions of the given statement

    :param statement_uid: Statement.uid
    :param author_uid: User.uid
    :return: None
    """
    db_textversion = DBDiscussionSession.query(TextVersion).filter(and_(TextVersion.statement_uid == statement_uid,
                                                                        TextVersion.author_uid == author_uid)).all()  # TODO #432
    for textversion in db_textversion:
        logger('QueryHelper', '__disable_textversions', str(textversion.uid))
        textversion.set_disable(True)
        DBDiscussionSession.add(textversion)

    DBDiscussionSession.flush()
    transaction.commit()
def __transfer_textversion_to_new_author(statement_uid, old_author_uid, new_author_uid):
    """
    Sets a new author for the given textversion and creates a row in RevokedContentHistory

    :param statement_uid: Statement.uid
    :param old_author_uid: User.uid
    :param new_author_uid: User.uid
    :return: Boolean
    """
    logger('QueryHelper', '__revoke_statement',
           'Textversion of {} will change author from {} to {}'.format(statement_uid, old_author_uid, new_author_uid))
    db_textversion = DBDiscussionSession.query(TextVersion).filter(and_(TextVersion.statement_uid == statement_uid,
                                                                        TextVersion.author_uid == old_author_uid)).all()  # TODO #432
    if not db_textversion:
        return False

    for textversion in db_textversion:
        textversion.author_uid = new_author_uid
        DBDiscussionSession.add(textversion)
        DBDiscussionSession.add(RevokedContentHistory(old_author_uid, new_author_uid, textversion_uid=textversion.uid))

    DBDiscussionSession.flush()
    transaction.commit()
    return True
def set_reference(reference, url, nickname, statement_uid, issue_uid):
    """
    Creates a new reference

    :param reference: Text of the reference
    :param url: url of the reference
    :param nickname: nickname of the user
    :param statement_uid: statement uid of the linked statement
    :param issue_uid: current issue uid
    :return: Boolean
    """
    db_user = DBDiscussionSession.query(User).filter_by(nickname=str(nickname)).first()
    if not db_user:
        return False

    parsed_url = urlparse(url)
    host = parsed_url.scheme + '://' + parsed_url.netloc
    path = parsed_url.path
    author_uid = db_user.uid

    DBDiscussionSession.add(StatementReferences(reference, host, path, author_uid, statement_uid, issue_uid))
    DBDiscussionSession.flush()
    transaction.commit()
    return True
def save_issue_uid(issue_uid, nickname):
    """
    Saves the Issue.uid for a user

    :param issue_uid: Issue.uid
    :param nickname: User.nickname
    :return: Boolean
    """
    db_user = DBDiscussionSession.query(User).filter_by(nickname=nickname).first()
    if not db_user:
        return False

    db_settings = DBDiscussionSession.query(Settings).get(db_user.uid)
    if not db_settings:
        return False

    db_settings.set_last_topic_uid(issue_uid)
    transaction.commit()
    return True
def add_click_for_statement(statement_uid, nickname, supportive):
    """
    Adds a click for the given statement

    :param statement_uid: Statement.uid
    :param nickname: User.nickname
    :param supportive: boolean
    :return: Boolean
    """
    logger('VotingHelper', 'add_click_for_statement',
           'increasing {} vote for statement {}'.format('up' if supportive else 'down', str(statement_uid)))

    if not is_integer(statement_uid):
        return False

    db_statement = DBDiscussionSession.query(Statement).get(statement_uid)
    db_user = DBDiscussionSession.query(User).filter_by(nickname=str(nickname)).first()
    if not db_user or not db_statement:
        return False

    __click_statement(db_statement, db_user, supportive)
    __statement_seen_by_user(db_user, statement_uid)
    transaction.commit()
    return True
def add_seen_statement(statement_uid, db_user):
    """
    Adds the uid of the statement into the seen_by list, mapped with the given user uid

    :param db_user: current user
    :param statement_uid: uid of the statement
    :return: undefined
    """
    if not is_integer(statement_uid) or not isinstance(db_user, User):
        return False

    logger('VotingHelper', 'add_seen_statement',
           'statement ' + str(statement_uid) + ', for user ' + str(db_user.uid))
    val = __statement_seen_by_user(db_user, statement_uid)
    if val:
        transaction.commit()

    return val
def test_update_projects(self):
    """testing update_projects() is working properly
    """
    # create a new Project (with Python)
    from stalker import db, Project
    new_project = Project(
        name='New Project',
        code='NP',
        repositories=[self.test_repo]
    )
    db.DBSession.add(new_project)
    db.DBSession.commit()

    from stalker_pyramid.testing import DummyRequest, DummyMultiDict
    request = DummyRequest()
    request.matchdict['id'] = self.test_user1.id

    # patch get_logged_in_user
    self.patch_logged_in_user(request)

    # and assign it to the new user (with RESTFull API)
    request.method = 'POST'
    request.params = DummyMultiDict()
    request.params['project_id[]'] = [self.test_project1.id, new_project.id]
    request.POST = request.params

    user_view = user.UserViews(request)
    response = user_view.update_projects()

    # check the user projects
    from stalker import User
    test_user1_db = User.query.get(self.test_user1.id)
    self.assertEqual(
        sorted(test_user1_db.projects),
        sorted([self.test_project1, new_project])
    )
def test_get_vacations_view_is_working_properly(self):
    """testing if GET: /api/users/{id}/vacations view is working properly
    """
    from stalker import db, Vacation
    import datetime
    vac1 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 4, 24, 0, 0),
        end=datetime.datetime(2016, 4, 28, 0, 0)
    )
    vac2 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 7, 1, 0, 0),
        end=datetime.datetime(2016, 7, 8, 0, 0)
    )
    db.DBSession.add_all([vac1, vac2])
    db.DBSession.flush()
    import transaction
    transaction.commit()

    from stalker import User
    user1 = User.query.filter(User.login == self.test_user1.login).first()

    response = self.test_app.get(
        '/api/users/%s/vacations' % self.test_user1.id
    )
    self.assertEqual(
        sorted(response.json_body),
        sorted([
            {
                'id': v.id,
                '$ref': '/api/vacations/%s' % v.id,
                'name': v.name,
                'entity_type': v.entity_type
            } for v in [user1.vacations[0], user1.vacations[1]]
        ])
    )