Python datetime 模块,now() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.now()。
def _end_of_trading(self, *arg, **kwargs):
    """Wind the backtest down at its final real trading date.

    Sets ``self.now`` and ``self.today`` from ``self.end_real_date``
    according to ``self.backtest_type``, then calls ``sell_all``,
    ``_deal_from_order_queue`` and the ``'daily_settle'`` order sync.

    ``*arg`` and ``**kwargs`` are accepted but never read.
    """
    # NOTE(review): the original comments here were lost to an encoding
    # error; a ``self._make_slice(self)`` call was left disabled at this spot.
    end_day = self.end_real_date
    daily_types = ['day', 'index_day']
    minute_types = ['1min', '5min', '15min', '30min', '60min',
                    'index_1min', 'index_5min', 'index_15min',
                    'index_30min', 'index_60min']
    if self.backtest_type in daily_types:
        self.now = str(end_day)
        self.today = str(end_day)
    elif self.backtest_type in minute_types:
        # Minute-level backtests are stamped with a fixed 15:00:00 time.
        self.now = str(end_day) + ' 15:00:00'
        self.today = str(end_day)
    # Whatever the branch above stored, ``today`` ends up holding the raw
    # (un-stringified) end date.
    self.today = end_day
    self.sell_all()
    self._deal_from_order_queue()
    self.__sync_order_LM('daily_settle')
def DeleteOlderFiles(workfolder, days):
    r"""
    Delete old backups from a folder; *days* is the retention period.

    Every regular file directly inside *workfolder* whose ``st_ctime`` lies
    more than *days* days in the past is removed.  Sub-directories are not
    entered and not deleted.  Each removed file name is printed.

    Sample use, deleting files in c:\temp older than 3 days:
        DeleteOlderFiles(r'c:\temp', 3)
    """
    # os, time already imported
    cutoff = time.time() - (days * 86400)  # 86400 seconds per day
    for name in os.listdir(workfolder):
        # os.path.join instead of a hard-coded '\\' so the function also
        # works where the path separator is not a backslash.
        path = os.path.join(workfolder, name)
        if not os.path.isfile(path):
            continue
        # delete the file if it is older than the retention period
        if os.stat(path).st_ctime < cutoff:
            print('deleting ' + name)
            os.remove(path)
def Default(self):
    """Show the default system summary on the screen's labels.

    Sets ``label2`` to the translated "Default" caption and fills ``label1``
    with date, time, uptime, box type and load lines built from
    ``self.Do_cmd`` output and passed through ``self.Do_cut``.  If anything
    in that sequence raises, ``label1`` shows a generic error text instead.
    """
    try:
        self["label2"].setText(_("Default"))
        now = datetime.now()
        date_line = 'Date = ' + now.strftime("%d-%B-%Y") + "\n"
        time_line = 'Time = ' + now.strftime("%H:%M:%S") + "\n"
        # Only the text before the first comma of the uptime output is shown.
        uptime = self.Do_cmd("uptime", None, None).split(",")[0].lstrip()
        uptime_line = 'Uptime = ' + uptime + "\n"
        # The first 9 characters of the image-version line are dropped
        # (presumably a fixed "key=" prefix -- TODO confirm).
        boxtype = self.Do_cmd("cat", "/etc/image-version", " | head -n 1")[9:]
        boxtype_line = 'Boxtype = ' + boxtype + "\n"
        load_line = 'Load = ' + self.Do_cmd("cat", "/proc/loadavg", None)
        summary = self.Do_cut(
            date_line + time_line + uptime_line + boxtype_line + load_line)
        self["label1"].setText(summary)
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        self["label1"].setText(_("an internal error has occured"))
def process_request(self, request, spider):
    """Drop root-page requests for onion domains not yet due for a check.

    Returns ``None`` (request passes through) unless all of these hold:
    ``self.test_mode`` is truthy, the URL path is ``"/"`` or empty, the URL
    is an onion URL, a ``Domain`` record exists for it, and the current
    time has not passed its ``next_scheduled_check``.  In that case
    ``IgnoreRequest`` is raised.  ``spider`` is not used.
    """
    url = request.url
    path = urlparse.urlparse(url).path
    if not (self.test_mode and path in ["/", ""]):
        return None
    if not Domain.is_onion_url(url):
        return None
    domain = Domain.find_by_url(url)
    if domain is None:
        return None
    if datetime.now() > domain.next_scheduled_check:
        # The scheduled check time has passed: let the request through.
        return None
    raise IgnoreRequest('FilterNotScheduledMiddleware: %s is not scheduled to check' % domain.host)
def getStockData(theTicker, startDate):
    """Fetch historical quotes for *theTicker* from *startDate* until today.

    Returns a list of rows, one per quote returned by
    ``Share.get_historical``, each laid out as
    ``[date, open, high, low, close, adj_close, volume]`` with prices as
    floats and volume as an int.
    """

    def as_row(quote):
        # Conversions run in the same order as before, so a malformed
        # quote fails on the same field.
        volume = int(quote['Volume'])
        adj_close = float(quote['Adj_Close'])
        high = float(quote['High'])
        low = float(quote['Low'])
        close = float(quote['Close'])
        day = quote['Date']
        opening = float(quote['Open'])
        return [day, opening, high, low, close, adj_close, volume]

    stock = Share(theTicker)
    print("Getting Data for ... " + theTicker)
    today = datetime.now()
    # Month and day are deliberately left un-padded, e.g. "2017-1-5".
    DateNow = "-".join(str(part) for part in (today.year, today.month, today.day))
    history = stock.get_historical(startDate, DateNow)
    return [as_row(quote) for quote in history]
