The following 48 code examples, extracted from open-source Python projects, illustrate how to use arrow.now().
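Before diving in, here is a minimal sketch (not taken from any of the projects below) of the arrow.now() surface these snippets rely on. Note that most of the examples target pre-1.0 arrow, where .timestamp is an integer property and .replace() still accepts plural keywords for shifting:

import arrow

now = arrow.now()                      # current local time as an Arrow object
print(now.format('YYYY-MM-DD HH:mm'))  # formatted string, e.g. '2017-06-01 14:30'
print(now.to('UTC'))                   # the same instant converted to UTC
print(now.floor('day'))                # truncated to local midnight
print(now.shift(days=-1))              # one day earlier
print(now.timestamp)                   # int property in arrow < 1.0; a method in arrow >= 1.0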
def sell_positions(self):
    q = Query()
    test_func = lambda closed: not closed
    docs = self.position_db.search(q.closed.test(test_func))

    # Sell and remove position if >1hr old
    for doc in docs:
        if arrow.get(doc["at"]) < (arrow.now() - datetime.timedelta(hours=1)):
            self.logger.log("Trader/Seller", "informative",
                            "Selling position for contract " + doc["contract_id"] + "!")
            if self.web_interface.have_position_in_market(doc["contract_id"]):
                self.web_interface.sell(doc["contract_id"], doc["side"], doc["amount"])
            self.position_db.update({"closed": True}, eids=[doc.eid])

# Make a trade based on the result
def poll_for_articles(self):
    while True:
        at = 0
        for source in self.sources:
            self.logger.log("Data Input", "informative", "Polling: " + source["news_api_name"])
            articles = source["news_api_instance"].get_articles()
            if articles is not None:
                for article in articles:
                    # Skip duplicates
                    q = Query()
                    if len(self.sources[at]["articles_db"].search(q.title == article["title"])) == 0:
                        self.queue_article(article)
                        self.sources[at]["articles_db"].insert({"title": article["title"],
                                                                "at": str(arrow.now())})
            at = at + 1
        # Sleep for interval time
        time.sleep(self.config["data_input"]["poll_interval"])

# Adds the given article to the queue
def _get_new_article(pages):
    """
    Get random new tale or scp article.

    Return a random article younger than 30 days, with a rating of at
    least 40 points for a skip and 20 points for a tale.
    """
    date = arrow.now().replace(days=-30).format('YYYY-MM-DD')
    pages = [p for p in pages if p.created > date]
    skips = [p for p in pages if 'scp' in p.tags and p.rating >= 40]
    tales = [p for p in pages if 'tale' in p.tags and p.rating >= 20]
    goi = [p for p in pages if 'goi-format' in p.tags and p.rating >= 20]
    pages = skips + tales + goi
    return random.choice(pages) if pages else None
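Note the .replace(days=-30) call above: in older arrow releases, plural keyword arguments to replace() shifted the date, which is how several examples on this page compute relative dates. Current arrow releases (1.0+) reject plural names in replace(); the equivalent under a modern release would be shift(). A minimal sketch:

import arrow

# 30 days ago, formatted as an ISO date string (arrow >= 0.8)
date = arrow.now().shift(days=-30).format('YYYY-MM-DD')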
def _get_post_data(api):
    tweets = api.user_timeline(count=100)
    tweets = [i for i in tweets if i.source == core.config.twitter.name]
    urls = [i.entities['urls'] for i in tweets]
    urls = [i[0]['expanded_url'] for i in urls if i]
    posted = [p for p in core.pages if p.url in urls]
    not_posted = [p for p in core.pages if p not in posted]

    new = _get_new_article(not_posted)
    if new:  # post new articles if there are any
        return (lex.post_on_twitter.new, new)

    if tweets and tweets[0].created_at == arrow.now().naive:
        # if we posted an old article today already, don't post anything
        return None

    if any('scp' in p.tags for p in posted[:2]):
        # post tale, tale, tale, scp, tale, tale, tale, scp, tale...
        old = _get_old_article(not_posted, scp=False)
    else:
        old = _get_old_article(not_posted, scp=True)
    return (lex.post_on_twitter.old, old)
def cooldown(time):
    def decorator(func):
        func._cooldown = {}

        @functools.wraps(func)
        def inner(inp, *args, **kwargs):
            now = arrow.now()
            if inp.channel not in func._cooldown:
                pass
            elif (now - func._cooldown[inp.channel]).seconds < time:
                inp.multiline = False
                return lex.cooldown
            func._cooldown[inp.channel] = now
            return func(inp, *args, **kwargs)

        return inner
    return decorator
def test_datetime_parser(self):
    now = arrow.now()
    ts_tuples = [
        ("10 minutes ago", lambda x: x.replace(minutes=-10, microsecond=0, tzinfo='local')),
        ("1 day ago", lambda x: x.replace(days=-1, microsecond=0, tzinfo='local')),
        ("yesterday midnight", lambda x: x.replace(days=-1, hour=0, minute=0, second=0,
                                                   microsecond=0, tzinfo='local')),
        ("1986-04-24 00:51:24+02:00", lambda x: arrow.get("1986-04-24 00:51:24+02:00")),
        ("2001-01-01 01:01:01", lambda x: arrow.get("2001-01-01 01:01:01").replace(tzinfo="local")),
        (now, lambda x: now)]
    for (s, ts) in ts_tuples:
        self.assertEquals(datetime_parser(s), ts(arrow.now()))
    with self.assertRaises(ValueError):
        datetime_parser("fdjkldfhskl")
def get_tradeday(self, now):
    """
    Look up the trade day for a given moment.

    :param now: datetime to check
    :return: bool (whether the session trades), trade day

    >>> now = datetime.datetime(2016, 10, 25, 0, 0, 0)  # midnight session
    >>> futureTradeCalendar.get_tradeday(now)
    (True, Timestamp('2016-10-25 00:00:00'))
    """
    t = now.time()
    day = self.calendar.ix[now.date()]
    if DAY_LINE < t < NIGHT_LINE:
        # day session: the trade day is today
        return day.day_trade, day.tradeday
    elif NIGHT_LINE < t:
        # night session: the trade day is the next trading day
        return day.night_trade, day.next_td
    else:
        # after midnight: still the overnight session's trade day
        return day.midnight_trade, day.tradeday
def process_item(self, item, domain):
    now = arrow.now()
    seen = self.check_seen_before(item)
    if len(seen) > 0:
        last_seen = max(seen)
        time_limit = now.replace(**self.time_scale).timestamp
        if last_seen < time_limit:
            self.insert_item_price(item, now.timestamp)
        raise DropItem("Already seen %s, %s" % (item['url'], arrow.get(last_seen).humanize()))
    else:
        self.insert_item_price(item, now.timestamp)
        self.insert_item_main(item)
        self.insert_item_tag_list(item)
        self.insert_item_description(item)
    self.conn.commit()
    return item
def __init__(self, header, backupUnfollows, bucketUnfollow):
    # initialise the logger variables
    self.path = 'cache/log/'
    self.log_temp = ''
    self.new_line = True
    self.backupUnfollows = backupUnfollows
    self.bucketUnfollow = bucketUnfollow
    self.today = arrow.now().format('DD_MM_YYYY')

    if not path.isdir(self.path):
        makedirs(self.path)

    self.init_log_name()
    print header
def get_signal(pair: str, signal: SignalType) -> bool:
    """
    Calculates the current signal based on several technical analysis indicators
    :param pair: pair in format BTC_ANT or BTC-ANT
    :return: True if pair is good for buying, False otherwise
    """
    dataframe = analyze_ticker(pair)
    if dataframe.empty:
        return False
    latest = dataframe.iloc[-1]

    # Check if dataframe is out of date
    signal_date = arrow.get(latest['date'])
    if signal_date < arrow.now() - timedelta(minutes=10):
        return False

    result = latest[signal.value] == 1
    logger.debug('%s_trigger: %s (pair=%s, signal=%s)', signal.value, latest['date'], pair, result)
    return result
def get_stock_data(stock_ticker, num_days_back, minimum_days):
    print("GETTING STOCK DATA")
    end_date = arrow.now().format("YYYY-MM-DD")
    start_date = arrow.now()
    start_date = start_date.replace(days=(num_days_back * -1)).format("YYYY-MM-DD")
    quandl_api_key = "DqEaArDZQP8SfgHTd_Ko"
    quandl.ApiConfig.api_key = quandl_api_key
    source = "WIKI/" + stock_ticker
    print(" Retrieving data from quandl API...")
    data = quandl.get(source, start_date=str(start_date), end_date=str(end_date))
    data = data[["Open", "High", "Low", "Volume", "Close"]].as_matrix()
    if len(data) < minimum_days:
        raise quandl.errors.quandl_error.NotFoundError
    return data
def __init__(self, begin=None, end=None):
    if begin:
        self.begin = arrow.Arrow.strptime(begin, '%Y-%m-%d', settings.TIME_ZONE) if begin else arrow.now()
        self.begin = self.begin.floor('day').to('UTC').datetime
    elif end:
        to = arrow.Arrow.strptime(end, '%Y-%m-%d', settings.TIME_ZONE).floor('day').to('UTC').datetime
        self.begin = to - timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
    else:
        self.begin = arrow.now()
        self.begin = self.begin.floor('day').to('UTC').datetime
    if end:
        self.end = arrow.Arrow.strptime(end, '%Y-%m-%d', settings.TIME_ZONE).floor('day').to('UTC').datetime
    else:
        self.end = self.begin + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
    self.events = Event.objects.get_by_dates(begin=self.begin, end=self.end)
def test_get_for_closing(self):
    now = arrow.now().floor('day').to('UTC').datetime
    event_before = Event()
    event_before.begin = now - timezone.timedelta(days=3)
    event_before.end = now - timezone.timedelta(days=4)
    event_before.title = 'test_title_now'
    event_before.status = 'open'
    event_before.save()
    event_after = copy.copy(event_before)
    event_after.id = None
    event_after.begin = now + timezone.timedelta(days=3)
    event_after.end = now + timezone.timedelta(days=4)
    event_after.save()
    queryset = Event.objects.get_for_closing()
    self.assertTrue(event_before in queryset)
    self.assertTrue(event_after not in queryset)
    self.assertEqual(queryset.count(), 2)
    for event in queryset:
        self.assertTrue(event.end < now)
        self.assertTrue(event.paid >= event.total)
def test_calendar(self):
    calendar = Calendar()
    now = arrow.now().floor('day').to('UTC').datetime
    week = now + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
    self.assertEqual(now, calendar.begin)
    self.assertEqual(week, calendar.end)
    event = Event()
    event.begin = now + timezone.timedelta(days=3)
    event.end = now + timezone.timedelta(days=4)
    event.title = 'test_title_now'
    event.status = 'open'
    event.save()
    days = calendar.get_days()
    self.assertEqual(settings.EVENTS_CALENDAR_PERIOD, len(days))
    for element in days:
        if event.begin <= element.date < event.end:
            self.assertIn(event, element.events)
        for hour in element.hours:
            if event.begin <= hour.date < event.end:
                self.assertIn(event, hour.events)
def get_for_notification(self, begin=None, end=None, hours=24):
    """
    :param begin: from datetime
    :type begin: datetime.datetime
    :param end: to datetime
    :type end: datetime.datetime
    :param hours: timedelta hours
    :type hours: int
    :return: Events
    :rtype: queryset
    """
    begin = begin if begin else timezone.datetime.now()
    end = end if end else begin + timezone.timedelta(hours=hours)
    queryset = self.get_queryset()
    return queryset.filter(
        notified_at__isnull=True,
        status='open',
        begin__gte=begin,
        begin__lte=end)
async def yt_playlist_adder(sid, cmd, req, playlist_obj):
    music = cmd.music
    counter = 0
    for item in playlist_obj:
        hour, minute, second = item.duration.split(':')
        total_time = (int(hour) * 3600) + (int(minute) * 60) + int(second)
        if total_time <= 600:
            counter += 1
            data = {
                'url': 'https://www.youtube.com/watch?v=' + item.videoid,
                'type': 0,
                'requester': req,
                'sound': item,
                'timestamp': arrow.now().timestamp
            }
            await music.add_to_queue(sid, data)
        if counter >= 200:
            break
    return counter
def get_options(self, underlying_asset=None, expiration_date=None):
    oc = OptionChain('NASDAQ:' + asset_factory(underlying_asset).symbol)
    underlying_quote = self.get_quote(underlying_asset)
    out = []
    for option in (oc.calls + oc.puts):
        if arrow.get(expiration_date).format('YYMMDD') in option['s']:
            quote = OptionQuote(quote_date=arrow.now().format('YYYY-MM-DD'),
                                asset=option['s'],
                                bid=float(option['b']) if option['b'] != '-' else None,
                                ask=float(option['a']) if option['a'] != '-' else None,
                                underlying_price=underlying_quote.price)
            self._set_cache(quote)
            out.append(quote)
    return out

# the code below is from https://github.com/makmac213/python-google-option-chain
def save_file(file):
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        upload_to = join(app.config['UPLOAD_FOLDER'], filename)
        if exists(upload_to):
            filename = '{}_{}.xmind'.format(filename[:-6], arrow.now().strftime('%Y%m%d_%H%M%S'))
            upload_to = join(app.config['UPLOAD_FOLDER'], filename)
        file.save(upload_to)
        insert_record(filename)
        g.is_success = True
    elif file.filename == '':
        g.is_success = False
        g.error = "Please select a file!"
    else:
        g.is_success = False
        g.invalid_files.append(file.filename)
def fetch_exchange_by_bidding_zone(bidding_zone1='DK1', bidding_zone2='NO2', session=None):
    bidding_zone_a, bidding_zone_b = sorted([bidding_zone1, bidding_zone2])
    r = session or requests.session()
    timestamp = arrow.now().timestamp * 1000
    url = 'http://driftsdata.statnett.no/restapi/PhysicalFlowMap/GetFlow?Ticks=%d' % timestamp
    response = r.get(url)
    obj = response.json()
    exchange = filter(
        lambda x: set([x['OutAreaElspotId'], x['InAreaElspotId']]) == set([bidding_zone_a, bidding_zone_b]),
        obj)[0]
    return {
        'sortedBiddingZones': '->'.join([bidding_zone_a, bidding_zone_b]),
        'netFlow': exchange['Value'] if bidding_zone_a == exchange['OutAreaElspotId'] else -1 * exchange['Value'],
        'datetime': arrow.get(obj[0]['MeasureDate'] / 1000).datetime,
        'source': 'driftsdata.stattnet.no',
    }
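The example above round-trips epoch milliseconds: it multiplies the timestamp by 1000 for the API's Ticks parameter and divides the returned MeasureDate by 1000 before handing it back to arrow. A minimal sketch of that pattern in isolation (variable names are illustrative):

import arrow

ticks = arrow.now().timestamp * 1000          # epoch ms (.timestamp is a property in arrow < 1.0)
measured_at = arrow.get(ticks / 1000).datetime  # arrow.get() accepts epoch seconds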
def fetch_generation_forecast(country_code, session=None, now=None):
    if not session:
        session = requests.session()
    domain = ENTSOE_DOMAIN_MAPPINGS[country_code]
    # Grab generation forecast
    parsed = parse_generation_forecast(query_generation_forecast(domain, session, now))
    if parsed:
        data = []
        values, datetimes = parsed
        for i in range(len(values)):
            data.append({
                'countryCode': country_code,
                'datetime': datetimes[i].datetime,
                'value': values[i],
                'source': 'entsoe.eu'
            })
        return data
def fetch_consumption_forecast(country_code, session=None, now=None):
    if not session:
        session = requests.session()
    domain = ENTSOE_DOMAIN_MAPPINGS[country_code]
    # Grab consumption forecast
    parsed = parse_consumption_forecast(query_consumption_forecast(domain, session, now))
    if parsed:
        data = []
        values, datetimes = parsed
        for i in range(len(values)):
            data.append({
                'countryCode': country_code,
                'datetime': datetimes[i].datetime,
                'value': values[i],
                'source': 'entsoe.eu'
            })
        return data
def validate_production(obj, country_code):
    if 'datetime' not in obj:
        raise Exception('datetime was not returned for %s' % country_code)
    if 'countryCode' not in obj:
        raise Exception('countryCode was not returned for %s' % country_code)
    if not type(obj['datetime']) == datetime.datetime:
        raise Exception('datetime %s is not valid for %s' % (obj['datetime'], country_code))
    if obj.get('countryCode', None) != country_code:
        raise Exception("Country codes %s and %s don't match" % (obj.get('countryCode', None), country_code))
    if arrow.get(obj['datetime']) > arrow.now():
        raise Exception("Data from %s can't be in the future" % country_code)
    if obj.get('production', {}).get('unknown', None) is None and \
            obj.get('production', {}).get('coal', None) is None and \
            obj.get('production', {}).get('oil', None) is None and \
            country_code not in ['CH', 'NO', 'AUS-TAS']:
        raise Exception("Coal or oil or unknown production value is required for %s" % (country_code))
    for k, v in obj['production'].iteritems():
        if v is None:
            continue
        if v < 0:
            raise ValueError('%s: key %s has negative value %s' % (country_code, k, v))
def get_datetime(session=None):
    """
    Generation data is updated hourly. Makes a request, then finds the most
    recent hour available. Returns an arrow datetime object using UTC-3 for
    the timezone and zero for minutes and seconds.
    """
    # Argentina does not currently observe daylight savings time. This may change from year to year!
    # https://en.wikipedia.org/wiki/Time_in_Argentina
    s = session or requests.Session()
    rt = s.get(url)
    timesoup = BeautifulSoup(rt.content, 'html.parser')
    find_hour = timesoup.find("option", selected="selected", value="1").getText()
    at = arrow.now('UTC-3').floor('hour')
    datetime = (at.replace(hour=int(find_hour), minute=0, second=0)).datetime
    return {'datetime': datetime}
def fetch_generation_forecast(country_code='BO', session=None):
    # Define actual and last day (for midnight data)
    formatted_date = arrow.now(tz=tz_bo).format('YYYY-MM-DD')

    # Define output frame
    data = [dict() for h in range(24)]

    # Initial path for the url to request
    url_init = 'http://www.cndc.bo/media/archivos/graf/gene_hora/despacho_diario.php?fechag='
    url = url_init + formatted_date

    # Request and rearrange in DF
    r = session or requests.session()
    response = r.get(url)
    obj = webparser(response)

    for h in range(1, 25):
        data_temp = fetch_hourly_generation_forecast('BO', obj, h, formatted_date)
        data[h - 1] = data_temp

    return data
def merge_production(thermal, total):
    """
    Takes thermal generation and total generation and merges them using the
    'datetime' key. Returns a defaultdict.
    """
    d = defaultdict(dict)
    for each in (thermal, total):
        for elem in each:
            d[elem['datetime']].update(elem)
    final = sorted(d.values(), key=itemgetter("datetime"))

    def get_datetime(hour):
        at = arrow.now('America/Dominica').floor('day')
        dt = (at.shift(hours=int(hour) - 1)).datetime
        return dt

    for item in final:
        i = item['datetime']
        j = get_datetime(i)
        item['datetime'] = j
    return final
def add_disks(self, *dev_disks):
    """
    Add disks by dev name

    .. warning::

        Adding a disk during a backup is not recommended, as the current
        disks list could be inaccurate. It will pull the information about
        the current disks attached to the domain, but the backup process
        creates temporary external snapshots, changing the current disks
        attached. This should not be an issue once the backingStore
        property is correctly handled, but for now it is.

    :param dev_disks: dev names of the new disks to backup. If not
        indicated, will add all disks.
    """
    dom_all_disks = self._get_self_domain_disks()
    if not dev_disks:
        self.disks = dom_all_disks
    for dev in dev_disks:
        if dev in self.disks:
            continue
        self.disks[dev] = dom_all_disks[dev]
def _parse_date(date_els):
    if len(date_els) == 2:  # assumed to be year-month or month-year
        a, b = date_els
        if _is_year(a):
            date_vals = a, b, 1  # 1st of month assumed
        elif _is_year(b):
            date_vals = b, a, 1  # 1st of month assumed
        else:
            date_vals = arrow.now().year, a, b  # assumed M/D of this year
    elif len(date_els) == 3:  # assumed to be year-month-day or month-day-year
        a, b, c = date_els
        if _is_year(a):
            date_vals = a, b, c
        elif _is_year(c):
            date_vals = c, a, b
        else:
            raise ValueError("Date '{}' can't be understood".format(date_els))
    else:
        raise ValueError("Date '{}' can't be understood".format(date_els))
    return map(int, date_vals)
def protected_save():
    """After approval, save text to the entries table."""
    today = arrow.now().format('YYYY-MM-DD')
    entry = flask.session["entry"]
    user_id = flask.session["user_real_id"]
    file_name = flask.session["file_name"]
    print(entry)
    print(user_id)
    db_entry = Entries.query.filter_by(user_id=user_id, time=today).first()
    if db_entry is None:
        entry = Entries(time=today, text=entry, user_id=user_id, file_name=file_name)
        db.session.add(entry)
    else:
        # only the first photo attachment will be saved
        db_entry.text = db_entry.text + "\n" + entry
    db.session.commit()
    return "Save Success"
def __get_event_counts(self):
    activity_log_list = self.todoist_api.activity.get()

    added_task_count = 0
    completed_task_count = 0
    updated_task_count = 0

    today = arrow.now().to('Asia/Seoul')
    start, end = today.span('day')

    for log in activity_log_list:
        event_date = arrow.get(
            log['event_date'], 'DD MMM YYYY HH:mm:ss Z').to('Asia/Seoul')
        if event_date < start or event_date > end:
            continue

        event_type = log['event_type']
        if event_type == 'added':
            added_task_count += 1
        elif event_type == 'completed':
            completed_task_count += 1
        elif event_type == 'updated':
            updated_task_count += 1

    return added_task_count, completed_task_count, updated_task_count
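The example above uses span() to bound the current day; it returns a (floor, ceil) pair for the given time frame. A minimal isolated sketch of the idiom:

import arrow

# start is local midnight in Seoul, end is 23:59:59.999999 the same day
start, end = arrow.now().to('Asia/Seoul').span('day')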
def is_between(start_time: tuple, end_time: tuple, now=None) -> bool:
    if start_time is None and end_time is None:
        return True

    if now is None:
        now = datetime.datetime.now()

    start_h, start_m = start_time
    end_h, end_m = end_time
    if end_h == 24 and end_m == 0:
        end_h = 23
        end_m = 59

    start = now.replace(hour=start_h, minute=start_m, second=0, microsecond=0)
    end = now.replace(hour=end_h, minute=end_m, second=0, microsecond=0)
    return start <= now <= end
def is_today_day_of_week(day_of_week: list) -> bool:
    day_of_week = list(map(lambda x: int(x), day_of_week))
    if day_of_week == [0]:
        return True

    now = arrow.now()
    today_day_of_week = now.weekday() + 1
    if today_day_of_week in day_of_week:
        return True
    elif len(day_of_week) == 1:
        value = day_of_week[0]
        if value == 8 and ArrowUtil.is_weekday():
            return True
        elif value == 9 and not ArrowUtil.is_weekday():
            return True
        else:
            return False
    else:
        return False
def timezones():
    ct = datetime.datetime.now(pytz.utc)
    timezones_by_country = [
        (pytz.country_names[cc], [
            (int(ct.astimezone(pytz.timezone(tzname)).strftime("%z")), tzname)
            for tzname in timezones
        ])
        for cc, timezones in pytz.country_timezones.iteritems()]
    timezones_by_country.sort()

    ret = []
    for country, timezones in timezones_by_country:
        ret.append(('- %s -' % (country,), None))
        ret.extend(
            ("[UTC%+05d] %s" % (offset, tzname.replace('_', ' ')), tzname)
            for offset, tzname in timezones)
    return ret
def test_recently_popular(self):
    owner = db_utils.create_user()
    now = arrow.now()
    sub1 = db_utils.create_submission(owner, rating=ratings.GENERAL.code,
                                      unixtime=now - datetime.timedelta(days=6))
    sub2 = db_utils.create_submission(owner, rating=ratings.GENERAL.code,
                                      unixtime=now - datetime.timedelta(days=4))
    sub3 = db_utils.create_submission(owner, rating=ratings.GENERAL.code,
                                      unixtime=now - datetime.timedelta(days=2))
    sub4 = db_utils.create_submission(owner, rating=ratings.GENERAL.code, unixtime=now)

    tag = db_utils.create_tag(u'tag')
    for s in [sub1, sub2, sub3, sub4]:
        db_utils.create_submission_tag(tag, s)

    for i in range(100):
        favoriter = db_utils.create_user()
        db_utils.create_favorite(favoriter, sub2, 's', unixtime=now)

    recently_popular = submission.select_recently_popular()

    self.assertEqual(
        [item['submitid'] for item in recently_popular],
        [sub2, sub4, sub3, sub1])
def test_passwordInsecure_WeasylError_if_password_length_insufficient():
    db_utils.create_user(email_addr=email_addr, username=user_name)
    password = ''
    form = Bag(email=email_addr, username=user_name,
               day=arrow.now().day, month=arrow.now().month, year=arrow.now().year,
               token=token, password=password, passcheck=password)

    # Considered insecure...
    for i in range(0, login._PASSWORD):
        with pytest.raises(WeasylError) as err:
            resetpassword.reset(form)
        assert 'passwordInsecure' == err.value.value
        password += 'a'
        form.password = password
        form.passcheck = password

    # Considered secure...
    password += 'a'
    form.password = password
    form.passcheck = password

    # Success at WeasylError/forgotpasswordRecordMissing; we didn't make one yet
    with pytest.raises(WeasylError) as err:
        resetpassword.reset(form)
    assert 'forgotpasswordRecordMissing' == err.value.value
def test_password_reset_fails_if_attempted_from_different_ip_address():
    # Two parts: set a forgot-password record; attempt a reset with an IP address
    # in the forgotpassword table that differs from the requesting IP.
    # Requirement: Get token set from request()
    user_id = db_utils.create_user(email_addr=email_addr, username=user_name)
    password = '01234567890123'
    form_for_request = Bag(email=email_addr, username=user_name,
                           day=arrow.now().day, month=arrow.now().month, year=arrow.now().year)
    resetpassword.request(form_for_request)
    pw_reset_token = d.engine.scalar("SELECT token FROM forgotpassword WHERE userid = %(id)s", id=user_id)

    # Change IP detected when request was made (required for test)
    d.engine.execute("UPDATE forgotpassword SET address = %(addr)s WHERE token = %(token)s",
                     addr="127.42.42.42", token=pw_reset_token)

    # Force update link_time (required)
    resetpassword.prepare(pw_reset_token)

    form_for_reset = Bag(email=email_addr, username=user_name,
                         day=arrow.now().day, month=arrow.now().month, year=arrow.now().year,
                         token=pw_reset_token, password=password, passcheck=password)
    with pytest.raises(WeasylError) as err:
        resetpassword.reset(form_for_reset)
    assert 'addressInvalid' == err.value.value
def test_verify_success_if_correct_information_supplied():
    # Subtests:
    #   a) Verify 'authbcrypt' table has new hash
    #   b) Verify 'forgotpassword' row is removed.
    #   > Requirement: Get token set from request()
    user_id = db_utils.create_user(email_addr=email_addr, username=user_name)
    password = '01234567890123'
    form_for_request = Bag(email=email_addr, username=user_name,
                           day=arrow.now().day, month=arrow.now().month, year=arrow.now().year)
    resetpassword.request(form_for_request)
    pw_reset_token = d.engine.scalar("SELECT token FROM forgotpassword WHERE userid = %(id)s", id=user_id)

    # Force update link_time (required)
    resetpassword.prepare(pw_reset_token)

    form = Bag(email=email_addr, username=user_name,
               day=arrow.now().day, month=arrow.now().month, year=arrow.now().year,
               token=pw_reset_token, password=password, passcheck=password)
    resetpassword.reset(form)

    # 'forgotpassword' row should not exist after a successful reset
    row_does_not_exist = d.engine.execute("SELECT token FROM forgotpassword WHERE userid = %(id)s", id=user_id)
    assert row_does_not_exist.first() is None

    bcrypt_hash = d.engine.scalar("SELECT hashsum FROM authbcrypt WHERE userid = %(id)s", id=user_id)
    assert bcrypt.checkpw(password.encode('utf-8'), bcrypt_hash.encode('utf-8'))
def test_passwords_must_be_of_sufficient_length():
    password = "tooShort"
    form = Bag(username=user_name, password=password, passcheck=password,
               email='foo', emailcheck='foo',
               day='12', month='12', year=arrow.now().year - 19)

    # Insecure length
    with pytest.raises(WeasylError) as err:
        login.create(form)
    assert 'passwordInsecure' == err.value.value

    # Secure length
    password = "thisIsAcceptable"
    form.passcheck = form.password = password
    # emailInvalid is the next failure state after passwordInsecure, so it is a 'success' for this testcase
    with pytest.raises(WeasylError) as err:
        login.create(form)
    assert 'emailInvalid' == err.value.value
def test_create_fails_if_pending_account_has_same_email():
    """
    Test checks to see if an email is tied to a pending account creation entry
    in logincreate. If so, login.create() will not permit another account to
    be made for the same address.
    """
    d.engine.execute(d.meta.tables["logincreate"].insert(), {
        "token": 40 * "a",
        "username": "existing",
        "login_name": "existing",
        "hashpass": login.passhash(raw_password),
        "email": email_addr,
        "birthday": arrow.Arrow(2000, 1, 1),
        "unixtime": arrow.now(),
    })

    form = Bag(username="test", password='0123456789', passcheck='0123456789',
               email=email_addr, emailcheck=email_addr,
               day='12', month='12', year=arrow.now().year - 19)
    with pytest.raises(WeasylError) as err:
        login.create(form)
    assert 'emailExists' == err.value.value
def signin_2fa_auth_get_(request):
    sess = define.get_weasyl_session()
    # Only render page if the password has been authenticated (we have a UserID stored in the session)
    if '2fa_pwd_auth_userid' not in sess.additional_data:
        return Response(define.errorpage(request.userid, errorcode.permission))
    tfa_userid = sess.additional_data['2fa_pwd_auth_userid']

    # Maximum secondary authentication time: 5 minutes
    session_life = arrow.now().timestamp - sess.additional_data['2fa_pwd_auth_timestamp']
    if session_life > 300:
        _cleanup_2fa_session()
        return Response(define.errorpage(
            request.userid,
            errorcode.error_messages['TwoFactorAuthenticationAuthenticationTimeout'],
            [["Sign In", "/signin"], ["Return to the Home Page", "/"]]))
    else:
        ref = request.params["referer"] if "referer" in request.params else "/"
        return Response(define.webpage(
            request.userid,
            "etc/signin_2fa_auth.html",
            [define.get_display_name(tfa_userid), ref,
             two_factor_auth.get_number_of_recovery_codes(tfa_userid),
             None],
            title="Sign In - 2FA"))
def prune_databases(self):
    for source in self.sources:
        self.logger.log("Data Input/Pruner", "informative", "Pruning: " + source["news_api_name"])
        db = source["articles_db"]
        q = Query()
        test_func = lambda at: arrow.get(at) < (arrow.now() - datetime.timedelta(days=1))
        docs = db.search(q.at.test(test_func))
        eids = [doc.eid for doc in docs]
        db.remove(eids=eids)

# Entry point for process
def log(self, source, msg_type, message):
    time = str(arrow.now())
    message = ''.join(ch for ch in message if ch.isalnum() or ch == " ")
    message_final = "[" + time + "] " + "[" + source + "] " + "[" + msg_type + "] " + "[" + message + "]"
    self.message_queue.put(message_final)
def deteriorate(settings, logs):
    last_entry = logs.load_last_entry()
    if last_entry is None:
        return
    last_utc = last_entry.utc
    utc_to_arrow = arrow.get(last_utc)
    today = arrow.now()
    deteriorate = today - utc_to_arrow
    multiple_remove = int(deteriorate.days / 7)
    if multiple_remove >= 1 and settings.xp * 0.8 > 199.20000000000002:
        previous_xp = settings.xp
        utcnow = arrow.utcnow().timestamp
        for each in range(multiple_remove):
            total_xp = int(settings.xp)
            if total_xp >= 199.20000000000002:
                total_lost = round(total_xp * 0.2)
                settings.xp = round(total_xp * 0.8)
                deter_entry = LogEntry()
                deter_entry.average = 0
                deter_entry.distance = 0
                deter_entry.exercise = "DETERIORATE"
                deter_entry.measuring = settings.measuring_type
                deter_entry.points = 0
                deter_entry.total = total_lost
                deter_entry.utc = utcnow
                logs.append_entry(deter_entry)
        settings.commit()
        xp_lost = previous_xp - settings.xp
        print('Due to not logging anything for {0} days...'.format(deteriorate.days))
        print('You\'ve lost {0} XP. Your XP is now {1}'.format(xp_lost, settings.xp))
def cardio_date_converter(raw_value, activity=None):
    try:
        initial_check_date = arrow.Arrow.strptime(raw_value.strip(), '%Y-%m-%d')
        # normalise back to a YYYY-MM-DD string
        check_date_strftime = initial_check_date.strftime('%Y-%m-%d')
        return check_date_strftime
    except ValueError:
        if raw_value.strip() == '':
            return arrow.now().strftime('%Y-%m-%d')
        else:
            raise ConversionFailed('Format is 1999-12-31')
def cardio_when_prompter(activity):
    return Prompter(
        'What time did you finish? (Format 20:30:15) (Enter for now)',
        cardio_when_converter,
        activity=None
    )
def cardio_when_converter(raw_value, activity=None):
    time_input = raw_value.strip()
    time_split = time_input.split(':')
    try:
        if len(time_split) == 3:
            hours_ = int(time_split[0])
            minutes_ = int(time_split[1])
            seconds_ = int(time_split[2])
            when_seconds = hours_ * 3600 + minutes_ * 60 + seconds_
            if when_seconds <= 86399:
                log_divmod = divmod(when_seconds, 60)
                when_hours = round(log_divmod[0] / 60)
                when_minutes = round(log_divmod[0] % 60)
                when_seconds = round(log_divmod[1])
                when_time = ('{0:02d}, {1:02d}, {2:02d}'.format(when_hours, when_minutes, when_seconds))
                return when_time
            else:
                raise ConversionFailed('There\'s only 24 hours in a day')
        elif time_input == '':
            current_time = arrow.now().time()
            when_hours = current_time.hour
            when_minutes = current_time.minute
            when_seconds = current_time.second
            when_time = ('{0:02d} {1:02d} {2:02d}'.format(when_hours, when_minutes, when_seconds))
            return when_time
        else:
            raise ValueError
    except ValueError:
        raise ConversionFailed(
            'Only digits and ":" can be used. (10:00:00)'
        )
def version(inp):
    """
    Output version info.

    Shows bot's bio, version number, github link, and uptime.
    """
    uptime = arrow.now() - BOOTTIME
    m, s = divmod(uptime.seconds, 60)
    h, m = divmod(m, 60)
    return lex.version(
        version=__version__, days=uptime.days, hours=h, minutes=m)
def _get_old_article(pages, scp=True):
    """Get random old tale or scp article."""
    date = arrow.now().replace(days=-180).format('YYYY-MM-DD')
    pages = [p for p in pages if p.created < date]
    if scp:
        pages = [p for p in pages if 'scp' in p.tags]
        pages = [p for p in pages if p.rating >= 120]
    else:
        pages = [
            p for p in pages
            if 'tale' in p.tags or 'goi-format' in p.tags]
        pages = [p for p in pages if p.rating >= 60]
    return random.choice(pages)
def autoban(inp, name, host):
    inp.user = 'OP Alert'
    if any(word in name.lower() for word in PROFANITY):
        kick_user(inp, name, lex.autoban.kick.name)
        ban_user(inp, host, 10)
        ban_user(inp, name, 900)
        return lex.autoban.name(user=name)

    banlist = BANS.get(inp.channel)
    if not banlist:
        return
    # find if the user is in the banlist
    bans = [
        b for b in banlist
        if name.lower() in b.names or any(pat(host) for pat in b.hosts)]
    for ban in bans:
        try:
            # check if the ban has expired
            if arrow.get(ban.status, ['M/D/YYYY', 'YYYY-MM-DD']) < arrow.now():
                continue
        except arrow.parser.ParserError:
            # if we can't parse the time, it's perma
            pass
        kick_user(inp, name, lex.autoban.kick.banlist(reason=ban.reason))
        ban_user(inp, host, 900)
        return lex.autoban.banlist(user=name, truename=ban.names[0])