def test_astimezone(self):
    """Assert astimezone() raises on bad signatures and on a naive object."""
    # Pretty boring!  The TZ test is more interesting here.  astimezone()
    # simply can't be applied to a naive object.
    naive = self.theclass.now()
    fixed = FixedOffset(44, "")
    self.assertRaises(TypeError, naive.astimezone)  # not enough args
    self.assertRaises(TypeError, naive.astimezone, fixed, fixed)  # too many args
    self.assertRaises(TypeError, naive.astimezone, naive)  # arg wrong type
    self.assertRaises(ValueError, naive.astimezone, fixed)  # naive
    self.assertRaises(ValueError, naive.astimezone, tz=fixed)  # naive

    class Bogus(tzinfo):
        # utcoffset() is None while dst() is a real timedelta.
        def utcoffset(self, dt):
            return None

        def dst(self, dt):
            return timedelta(0)

    self.assertRaises(ValueError, naive.astimezone, Bogus())  # naive

    class AlsoBogus(tzinfo):
        # The reverse: a real utcoffset() but dst() is None.
        def utcoffset(self, dt):
            return timedelta(0)

        def dst(self, dt):
            return None

    self.assertRaises(ValueError, naive.astimezone, AlsoBogus())  # also naive
def test_astimezone(self):
    """Assert astimezone() raises on bad signatures and on a naive object."""
    # Pretty boring!  The TZ test is more interesting here.  astimezone()
    # simply can't be applied to a naive object.
    naive = self.theclass.now()
    fixed = FixedOffset(44, "")
    self.assertRaises(TypeError, naive.astimezone)  # not enough args
    self.assertRaises(TypeError, naive.astimezone, fixed, fixed)  # too many args
    self.assertRaises(TypeError, naive.astimezone, naive)  # arg wrong type
    self.assertRaises(ValueError, naive.astimezone, fixed)  # naive
    self.assertRaises(ValueError, naive.astimezone, tz=fixed)  # naive

    class Bogus(tzinfo):
        # utcoffset() is None while dst() is a real timedelta.
        def utcoffset(self, dt):
            return None

        def dst(self, dt):
            return timedelta(0)

    self.assertRaises(ValueError, naive.astimezone, Bogus())  # naive

    class AlsoBogus(tzinfo):
        # The reverse: a real utcoffset() but dst() is None.
        def utcoffset(self, dt):
            return timedelta(0)

        def dst(self, dt):
            return None

    self.assertRaises(ValueError, naive.astimezone, AlsoBogus())  # also naive
def observe_inventory(owner, repo_name, pulls):
    """Report size totals and ages of *pulls* for one repository.

    For each of ``additions``, ``commits`` and ``deletions`` the attribute
    is summed over *pulls*, logged, and set on the ``CODE_INVENTORY``
    metric.  Then each pull's age, as computed by ``weekdays_between``
    from its ``created_at`` to now, is logged and observed on
    ``CODE_INVENTORY_AGE``.
    """
    for metric in ['additions', 'commits', 'deletions']:
        # An empty ``pulls`` sums to 0, same as the explicit empty case.
        total = sum([getattr(pull, metric) for pull in pulls]) if len(pulls) > 0 else 0
        logger.info(
            'Observed for owner "%s", repo "%s", %d %s' % (owner, repo_name, total, metric))
        CODE_INVENTORY.labels(owner, repo_name, metric).set(total)

    for pull in pulls:
        age = weekdays_between(pull.created_at, datetime.now())
        logger.info(
            'Observed for owner "%s", repo "%s", %.2f days old PR' % (owner, repo_name, age))
        CODE_INVENTORY_AGE.labels(owner, repo_name).observe(age)
def trigger(self):
    """Start this traffic flow's command according to ``self.type``.

    For ``TRAFFIC_FLOW_ONE_SHOT`` the method sleeps ``self.offset`` seconds,
    then launches ``self.cmd`` (split on single spaces) with
    ``subprocess.Popen`` without waiting for it.  The exponential and
    periodic flow types are currently no-ops.
    """
    if self.type == TRAFFIC_FLOW_ONE_SHOT:
        sleep(int(self.offset))
        # Single-argument print() behaves the same on Python 2 and 3; the
        # double space reproduces the old ``print a, b`` output.
        print("Started command at  " + str(datetime.now()))
        sys.stdout.flush()
        try:
            cmd_list = self.cmd.split(' ')
            print(cmd_list)
            print(self.cmd)
            sys.stdout.flush()
            subprocess.Popen(cmd_list, shell=False)
        except Exception:
            # Was ``sys.exec_info()``, which does not exist and turned any
            # launch failure into an AttributeError.
            print("Error running command:  " + str(sys.exc_info()[0]))
    elif self.type == TRAFFIC_FLOW_EXPONENTIAL:
        pass
    elif self.type == TRAFFIC_FLOW_PERIODIC:
        pass
def performRFClass(X_train, y_train, X_test, y_test, fout, savemodel):
    """
    Random Forest binary classification.

    Fits a 100-tree ``RandomForestClassifier`` (``n_jobs=-1``) on the
    training data and returns its ``score`` on the test data.  ``fout`` and
    ``savemodel`` are currently unused: the code that pickled the fitted
    model to a timestamped file is disabled.
    """
    model = RandomForestClassifier(n_estimators=100, n_jobs=-1)
    model.fit(X_train, y_train)
    return model.score(X_test, y_test)
def performSVMClass(X_train, y_train, X_test, y_test, fout, savemodel):
    """
    SVM binary classification.

    Fits an ``SVC`` with default parameters on the training data and
    returns its ``score`` on the test data.  ``fout`` and ``savemodel`` are
    currently unused: the code that pickled the fitted model to a
    timestamped file is disabled.
    """
    model = SVC()
    model.fit(X_train, y_train)
    return model.score(X_test, y_test)
def performAdaBoostClass(X_train, y_train, X_test, y_test, fout, savemodel):
    """
    Ada Boosting binary classification.

    Fits an ``AdaBoostClassifier`` with default parameters on the training
    data, prints its ``score`` on the test data and returns that score.
    ``fout`` and ``savemodel`` are currently unused: the code that pickled
    the fitted model to a timestamped file is disabled.
    """
    clf = AdaBoostClassifier()
    clf.fit(X_train, y_train)
    accuracy = clf.score(X_test, y_test)
    # Single-argument print() works on Python 2 and 3; the double space
    # reproduces the old ``print a, b`` output.
    print("AdaBoost:  " + str(accuracy))
    # Returned as well so callers can use the value, as the RF/SVM
    # variants that return their score already allow.
    return accuracy
def performRFClass(X_train, y_train, X_test, y_test, fout, savemodel):
    """
    Random Forest binary classification.

    Fits a 100-tree ``RandomForestClassifier`` (``n_jobs=-1``) on the
    training data, prints its ``score`` on the test data and returns that
    score.  ``fout`` and ``savemodel`` are currently unused: the code that
    pickled the fitted model to a timestamped file is disabled.
    """
    clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
    clf.fit(X_train, y_train)
    accuracy = clf.score(X_test, y_test)
    # Single-argument print() works on Python 2 and 3; the double space
    # reproduces the old ``print a, b`` output.
    print("RF:  " + str(accuracy))
    # Returned as well so callers can use the value, not just read it
    # from stdout.
    return accuracy
def performSVMClass(X_train, y_train, X_test, y_test, fout, savemodel):
    """
    SVM binary classification.

    Fits an ``SVC`` with default parameters on the training data, prints
    its ``score`` on the test data and returns that score.  ``fout`` and
    ``savemodel`` are currently unused: the code that pickled the fitted
    model to a timestamped file is disabled.
    """
    clf = SVC()
    clf.fit(X_train, y_train)
    accuracy = clf.score(X_test, y_test)
    # Single-argument print() works on Python 2 and 3; the double space
    # reproduces the old ``print a, b`` output.
    print("SVM:  " + str(accuracy))
    # Returned as well so callers can use the value, not just read it
    # from stdout.
    return accuracy
def performAdaBoostClass(X_train, y_train, X_test, y_test, fout, savemodel):
    """
    Ada Boosting binary classification.

    Fits an ``AdaBoostClassifier`` with default parameters on the training
    data, prints its ``score`` on the test data and returns that score.
    ``fout`` and ``savemodel`` are currently unused: the code that pickled
    the fitted model to a timestamped file is disabled.
    """
    clf = AdaBoostClassifier()
    clf.fit(X_train, y_train)
    accuracy = clf.score(X_test, y_test)
    # Single-argument print() works on Python 2 and 3; the double space
    # reproduces the old ``print a, b`` output.
    print("AdaBoost:  " + str(accuracy))
    # Returned as well so callers can use the value, not just read it
    # from stdout.
    return accuracy
def main():
    """Run ``getYesterdaySoccer`` for the hard-coded date 2017-10-28."""
    yesterday = datetime.now() + timedelta(days=-1)
    # NOTE(review): computed but never used -- the call below passes a
    # fixed date instead.  Confirm whether ``yesterdaystr`` was intended.
    yesterdaystr = yesterday.strftime('%Y-%m-%d')
    # 10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27
    getYesterdaySoccer('2017-10-28')
    # A disabled command-line entry point followed here: a usage message
    # printed when no argument was given (its text was lost to an encoding
    # error) and calls to getTodaySoccer(sys.argv[1]) / getTodaySoccer(3).
def now(self):
    """
    Build a Ticktock object holding the current time.

    Returns
    =======
    out : ticktock
        Ticktock object created from ``datetime.datetime.now()`` with the
        ``'utc'`` dtype label

    See Also
    ========
    datetime.datetime.now()
    """
    # NOTE(review): datetime.now() is naive local time yet it is labelled
    # 'utc' -- confirm whether that is intended.
    current = datetime.datetime.now()
    return Ticktock(current, 'utc')

# -----------------------------------------------
def today(self):
    """
    Build a Ticktock object for the current date at 00:00:00.

    Returns
    =======
    out : ticktock
        Ticktock object created from ``datetime.datetime.now()`` truncated
        to midnight, with the ``'utc'`` dtype label

    See Also
    ========
    datetime.date.today()
    """
    # NOTE(review): datetime.now() is naive local time yet it is labelled
    # 'utc' -- confirm whether that is intended.
    midnight = datetime.datetime.now().replace(
        hour=0, minute=0, second=0, microsecond=0)
    return Ticktock(midnight, 'utc')

# -----------------------------------------------
# End of Ticktock class
# -----------------------------------------------
def latest_post_date(self):
    """
    Return the most recent ``updateddate`` or ``pubdate`` found on any
    item.  When no item carries either attribute, return the current UTC
    date/time instead.
    """
    found = [
        item.get(date_key)
        for item in self.items
        for date_key in ('updateddate', 'pubdate')
    ]
    dated = [value for value in found if value]
    latest_date = max(dated) if dated else None
    # datetime.now(tz=utc) is slower, as documented in django.utils.timezone.now
    return latest_date or datetime.datetime.utcnow().replace(tzinfo=utc)
def test_NaT_methods(self):
    """Check how NaT answers datetime-style method calls (GH 9513).

    Methods present on NaT must raise ValueError, return NaN, or return
    NaT itself, depending on which list below they belong to.
    """
    # GH 9513
    raise_methods = ['astimezone', 'combine', 'ctime', 'dst',
                     'fromordinal', 'fromtimestamp', 'isocalendar',
                     'strftime', 'strptime', 'time', 'timestamp',
                     'timetuple', 'timetz', 'toordinal', 'tzname',
                     'utcfromtimestamp', 'utcnow', 'utcoffset',
                     'utctimetuple']
    nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
    nan_methods = ['weekday', 'isoweekday']

    # Names NaT does not expose are skipped rather than failed.
    for name in raise_methods:
        if not hasattr(NaT, name):
            continue
        self.assertRaises(ValueError, getattr(NaT, name))

    for name in nan_methods:
        if not hasattr(NaT, name):
            continue
        self.assertTrue(np.isnan(getattr(NaT, name)()))

    for name in nat_methods:
        if not hasattr(NaT, name):
            continue
        self.assertIs(getattr(NaT, name)(), NaT)

    # GH 12300
    self.assertEqual(NaT.isoformat(), 'NaT')
def test_class_ops_dateutil(self):
    """Timestamp class constructors agree with datetime's to the second.

    Skipped when dateutil is unavailable.
    """
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc

    def whole_seconds(stamp):
        # ``.value`` is divided by 1e9 and rounded to an integer.
        return int(np.round(Timestamp(stamp).value / 1e9))

    def compare(x, y):
        self.assertEqual(whole_seconds(x), whole_seconds(y))

    compare(Timestamp.now(), datetime.now())
    compare(Timestamp.now('UTC'), datetime.now(tzutc()))
    compare(Timestamp.utcnow(), datetime.utcnow())
    compare(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    compare(Timestamp.utcfromtimestamp(current_time),
            datetime.utcfromtimestamp(current_time))
    compare(Timestamp.fromtimestamp(current_time),
            datetime.fromtimestamp(current_time))

    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    compare(Timestamp.combine(date_component, time_component),
            datetime.combine(date_component, time_component))
def test_timestamp_compare_scalars(self):
    """Comparisons give the same answer with operands and operator mirrored."""
    # case where ndim == 0
    lhs = np.datetime64(datetime(2013, 12, 6))
    rhs = Timestamp('now')
    nat = Timestamp('nat')

    # Each operator mapped to the one that is equivalent once the
    # operands are swapped.
    ops = {'gt': 'lt',
           'lt': 'gt',
           'ge': 'le',
           'le': 'ge',
           'eq': 'eq',
           'ne': 'ne'}

    for left, right in ops.items():
        forward = getattr(operator, left)
        mirrored = getattr(operator, right)
        for first, second in ((lhs, rhs), (rhs, nat)):
            expected = forward(first, second)
            result = mirrored(second, first)
            self.assertEqual(result, expected)
def test_isnull_datetime():
    """isnull/notnull on datetimes, a DatetimeIndex and a PeriodIndex."""
    assert not isnull(datetime.now())
    assert notnull(datetime.now())

    dates = date_range('1/1/1990', periods=20)
    assert notnull(dates).all()

    # Put iNaT in the first slot and rebuild the index: only that slot
    # should be reported as null.
    raw = np.asarray(dates)
    raw[0] = iNaT
    with_nat = DatetimeIndex(raw)
    null_mask = isnull(with_nat)
    assert null_mask[0]
    assert not null_mask[1:].any()

    # GH 9129
    periods = with_nat.to_period(freq='M')
    null_mask = isnull(periods)
    assert null_mask[0]
    assert not null_mask[1:].any()

    null_mask = isnull(periods[1:])
    assert not null_mask.any()
def checkDayAndSendMail():
    """Send a "server working" notification when the date has moved on.

    Compares the module-level ``dateIndex`` with the midnight that ends
    today.  If ``dateIndex`` is earlier, it is advanced to that midnight
    and a status message is passed to ``send_Notification``; otherwise
    nothing happens.
    """
    global dateIndex
    todayDate = datetime.now()
    day_start = datetime(todayDate.year, todayDate.month, todayDate.day)
    day_end = day_start + timedelta(days=1)

    # Nothing to do unless the date has changed.
    if not dateIndex < day_end:
        return

    dateIndex = day_end
    stamp = todayDate.strftime('%Y-%m-%d %H:%M:%S')
    # send mail notifying that the server is still working
    send_Notification({
        'Subject': '[Amazon Price Alert] Server working !',
        'Content': 'Amazon Price Alert still working until %s !' % (stamp),
        'Price': "",
        'Time': stamp,
        'ServerState': "Working",
        'code': 2,  # 2 is server state
    })
def log(prefix, text, line=False):
    """Print *text* with a timestamp and a coloured *prefix* tag.

    :param prefix: tag shown in brackets; ``'?'``, ``'+'``, ``'-'`` and
        ``'!'`` are coloured cyan, green, red and yellow.  Any other
        prefix is printed bright with no colour.
    :param text: the message.
    :param line: when true the whole line is coloured, otherwise only the
        prefix is.
    """
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # An unknown prefix used to leave the colour unbound and crash with
    # UnboundLocalError; it now falls back to "no colour".
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    stamp = "[" + datetime.now().strftime("%Y-%m-%d %H:%M") + "]"
    # Single-argument print() behaves the same on Python 2 and 3.
    if line:
        print(c + stamp + "[" + prefix + "] " + text + e)
    else:
        print(stamp + "[" + c + prefix + e + "] " + text)
def fetch_price_data(stock):
    """Fetch about 130 days of historical quotes for *stock* via Yahoo YQL.

    On success the decoded ``query.results.quote`` part of the JSON reply
    is stored in the module-level ``price_data`` under *stock*.  On any
    error a message is printed and the debugger is entered.  Nothing is
    returned.
    """
    utf_decoder = codecs.getreader("utf-8")
    start_date = (datetime.now() - timedelta(days=130)).strftime("%Y-%m-%d")
    end_date = datetime.now().strftime("%Y-%m-%d")
    try:
        stocks_base_URL = 'https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.historicaldata%20where%20symbol%20%3D%20'
        URL_end = '%20and%20startDate%20%3D%20%22' + start_date + '%22%20and%20endDate%20%3D%20%22' + end_date + '%22&format=json&diagnostics=true&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys&callback='
        # The symbol is wrapped in URL-encoded double quotes (%22).
        query = stocks_base_URL + "%22" + stock + "%22" + URL_end
        # The response is now closed deterministically instead of leaked.
        with urllib.request.urlopen(query) as api_response:
            response_data = json.load(utf_decoder(api_response))['query']['results']['quote']
        price_data[stock] = response_data
    except Exception:
        # Narrowed from a bare ``except:`` so Ctrl-C is not swallowed.
        print("ERROR fetching price data")
        # NOTE(review): dropping into pdb blocks non-interactive runs --
        # confirm this debugging hook is still wanted.
        pdb.set_trace()
def test_astimezone(self):
    """Assert astimezone() raises on bad signatures and on a naive object."""
    # Pretty boring!  The TZ test is more interesting here.  astimezone()
    # simply can't be applied to a naive object.
    naive = self.theclass.now()
    fixed = FixedOffset(44, "")
    self.assertRaises(TypeError, naive.astimezone)  # not enough args
    self.assertRaises(TypeError, naive.astimezone, fixed, fixed)  # too many args
    self.assertRaises(TypeError, naive.astimezone, naive)  # arg wrong type
    self.assertRaises(ValueError, naive.astimezone, fixed)  # naive
    self.assertRaises(ValueError, naive.astimezone, tz=fixed)  # naive

    class Bogus(tzinfo):
        # utcoffset() is None while dst() is a real timedelta.
        def utcoffset(self, dt):
            return None

        def dst(self, dt):
            return timedelta(0)

    self.assertRaises(ValueError, naive.astimezone, Bogus())  # naive

    class AlsoBogus(tzinfo):
        # The reverse: a real utcoffset() but dst() is None.
        def utcoffset(self, dt):
            return timedelta(0)

        def dst(self, dt):
            return None

    self.assertRaises(ValueError, naive.astimezone, AlsoBogus())  # also naive
def savealltable(self):
    """Dump the text of every ``<td>`` cell of the fetched page to a file.

    Calls ``self.http_get()``, parses ``req.content`` as HTML, and writes
    the text of each ``<td>`` on its own line to
    ``table/<current datetime>.text`` (the ``table`` directory is created
    when missing).  Each cell is also appended to ``messages``, shown in
    the UI and printed.
    """
    self.http_get()
    # NOTE(review): ``req`` is not defined in this function; presumably a
    # module-level response set by http_get() -- verify.
    cells = etree.HTML(req.content).xpath("//td")
    messages.append("????????...")
    from datetime import datetime
    snow = str(datetime.now())
    if os.path.exists('table') is False:
        os.mkdir('table')
    with open('table/' + snow + '.text', 'w') as f:
        for child in cells:
            # ``.text`` can be None; that used to raise TypeError in the
            # join/concatenation below.  It is now treated as empty.
            text = child.text or ""
            messages.append(text)
            ui.showmssg.setText('\n'.join(messages))
            print(text)
            # write(), not writelines(): the argument is a single string.
            f.write(text + "\n")
def test_astimezone(self):
    """Assert astimezone() raises on bad signatures and on a naive object."""
    # Pretty boring!  The TZ test is more interesting here.  astimezone()
    # simply can't be applied to a naive object.
    naive = self.theclass.now()
    fixed = FixedOffset(44, "")
    self.assertRaises(TypeError, naive.astimezone)  # not enough args
    self.assertRaises(TypeError, naive.astimezone, fixed, fixed)  # too many args
    self.assertRaises(TypeError, naive.astimezone, naive)  # arg wrong type
    self.assertRaises(ValueError, naive.astimezone, fixed)  # naive
    self.assertRaises(ValueError, naive.astimezone, tz=fixed)  # naive

    class Bogus(tzinfo):
        # utcoffset() is None while dst() is a real timedelta.
        def utcoffset(self, dt):
            return None

        def dst(self, dt):
            return timedelta(0)

    self.assertRaises(ValueError, naive.astimezone, Bogus())  # naive

    class AlsoBogus(tzinfo):
        # The reverse: a real utcoffset() but dst() is None.
        def utcoffset(self, dt):
            return timedelta(0)

        def dst(self, dt):
            return None

    self.assertRaises(ValueError, naive.astimezone, AlsoBogus())  # also naive
def send_updates(self):
    """Send the current time values to the KNX bus.

    One ``datetime.now()`` snapshot is written to each configured group
    address (time, date, date-time).  If a day/night address is set, 1 is
    written when pysolar's ``get_altitude`` for that snapshot is positive
    and 0 otherwise.  Unset (falsy) addresses are skipped.
    """
    moment = datetime.now()
    write = self.tunnel.group_write

    if self.timeaddr:
        write(self.timeaddr, time_to_knx(moment))

    if self.dateaddr:
        write(self.dateaddr, date_to_knx(moment))

    if self.datetimeaddr:
        write(self.datetimeaddr, datetime_to_knx(moment))

    if self.daynightaddr:
        # Imported lazily, only when the day/night flag is configured.
        from pysolar.solar import get_altitude
        altitude = get_altitude(self.lat, self.long, moment)
        write(self.daynightaddr, 1 if altitude > 0 else 0)
def latest_post_date(self):
    """
    Return the most recent ``updateddate`` or ``pubdate`` found on any
    item.  When no item carries either attribute, return the current UTC
    date/time instead.
    """
    found = [
        item.get(date_key)
        for item in self.items
        for date_key in ('updateddate', 'pubdate')
    ]
    dated = [value for value in found if value]
    latest_date = max(dated) if dated else None
    # datetime.now(tz=utc) is slower, as documented in django.utils.timezone.now
    return latest_date or datetime.datetime.utcnow().replace(tzinfo=utc)
def restart(self, timeout=None):
    """Restarts this Splunk instance.

    The service is unavailable until it has successfully restarted.

    If a *timeout* value is specified, ``restart`` blocks until the service
    resumes or the timeout period has been exceeded. Otherwise, ``restart``
    returns immediately.

    :param timeout: A timeout period, in seconds.
    :type timeout: ``integer``
    :return: The response of the restart request.
    :raises Exception: If the service has not resumed within *timeout*.
    """
    # A space was missing before "via", which glued it to the username.
    msg = {"value": "Restart requested by " + self.username + " via the Splunk SDK for Python"}
    # This message will be deleted once the server actually restarts.
    self.messages.create(name="restart_required", **msg)
    result = self.post("/services/server/control/restart")
    if timeout is None:
        return result
    start = datetime.now()
    diff = timedelta(seconds=timeout)
    while datetime.now() - start < diff:
        try:
            self.login()
            if not self.restart_required:
                return result
        except Exception:
            # Login failed (server presumably still down): wait, then retry.
            sleep(1)
    # ``except X as e`` / ``raise X(msg)`` replace the Python 2-only forms.
    raise Exception("Operation time out.")
def search(self, query, **kwargs):
    """Creates a search job from *query* and returns it.

    This is a thin wrapper that forwards *query* and every keyword
    argument to ``self.jobs.create``.

    :param query: A search query.
    :type query: ``string``
    :param kwargs: Arguments for the search (optional):

        * "output_mode" (``string``): Specifies the output format of the
          results.

        * "earliest_time" (``string``): Specifies the earliest time in the
          time range to search. The time string can be a UTC time (with
          fractional seconds), a relative time specifier (to now), or a
          formatted time string.

        * "latest_time" (``string``): Specifies the latest time in the time
          range to search. The time string can be a UTC time (with
          fractional seconds), a relative time specifier (to now), or a
          formatted time string.

        * "rf" (``string``): Specifies one or more fields to add to the
          search.

    :type kwargs: ``dict``
    :rtype: class:`Job`
    :returns: An object representing the created job.
    """
    job = self.jobs.create(query, **kwargs)
    return job
def touch(self):
    """Pushes the search's expiration out to now plus its time-to-live
    (ttl) by posting the ``touch`` control action.

    :return: The :class:`Job`, so calls can be chained.
    """
    self.post("control", action="touch")
    return self
def __init__(self):
    """Create the backtest's collaborators and reset all state to defaults."""
    self.backtest_type = 'day'

    # Collaborating objects.
    self.account = QA_Account()
    self.market = QA_Market()
    self.order = QA_QAMarket_bid_list()
    self.bid = QA_QAMarket_bid()
    self.setting = QA_Setting()
    self.clients = self.setting.client
    self.user = self.setting.QA_setting_user_name

    # Clock and market-data state.
    self.market_data = []
    self.now = None
    self.last_time = None

    # Strategy window and universe.
    self.strategy_start_date = ''
    self.strategy_start_time = ''
    self.strategy_end_date = ''
    self.strategy_end_time = ''
    self.today = None
    self.strategy_stock_list = []
    self.trade_list = []
    self.start_real_id = 0
    self.end_real_id = 0
    self.temp = {}

    # Accounting.
    self.commission_fee_coeff = 0.0015
    self.account_d_value = []
    self.account_d_key = []
    self.benchmark_type = 'index'
    self.market_data_dict = {}

    # Output switches.  (The original comment on the log flag was lost to
    # an encoding error.)
    self.backtest_print_log = True
    self.if_save_to_mongo = True
    self.if_save_to_csv = True
    self.stratey_version = 'V1'
    self.topic_name = 'EXAMPLE'

    # Externally supplied data.
    self.outside_data = []
    self.outside_data_dict = []
    self.outside_data_hashable = {}

    # Result directory:
    # <sys.path[0]>/QUANTAXIS_RESULT/<topic>/<version>/ -- the trailing
    # empty element yields the final separator.
    self.absoult_path = sys.path[0]
    self.dirs = os.sep.join([
        '{}'.format(self.absoult_path),
        'QUANTAXIS_RESULT',
        '{}'.format(self.topic_name),
        '{}'.format(self.stratey_version),
        '',
    ])
def __save_strategy_files(self):
    """Archive the running script to MongoDB and to the result directory.

    Reads the file named by ``sys.argv[0]``, inserts its UTF-8 text with
    identifying metadata into the ``quantaxis.strategy`` collection, and
    writes the same bytes to ``<dirs>backtest_<account cookie>.py``.
    """
    cookie = self.account.account_cookie
    file_name = '{}backtest_{}.py'.format(self.dirs, cookie)

    with open(sys.argv[0], 'rb') as script:
        data = script.read()

    collection = self.setting.client.quantaxis.strategy
    record = {
        'cookie': self.account.account_cookie,
        'name': self.strategy_name,
        'topic': self.topic_name,
        'version': self.stratey_version,
        'user': self.user,
        'datetime': datetime.datetime.now(),
        'content': data.decode('utf-8'),
        'dirs': self.dirs,
        'absoultpath': self.absoult_path,
    }
    collection.insert(record)

    with open(file_name, 'wb') as copy:
        copy.write(data)
def _make_slice(self):
    """Insert a snapshot of the current backtest state into MongoDB.

    The document goes to ``QA_Setting.client.quantaxis.slice`` and carries
    the account cookie and messages, the daily value/key series, the
    current clock values, the running date, the stock list and the result
    directory.
    """
    snapshot = {
        'cookie': self.account.account_cookie,
        'account_message': self.__messages,
        'account_d_value': self.account_d_value,
        'account_d_key': self.account_d_key,
        'now': self.now,
        'today': self.today,
        'running_date': self.running_date,
        'strategy_stock_list': self.strategy_stock_list,
        'dirs': self.dirs,
    }
    # NOTE(review): uses the class-level QA_Setting.client rather than
    # self.setting.client -- confirm this is intended.
    QA_Setting.client.quantaxis.slice.insert(snapshot)
def _deal_from_order_queue(self):
    """Send the queued orders whose status is not 200, 500 or 400.

    Each such order is stamped with ``self.today`` / ``self.now``, wrapped
    and sent.  When the reply is a dict, a reply status of 200 is synced
    as a ``'trade'`` and any other status as ``'wait'``; non-dict replies
    are ignored.  An empty queue is only logged.
    """
    # NOTE(review): the original comments here were lost to an encoding
    # error.
    self.order.__init__()  # re-run the order helper's initializer
    if len(self.account.order_queue) < 1:
        self.__QA_backtest_log_info(
            'FROM BACKTEST: Order Queue is empty at %s!' % self.now)
        return

    pending = (self.account.order_queue
               .query('status!=200')
               .query('status!=500')
               .query('status!=400'))
    for item in self.order.from_dataframe(pending):
        # Stamp the order with the current backtest date and time.
        item.date = self.today
        item.datetime = self.now
        bid, market = self.__wrap_bid(self, item)
        message = self.__send_bid(bid, market)
        if not isinstance(message, dict):
            continue
        if message['header']['status'] in ['200', 200]:
            self.__sync_order_LM(
                'trade', bid, message['header']['order_id'],
                message['header']['trade_id'], message)
        else:
            self.__sync_order_LM('wait')
def before_backtest(self):
    """Record the start time, print the market data and wait for Enter.

    Stores ``datetime.now()`` in the module-level ``start_time``.
    ``risk_position`` is declared global here but not assigned.
    """
    global start_time, risk_position
    start_time = datetime.now()
    print(self.market_data_hashable)
    # Blocks until a line is read from standard input.
    input()
def end_backtest(self):
    """Log the seconds elapsed since ``start_time`` and enable both savers.

    Reads the module-level ``start_time`` and sets ``if_save_to_csv`` and
    ``if_save_to_mongo`` to True.
    """
    global start_time
    elapsed = datetime.now() - start_time
    cost_time = elapsed.total_seconds()
    # NOTE(review): the leading text of this log message was lost to an
    # encoding error and is reproduced as found.
    QA.QA_util_log_info('???? {} {}'.format(cost_time, 'seconds'))
    self.if_save_to_csv = True
    self.if_save_to_mongo = True
def SqlExecute(conn, sqlquery=''):
    """
    Execute *sqlquery* through the ``sqlcmd`` command-line tool.

    The connection info is passed as a dictionary with these required keys:
        servername, username, password
    If username is empty, integrated security (``-E``) is used and the
    password is not read.
    Optional key: colseparator (defaults to ``chr(1)``).  The dictionary
    is not modified.

    Returns ``(data, fieldnames)``: the first output line split into
    column names, and every line from the third onwards split into
    fields (the second line is the dashed separator).  Returns ``([], [])``
    when the tool printed nothing, and ``-1`` when it could not be started.
    """
    # Read the separator without writing the default back into the
    # caller's dictionary.
    separator = conn.get('colseparator', chr(1))
    if conn['username'] == '':
        constr = "sqlcmd -E -S" + conn['servername'] + " /w 8192 -W " + ' -s' + separator + ' '
    else:
        constr = "sqlcmd -U" + conn['username'] + " -P" + conn['password'] + ' -S' + conn['servername'] + ' /w 8192 -W -s' + separator + ' '

    # now we execute
    try:
        output = subprocess.Popen(constr + '-Q"' + sqlquery + '"', stdout=subprocess.PIPE).communicate()[0]
    except Exception as inst:
        print('Exception in SqlExecute:', inst)
        return -1

    # now we decode
    records = []
    for raw in output.splitlines():
        try:
            # try default utf-8 decoding
            line = raw.decode()
        except UnicodeDecodeError:
            # in case of weird characters this one works most of the time
            line = raw.decode('ISO-8859-1')
        records.append(line.split(separator))

    # Empty output used to raise IndexError on records[0].
    if not records:
        return [], []

    # records[0] column names; records[1] dashed line (skipped); rest data
    return records[2:], records[0]
def DatedString():
    """
    Return the current local date and time as a string in this format:
        2014_12_30_135857_4581860
    i.e. ``YYYY_MM_DD_HHMMSS_`` followed by the six microsecond digits and
    a trailing ``0``.
    """
    from datetime import datetime
    # %f always emits six digits.  The previous str()/replace() chain
    # dropped the whole microsecond field when it happened to be 0.
    return datetime.now().strftime('%Y_%m_%d_%H%M%S_%f') + '0'
def restart(self, timeout=None):
    """Restarts this Splunk instance.

    The service is unavailable until it has successfully restarted.

    If a *timeout* value is specified, ``restart`` blocks until the service
    resumes or the timeout period has been exceeded. Otherwise, ``restart``
    returns immediately.

    :param timeout: A timeout period, in seconds.
    :type timeout: ``integer``
    :return: The response of the restart request.
    :raises Exception: If the service has not resumed within *timeout*.
    """
    # A space was missing before "via", which glued it to the username.
    msg = {"value": "Restart requested by " + self.username + " via the Splunk SDK for Python"}
    # This message will be deleted once the server actually restarts.
    self.messages.create(name="restart_required", **msg)
    result = self.post("/services/server/control/restart")
    if timeout is None:
        return result
    start = datetime.now()
    diff = timedelta(seconds=timeout)
    while datetime.now() - start < diff:
        try:
            self.login()
            if not self.restart_required:
                return result
        except Exception:
            # Login failed (server presumably still down): wait, then retry.
            sleep(1)
    # ``except X as e`` / ``raise X(msg)`` replace the Python 2-only forms.
    raise Exception("Operation time out.")
def search(self, query, **kwargs):
    """Creates a search job from *query* and returns it.

    This is a thin wrapper that forwards *query* and every keyword
    argument to ``self.jobs.create``.

    :param query: A search query.
    :type query: ``string``
    :param kwargs: Arguments for the search (optional):

        * "output_mode" (``string``): Specifies the output format of the
          results.

        * "earliest_time" (``string``): Specifies the earliest time in the
          time range to search. The time string can be a UTC time (with
          fractional seconds), a relative time specifier (to now), or a
          formatted time string.

        * "latest_time" (``string``): Specifies the latest time in the time
          range to search. The time string can be a UTC time (with
          fractional seconds), a relative time specifier (to now), or a
          formatted time string.

        * "rf" (``string``): Specifies one or more fields to add to the
          search.

    :type kwargs: ``dict``
    :rtype: class:`Job`
    :returns: An object representing the created job.
    """
    job = self.jobs.create(query, **kwargs)
    return job
def touch(self):
    """Pushes the search's expiration out to now plus its time-to-live
    (ttl) by posting the ``touch`` control action.

    :return: The :class:`Job`, so calls can be chained.
    """
    self.post("control", action="touch")
    return self
def sunrise(self,when=None):
    """
    Return the time of sunrise as a datetime.time object.

    *when* is a datetime.datetime object.  When it is omitted, the current
    time in the local time zone (``LocalTimezone``, daylight saving
    included if present) is used.
    """
    if when is None:
        when = datetime.now(tz=LocalTimezone())
    self.__preptime(when)
    self.__calc()
    decimal_day = self.sunrise_t
    return sun.__timefromdecimalday(decimal_day)
def sunset(self,when=None):
    """
    Return the time of sunset, converted from ``self.sunset_t``.

    *when* is a datetime.datetime object.  When it is omitted, the current
    time in the local time zone (``LocalTimezone``) is used.
    """
    if when is None:
        when = datetime.now(tz=LocalTimezone())
    self.__preptime(when)
    self.__calc()
    decimal_day = self.sunset_t
    return sun.__timefromdecimalday(decimal_day)
def solarnoon(self,when=None):
    """
    Return the time of solar noon, converted from ``self.solarnoon_t``.

    *when* is a datetime.datetime object.  When it is omitted, the current
    time in the local time zone (``LocalTimezone``) is used.
    """
    if when is None:
        when = datetime.now(tz=LocalTimezone())
    self.__preptime(when)
    self.__calc()
    decimal_day = self.solarnoon_t
    return sun.__timefromdecimalday(decimal_day)
def generateSSLCert():
    """Create a self-signed certificate and key when either file is missing.

    Looks for ``plexivity.key`` and ``plexivity.crt`` in
    ``config.DATA_DIR``.  If both exist nothing happens.  Otherwise a new
    RSA key and a self-signed certificate (valid for ten years, CN set to
    this host's name) are generated with pyOpenSSL and written there in
    PEM format.  Any failure is logged and not re-raised.
    """
    crt_path = os.path.join(config.DATA_DIR, 'plexivity.crt')
    key_path = os.path.join(config.DATA_DIR, 'plexivity.key')
    if os.path.exists(key_path) and os.path.exists(crt_path):
        return

    logger.warning("plexivity was started with ssl support but no cert was found, trying to generating cert and key now")
    try:
        from OpenSSL import crypto
        from socket import gethostname

        # create a key pair (2048 bits; 1024-bit RSA is considered too weak)
        k = crypto.PKey()
        k.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = "US"
        subject.ST = "plex land"
        subject.L = "plex land"
        subject.O = "plexivity"
        subject.OU = "plexivity"
        subject.CN = gethostname()
        cert.set_serial_number(1000)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # ten years
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(k)
        # SHA-256 instead of SHA-1, which is deprecated for signatures.
        cert.sign(k, 'sha256')

        # The PEM dumps are bytes: write them in binary mode and close the
        # files deterministically.
        with open(crt_path, "wb") as crt_file:
            crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        with open(key_path, "wb") as key_file:
            key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
        logger.info("ssl cert and key generated and saved to: %s" % config.DATA_DIR)
    except Exception:
        # Narrowed from a bare ``except:``; still best-effort by design.
        logger.error("unable to generate ssl key and cert")
def startScheduler():
    """Create the database schema and default roles, then start the scheduler.

    Ensures the ``admin`` and ``user`` roles exist, determines the local
    time zone via ``tzlocal`` (falling back to UTC), and starts a
    ``BackgroundScheduler`` that runs ``notify.task`` every
    ``config.SCAN_INTERVAL`` seconds, first run two seconds from now.
    """
    db.create_all()

    # create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal
        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s" % tz)
    except Exception:
        # Narrowed from a bare ``except:``; a missing or failing tzlocal
        # still just means "unknown time zone".
        tz = None

    # getattr: not every tz object has a ``zone`` attribute, and a plain
    # ``tz.zone`` would raise AttributeError on those.
    if not tz or getattr(tz, "zone", None) == "local":
        # The two literals used to join as "logmessages"; the space after
        # "log" fixes that.
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc

    # in debug mode this is executed twice :(
    # DON'T run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): without a ``global sched`` declaration this binds only
    # a local name -- confirm whether a module-level ``sched`` was meant.
    sched = scheduler
    # notify.task()