Python pandas module: NaT usage examples
The following code examples, extracted from open-source Python projects, illustrate how to use pandas.NaT.
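Before the project examples, here is a minimal sketch of the NaT semantics most of them rely on (standard pandas behavior, shown for orientation): NaT never compares equal to anything, itself included, so detection must use identity or pd.isna rather than ==.

import numpy as np
import pandas as pd

print(pd.NaT == pd.NaT)        # False -- like NaN, NaT is not equal to itself
print(pd.NaT is pd.NaT)        # True  -- NaT is a singleton, so identity checks work
print(pd.isna(pd.NaT))         # True  -- the canonical missing-value check
print(np.isnan(pd.NaT.year))   # True  -- datetime fields of NaT are float NaN
print(pd.Timestamp('2013-01-01') < pd.NaT)  # False -- ordered comparisons are always False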
def assert_timestamp_equal(left, right, compare_nat_equal=True, msg=""):
"""
Assert that two pandas Timestamp objects are the same.
Parameters
----------
left, right : pd.Timestamp
The values to compare.
compare_nat_equal : bool, optional
Whether to consider `NaT` values equal. Defaults to True.
msg : str, optional
A message to forward to `pd.util.testing.assert_equal`.
"""
if compare_nat_equal and left is pd.NaT and right is pd.NaT:
return
return pd.util.testing.assert_equal(left, right, msg=msg)
def to_series(tuples):
"""Transforms a list of tuples of the form (date, count) in to a pandas
series indexed by dt.
"""
cleaned_time_val_tuples = [tuple for tuple in tuples if not (
tuple[0] is pd.NaT or tuple[1] is None)]
if len(cleaned_time_val_tuples) > 0:
# change list of tuples ie [(a1, b1), (a2, b2), ...] into
# tuple of lists ie ([a1, a2, ...], [b1, b2, ...])
unzipped_cleaned_time_values = zip(*cleaned_time_val_tuples)
# just being explicit about what these are
counts = unzipped_cleaned_time_values[1]
timestamps = unzipped_cleaned_time_values[0]
# Create the series with a sorted index.
ret_val = pd.Series(counts, index=timestamps).sort_index()
else:
ret_val = None
return ret_val
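A quick usage sketch for the helper above (input pairs invented for illustration): pairs with a NaT timestamp or a None count are dropped, and the result comes back sorted by time.

pairs = [(pd.Timestamp('2020-01-03'), 7),
         (pd.NaT, 99),                        # dropped: NaT timestamp
         (pd.Timestamp('2020-01-01'), None),  # dropped: None count
         (pd.Timestamp('2020-01-02'), 5)]
print(to_series(pairs))
# 2020-01-02    5
# 2020-01-03    7
# dtype: int64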
def last_date_in_output_for_sid(self, sid):
"""
    Parameters
    ----------
    sid : int
        Asset identifier.

    Returns
    -------
    out : pd.Timestamp
        The midnight of the last date written to the output for the
        given sid.
"""
sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid))
if not os.path.exists(sizes_path):
return pd.NaT
with open(sizes_path, mode='r') as f:
sizes = f.read()
data = json.loads(sizes)
    # use integer division so the result is an int usable for indexing
    num_days = data['shape'][0] // self._minutes_per_day
if num_days == 0:
# empty container
return pd.NaT
return self._trading_days[num_days - 1]
def df_type_to_str(i):
'''
    Convert pandas/numpy types into simple built-in datatypes.
'''
if isinstance(i, np.bool_):
return bool(i)
if isinstance(i, np.int_):
return int(i)
    if isinstance(i, np.floating):  # np.float was removed in NumPy 1.24
if np.isnan(i):
return 'NaN'
elif np.isinf(i):
return str(i)
return float(i)
if isinstance(i, np.uint):
return int(i)
if type(i) == bytes:
return i.decode('UTF-8')
if isinstance(i, (tuple, list)):
return str(i)
if i is pd.NaT: # not identified as a float null
return 'NaN'
return str(i)
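A few representative calls to the converter above (inputs chosen for illustration):

print(df_type_to_str(np.bool_(True)))      # True  (a plain Python bool)
print(df_type_to_str(np.float64(np.nan)))  # NaN   (the string 'NaN')
print(df_type_to_str(np.float64(np.inf)))  # inf   (infinities are stringified)
print(df_type_to_str(b'abc'))              # abc   (bytes decoded as UTF-8)
print(df_type_to_str(pd.NaT))              # NaN   (NaT normalized like a float null)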
def _infer_fill_value(val):
"""
    Infer the fill value for a nan/NaT from the provided
    scalar/ndarray/list-like. If we have a NaT, return the correctly
    dtyped element to provide proper block construction.
"""
if not is_list_like(val):
val = [val]
val = np.array(val, copy=False)
if is_datetimelike(val):
return np.array('NaT', dtype=val.dtype)
elif is_object_dtype(val.dtype):
dtype = lib.infer_dtype(_ensure_object(val))
if dtype in ['datetime', 'datetime64']:
return np.array('NaT', dtype=_NS_DTYPE)
elif dtype in ['timedelta', 'timedelta64']:
return np.array('NaT', dtype=_TD_DTYPE)
return np.nan
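_infer_fill_value depends on pandas internals (lib, _ensure_object, _NS_DTYPE, _TD_DTYPE), but its central trick is reproducible with public APIs: the string 'NaT' coerced through the right dtype yields a properly typed null element.

dt_nat = np.array('NaT', dtype='datetime64[ns]')
td_nat = np.array('NaT', dtype='timedelta64[ns]')
print(dt_nat.dtype)     # datetime64[ns]
print(td_nat.dtype)     # timedelta64[ns]
print(pd.isna(dt_nat))  # True -- recognized as missing in either dtype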
def test_NaT_methods(self):
# GH 9513
raise_methods = ['astimezone', 'combine', 'ctime', 'dst',
'fromordinal', 'fromtimestamp', 'isocalendar',
'strftime', 'strptime', 'time', 'timestamp',
'timetuple', 'timetz', 'toordinal', 'tzname',
'utcfromtimestamp', 'utcnow', 'utcoffset',
'utctimetuple']
nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
nan_methods = ['weekday', 'isoweekday']
for method in raise_methods:
if hasattr(NaT, method):
self.assertRaises(ValueError, getattr(NaT, method))
for method in nan_methods:
if hasattr(NaT, method):
self.assertTrue(np.isnan(getattr(NaT, method)()))
for method in nat_methods:
if hasattr(NaT, method):
self.assertIs(getattr(NaT, method)(), NaT)
# GH 12300
self.assertEqual(NaT.isoformat(), 'NaT')
def test_pickle(self):
# GH4606
p = self.round_trip_pickle(NaT)
self.assertTrue(p is NaT)
idx = pd.to_datetime(['2013-01-01', NaT, '2014-01-06'])
idx_p = self.round_trip_pickle(idx)
self.assertTrue(idx_p[0] == idx[0])
self.assertTrue(idx_p[1] is NaT)
self.assertTrue(idx_p[2] == idx[2])
# GH11002
# don't infer freq
idx = date_range('1750-1-1', '2050-1-1', freq='7D')
idx_p = self.round_trip_pickle(idx)
tm.assert_index_equal(idx, idx_p)
def test_timestamp_equality(self):
# GH 11034
s = Series([Timestamp('2000-01-29 01:59:00'), 'NaT'])
result = s != s
assert_series_equal(result, Series([False, True]))
result = s != s[0]
assert_series_equal(result, Series([False, True]))
result = s != s[1]
assert_series_equal(result, Series([True, True]))
result = s == s
assert_series_equal(result, Series([True, False]))
result = s == s[0]
assert_series_equal(result, Series([True, False]))
result = s == s[1]
assert_series_equal(result, Series([False, False]))
def test_asobject_tolist(self):
idx = timedelta_range(start='1 days', periods=4, freq='D', name='idx')
expected_list = [Timedelta('1 days'), Timedelta('2 days'),
Timedelta('3 days'), Timedelta('4 days')]
expected = pd.Index(expected_list, dtype=object, name='idx')
result = idx.asobject
self.assertTrue(isinstance(result, Index))
self.assertEqual(result.dtype, object)
self.assertTrue(result.equals(expected))
self.assertEqual(result.name, expected.name)
self.assertEqual(idx.tolist(), expected_list)
idx = TimedeltaIndex([timedelta(days=1), timedelta(days=2), pd.NaT,
timedelta(days=4)], name='idx')
expected_list = [Timedelta('1 days'), Timedelta('2 days'), pd.NaT,
Timedelta('4 days')]
expected = pd.Index(expected_list, dtype=object, name='idx')
result = idx.asobject
self.assertTrue(isinstance(result, Index))
self.assertEqual(result.dtype, object)
self.assertTrue(result.equals(expected))
self.assertEqual(result.name, expected.name)
self.assertEqual(idx.tolist(), expected_list)
def test_dti_tdi_numeric_ops(self):
# These are normally union/diff set-like ops
tdi = TimedeltaIndex(['1 days', pd.NaT, '2 days'], name='foo')
dti = date_range('20130101', periods=3, name='bar')
# TODO(wesm): unused?
# td = Timedelta('1 days')
# dt = Timestamp('20130101')
result = tdi - tdi
expected = TimedeltaIndex(['0 days', pd.NaT, '0 days'], name='foo')
tm.assert_index_equal(result, expected)
result = tdi + tdi
expected = TimedeltaIndex(['2 days', pd.NaT, '4 days'], name='foo')
tm.assert_index_equal(result, expected)
result = dti - tdi # name will be reset
expected = DatetimeIndex(['20121231', pd.NaT, '20130101'])
tm.assert_index_equal(result, expected)
def test_nat_fields(self):
# GH 10050
ts = Timestamp('NaT')
self.assertTrue(np.isnan(ts.year))
self.assertTrue(np.isnan(ts.month))
self.assertTrue(np.isnan(ts.day))
self.assertTrue(np.isnan(ts.hour))
self.assertTrue(np.isnan(ts.minute))
self.assertTrue(np.isnan(ts.second))
self.assertTrue(np.isnan(ts.microsecond))
self.assertTrue(np.isnan(ts.nanosecond))
self.assertTrue(np.isnan(ts.dayofweek))
self.assertTrue(np.isnan(ts.quarter))
self.assertTrue(np.isnan(ts.dayofyear))
self.assertTrue(np.isnan(ts.week))
self.assertTrue(np.isnan(ts.daysinmonth))
self.assertTrue(np.isnan(ts.days_in_month))
def test_period_cons_nat(self):
p = Period('NaT', freq='M')
self.assertEqual(p.ordinal, tslib.iNaT)
self.assertEqual(p.freq, 'M')
self.assertEqual((p + 1).ordinal, tslib.iNaT)
p = Period('nat', freq='W-SUN')
self.assertEqual(p.ordinal, tslib.iNaT)
self.assertEqual(p.freq, 'W-SUN')
self.assertEqual((p + 1).ordinal, tslib.iNaT)
p = Period(tslib.iNaT, freq='D')
self.assertEqual(p.ordinal, tslib.iNaT)
self.assertEqual(p.freq, 'D')
self.assertEqual((p + 1).ordinal, tslib.iNaT)
p = Period(tslib.iNaT, freq='3D')
self.assertEqual(p.ordinal, tslib.iNaT)
self.assertEqual(p.freq, offsets.Day(3))
self.assertEqual(p.freqstr, '3D')
self.assertEqual((p + 1).ordinal, tslib.iNaT)
self.assertRaises(ValueError, Period, 'NaT')
def test_to_timestamp_pi_nat(self):
# GH 7228
index = PeriodIndex(['NaT', '2011-01', '2011-02'], freq='M',
name='idx')
result = index.to_timestamp('D')
expected = DatetimeIndex([pd.NaT, datetime(2011, 1, 1),
datetime(2011, 2, 1)], name='idx')
self.assertTrue(result.equals(expected))
self.assertEqual(result.name, 'idx')
result2 = result.to_period(freq='M')
self.assertTrue(result2.equals(index))
self.assertEqual(result2.name, 'idx')
result3 = result.to_period(freq='3M')
exp = PeriodIndex(['NaT', '2011-01', '2011-02'], freq='3M', name='idx')
self.assert_index_equal(result3, exp)
self.assertEqual(result3.freqstr, '3M')
msg = ('Frequency must be positive, because it'
' represents span: -2A')
with tm.assertRaisesRegexp(ValueError, msg):
result.to_period(freq='-2A')
def test_to_timedelta_invalid(self):
# these will error
self.assertRaises(ValueError, lambda: to_timedelta([1, 2], unit='foo'))
self.assertRaises(ValueError, lambda: to_timedelta(1, unit='foo'))
# time not supported ATM
self.assertRaises(ValueError, lambda: to_timedelta(time(second=1)))
self.assertTrue(to_timedelta(
time(second=1), errors='coerce') is pd.NaT)
self.assertRaises(ValueError, lambda: to_timedelta(['foo', 'bar']))
tm.assert_index_equal(TimedeltaIndex([pd.NaT, pd.NaT]),
to_timedelta(['foo', 'bar'], errors='coerce'))
tm.assert_index_equal(TimedeltaIndex(['1 day', pd.NaT, '1 min']),
to_timedelta(['1 day', 'bar', '1 min'],
errors='coerce'))
def test_apply_to_timedelta(self):
timedelta_NaT = pd.to_timedelta('NaT')
list_of_valid_strings = ['00:00:01', '00:00:02']
a = pd.to_timedelta(list_of_valid_strings)
b = Series(list_of_valid_strings).apply(pd.to_timedelta)
# Can't compare until apply on a Series gives the correct dtype
# assert_series_equal(a, b)
list_of_strings = ['00:00:01', np.nan, pd.NaT, timedelta_NaT]
# TODO: unused?
a = pd.to_timedelta(list_of_strings) # noqa
b = Series(list_of_strings).apply(pd.to_timedelta) # noqa
# Can't compare until apply on a Series gives the correct dtype
# assert_series_equal(a, b)
def test_isin_nan(self):
tm.assert_numpy_array_equal(
Index(['a', np.nan]).isin([np.nan]), [False, True])
tm.assert_numpy_array_equal(
Index(['a', pd.NaT]).isin([pd.NaT]), [False, True])
tm.assert_numpy_array_equal(
Index(['a', np.nan]).isin([float('nan')]), [False, False])
tm.assert_numpy_array_equal(
Index(['a', np.nan]).isin([pd.NaT]), [False, False])
# Float64Index overrides isin, so must be checked separately
tm.assert_numpy_array_equal(
Float64Index([1.0, np.nan]).isin([np.nan]), [False, True])
tm.assert_numpy_array_equal(
Float64Index([1.0, np.nan]).isin([float('nan')]), [False, True])
tm.assert_numpy_array_equal(
Float64Index([1.0, np.nan]).isin([pd.NaT]), [False, True])
def test_fillna_period(self):
# GH 11343
idx = pd.PeriodIndex(
['2011-01-01 09:00', pd.NaT, '2011-01-01 11:00'], freq='H')
exp = pd.PeriodIndex(
['2011-01-01 09:00', '2011-01-01 10:00', '2011-01-01 11:00'
], freq='H')
self.assert_index_equal(
idx.fillna(pd.Period('2011-01-01 10:00', freq='H')), exp)
exp = pd.Index([pd.Period('2011-01-01 09:00', freq='H'), 'x',
pd.Period('2011-01-01 11:00', freq='H')], dtype=object)
self.assert_index_equal(idx.fillna('x'), exp)
with tm.assertRaisesRegexp(
ValueError,
'Input has different freq=D from PeriodIndex\\(freq=H\\)'):
idx.fillna(pd.Period('2011-01-01', freq='D'))
def test_cummin_timedelta64(self):
s = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'NaT',
'1 min',
'NaT',
'3 min', ]))
expected = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'NaT',
'1 min',
'NaT',
'1 min', ]))
result = s.cummin(skipna=True)
self.assert_series_equal(expected, result)
expected = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'2 min',
'1 min',
'1 min',
'1 min', ]))
result = s.cummin(skipna=False)
self.assert_series_equal(expected, result)
def test_cummax_timedelta64(self):
s = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'NaT',
'1 min',
'NaT',
'3 min', ]))
expected = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'NaT',
'2 min',
'NaT',
'3 min', ]))
result = s.cummax(skipna=True)
self.assert_series_equal(expected, result)
expected = pd.Series(pd.to_timedelta(['NaT',
'2 min',
'2 min',
'2 min',
'2 min',
'3 min', ]))
result = s.cummax(skipna=False)
self.assert_series_equal(expected, result)
def test_ops_consistency_on_empty(self):
# GH 7869
# consistency on empty
# float
result = Series(dtype=float).sum()
self.assertEqual(result, 0)
result = Series(dtype=float).mean()
self.assertTrue(isnull(result))
result = Series(dtype=float).median()
self.assertTrue(isnull(result))
# timedelta64[ns]
result = Series(dtype='m8[ns]').sum()
self.assertEqual(result, Timedelta(0))
result = Series(dtype='m8[ns]').mean()
self.assertTrue(result is pd.NaT)
result = Series(dtype='m8[ns]').median()
self.assertTrue(result is pd.NaT)
def test_datetime64_tz_dropna(self):
# DatetimeBlock
s = Series([Timestamp('2011-01-01 10:00'), pd.NaT, Timestamp(
'2011-01-03 10:00'), pd.NaT])
result = s.dropna()
expected = Series([Timestamp('2011-01-01 10:00'),
Timestamp('2011-01-03 10:00')], index=[0, 2])
self.assert_series_equal(result, expected)
# DatetimeBlockTZ
idx = pd.DatetimeIndex(['2011-01-01 10:00', pd.NaT,
'2011-01-03 10:00', pd.NaT],
tz='Asia/Tokyo')
s = pd.Series(idx)
self.assertEqual(s.dtype, 'datetime64[ns, Asia/Tokyo]')
result = s.dropna()
expected = Series([Timestamp('2011-01-01 10:00', tz='Asia/Tokyo'),
Timestamp('2011-01-03 10:00', tz='Asia/Tokyo')],
index=[0, 2])
self.assertEqual(result.dtype, 'datetime64[ns, Asia/Tokyo]')
self.assert_series_equal(result, expected)
def test_valid_dt_with_missing_values(self):
from datetime import date, time
# GH 8689
s = Series(date_range('20130101', periods=5, freq='D'))
s.iloc[2] = pd.NaT
for attr in ['microsecond', 'nanosecond', 'second', 'minute', 'hour',
'day']:
expected = getattr(s.dt, attr).copy()
expected.iloc[2] = np.nan
result = getattr(s.dt, attr)
tm.assert_series_equal(result, expected)
result = s.dt.date
expected = Series(
[date(2013, 1, 1), date(2013, 1, 2), np.nan, date(2013, 1, 4),
date(2013, 1, 5)], dtype='object')
tm.assert_series_equal(result, expected)
result = s.dt.time
expected = Series(
[time(0), time(0), np.nan, time(0), time(0)], dtype='object')
tm.assert_series_equal(result, expected)
def test_first_last_max_min_on_time_data(self):
# GH 10295
# Verify that NaT is not in the result of max, min, first and last on
# Dataframe with datetime or timedelta values.
from datetime import timedelta as td
df_test = DataFrame(
{'dt': [nan, '2015-07-24 10:10', '2015-07-25 11:11',
'2015-07-23 12:12', nan],
'td': [nan, td(days=1), td(days=2), td(days=3), nan]})
df_test.dt = pd.to_datetime(df_test.dt)
df_test['group'] = 'A'
df_ref = df_test[df_test.dt.notnull()]
grouped_test = df_test.groupby('group')
grouped_ref = df_ref.groupby('group')
assert_frame_equal(grouped_ref.max(), grouped_test.max())
assert_frame_equal(grouped_ref.min(), grouped_test.min())
assert_frame_equal(grouped_ref.first(), grouped_test.first())
assert_frame_equal(grouped_ref.last(), grouped_test.last())
def test_datetimeindex(self):
index = date_range('20130102', periods=6)
s = Series(1, index=index)
result = s.to_string()
self.assertTrue('2013-01-02' in result)
# nat in index
s2 = Series(2, index=[Timestamp('20130111'), NaT])
s = s2.append(s)
result = s.to_string()
self.assertTrue('NaT' in result)
# nat in summary
result = str(s2.index)
self.assertTrue('NaT' in result)
def test_timestamp_compare(self):
# make sure we can compare Timestamps on the right AND left hand side
# GH4982
df = DataFrame({'dates1': date_range('20010101', periods=10),
'dates2': date_range('20010102', periods=10),
'intcol': np.random.randint(1000000000, size=10),
'floatcol': np.random.randn(10),
'stringcol': list(tm.rands(10))})
df.loc[np.random.rand(len(df)) > 0.5, 'dates2'] = pd.NaT
ops = {'gt': 'lt', 'lt': 'gt', 'ge': 'le', 'le': 'ge', 'eq': 'eq',
'ne': 'ne'}
for left, right in ops.items():
left_f = getattr(operator, left)
right_f = getattr(operator, right)
# no nats
expected = left_f(df, Timestamp('20010109'))
result = right_f(Timestamp('20010109'), df)
assert_frame_equal(result, expected)
# nats
expected = left_f(df, Timestamp('nat'))
result = right_f(Timestamp('nat'), df)
assert_frame_equal(result, expected)
def test_v12_compat(self):
df = DataFrame(
[[1.56808523, 0.65727391, 1.81021139, -0.17251653],
[-0.2550111, -0.08072427, -0.03202878, -0.17581665],
[1.51493992, 0.11805825, 1.629455, -1.31506612],
[-0.02765498, 0.44679743, 0.33192641, -0.27885413],
[0.05951614, -2.69652057, 1.28163262, 0.34703478]],
columns=['A', 'B', 'C', 'D'],
index=pd.date_range('2000-01-03', '2000-01-07'))
df['date'] = pd.Timestamp('19920106 18:21:32.12')
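    # note: .ix below is historical pandas; it was removed in pandas 1.0
    # in favor of .loc/.iloc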
df.ix[3, 'date'] = pd.Timestamp('20130101')
df['modified'] = df['date']
df.ix[1, 'modified'] = pd.NaT
v12_json = os.path.join(self.dirpath, 'tsframe_v012.json')
df_unser = pd.read_json(v12_json)
assert_frame_equal(df, df_unser)
df_iso = df.drop(['modified'], axis=1)
v12_iso_json = os.path.join(self.dirpath, 'tsframe_iso_v012.json')
df_unser_iso = pd.read_json(v12_iso_json)
assert_frame_equal(df_iso, df_unser_iso)
def test_date_format_frame(self):
df = self.tsframe.copy()
def test_w_date(date, date_unit=None):
df['date'] = Timestamp(date)
df.ix[1, 'date'] = pd.NaT
df.ix[5, 'date'] = pd.NaT
if date_unit:
json = df.to_json(date_format='iso', date_unit=date_unit)
else:
json = df.to_json(date_format='iso')
result = read_json(json)
assert_frame_equal(result, df)
test_w_date('20130101 20:43:42.123')
test_w_date('20130101 20:43:42', date_unit='s')
test_w_date('20130101 20:43:42.123', date_unit='ms')
test_w_date('20130101 20:43:42.123456', date_unit='us')
test_w_date('20130101 20:43:42.123456789', date_unit='ns')
self.assertRaises(ValueError, df.to_json, date_format='iso',
date_unit='foo')
def test_date_format_series(self):
def test_w_date(date, date_unit=None):
ts = Series(Timestamp(date), index=self.ts.index)
ts.ix[1] = pd.NaT
ts.ix[5] = pd.NaT
if date_unit:
json = ts.to_json(date_format='iso', date_unit=date_unit)
else:
json = ts.to_json(date_format='iso')
result = read_json(json, typ='series')
assert_series_equal(result, ts)
test_w_date('20130101 20:43:42.123')
test_w_date('20130101 20:43:42', date_unit='s')
test_w_date('20130101 20:43:42.123', date_unit='ms')
test_w_date('20130101 20:43:42.123456', date_unit='us')
test_w_date('20130101 20:43:42.123456789', date_unit='ns')
ts = Series(Timestamp('20130101 20:43:42.123'), index=self.ts.index)
self.assertRaises(ValueError, ts.to_json, date_format='iso',
date_unit='foo')
def test_date_unit(self):
df = self.tsframe.copy()
df['date'] = Timestamp('20130101 20:43:42')
df.ix[1, 'date'] = Timestamp('19710101 20:43:42')
df.ix[2, 'date'] = Timestamp('21460101 20:43:42')
df.ix[4, 'date'] = pd.NaT
for unit in ('s', 'ms', 'us', 'ns'):
json = df.to_json(date_format='epoch', date_unit=unit)
# force date unit
result = read_json(json, date_unit=unit)
assert_frame_equal(result, df)
# detect date unit
result = read_json(json, date_unit=None)
assert_frame_equal(result, df)
def zipline_splits_and_dividends(symbol_map):
raw_splits, raw_dividends = load_splits_and_dividends()
splits = []
dividends = []
    for sid, code in symbol_map.items():  # iteritems() was removed in pandas 2.0
if code in raw_splits:
split = pd.DataFrame(data=raw_splits[code])
split['sid'] = sid
split.index = split['effective_date'] = pd.DatetimeIndex(split['effective_date'])
splits.append(split)
if code in raw_dividends:
            dividend = pd.DataFrame(data=raw_dividends[code])
dividend['sid'] = sid
dividend['record_date'] = dividend['declared_date'] = dividend['pay_date'] = pd.NaT
dividend.index = dividend['ex_date'] = pd.DatetimeIndex(dividend['ex_date'])
dividends.append(dividend)
return splits, dividends
def _display_dimensions(self, dimensions, operations):
req_dimension_keys = [utils.slice_first(dimension)
for dimension in dimensions]
display_dims = OrderedDict()
for key in req_dimension_keys:
dimension = self.slicer.dimensions[key]
display_dim = {'label': dimension.label}
if hasattr(dimension, 'display_options'):
display_dim['display_options'] = {opt.key: opt.label
for opt in dimension.display_options}
display_dim['display_options'].update({pd.NaT: '', np.nan: ''})
if hasattr(dimension, 'display_field') and dimension.display_field:
display_dim['display_field'] = '%s_display' % dimension.key
display_dims[key] = display_dim
return display_dims
def test_categorical_dimension(self):
display_schema = self.test_slicer.manager.display_schema(
metrics=['foo'],
dimensions=['locale'],
)
self.assertDictEqual(
{
'metrics': {'foo': {'label': 'foo', 'axis': 0}},
'dimensions': {
'locale': {'label': 'Locale', 'display_options': {
'us': 'United States', 'de': 'Germany', np.nan: '', pd.NaT: ''
}},
},
'references': {},
},
display_schema
)
def test_multiple_metrics_and_dimensions(self):
display_schema = self.test_slicer.manager.display_schema(
metrics=['foo', 'bar'],
dimensions=[('date', DatetimeDimension.month), ('clicks', 50, 100), 'locale', 'account'],
)
self.assertDictEqual(
{
'metrics': {
'foo': {'label': 'foo', 'axis': 0},
'bar': {'label': 'FizBuz', 'axis': 1},
},
'dimensions': {
'date': {'label': 'date'},
'clicks': {'label': 'My Clicks'},
'locale': {'label': 'Locale', 'display_options': {
'us': 'United States', 'de': 'Germany', np.nan: '', pd.NaT: ''
}},
'account': {'label': 'Account', 'display_field': 'account_display'},
},
'references': {},
},
display_schema
)
def _make_time(timearr):
"""Return a :class:`datetime.datetime` object for the array of characters.
Args:
timearr (:class:`numpy.ndarray`): An array of characters.
Returns:
:class:`datetime.datetime`: A datetime object.
"""
try:
return dt.datetime.strptime("".join(npbytes_to_str(timearr)),
"%Y-%m-%d_%H:%M:%S")
except ValueError:
return np.datetime64("NaT")
def test_date_breaks():
# cpython
x = [datetime(year, 1, 1) for year in [2010, 2026, 2015]]
limits = min(x), max(x)
breaks = date_breaks('5 Years')
years = [d.year for d in breaks(limits)]
npt.assert_array_equal(
years, [2010, 2015, 2020, 2025, 2030])
breaks = date_breaks('10 Years')
years = [d.year for d in breaks(limits)]
npt.assert_array_equal(years, [2010, 2020, 2030])
# numpy
x = [np.datetime64(i*10, 'D') for i in range(1, 10)]
breaks = date_breaks('10 Years')
limits = min(x), max(x)
with pytest.raises(AttributeError):
breaks(limits)
# NaT
limits = np.datetime64('NaT'), datetime(2017, 1, 1)
breaks = date_breaks('10 Years')
assert len(breaks(limits)) == 0
def automatic_events(self, timestamp):
"""
Update the current time of the Blotter, triggering all scheduled events
between previous clock time and new clock time such as interest
charges, margin charges, PnL calculations and PnL sweeps. See
create_events() for more information on the type of events.
Parameters
----------
    timestamp : pandas.Timestamp
        Time to update the clock to and trigger internal events up until
        that time
"""
current_time = self._holdings.timestamp
# first event so there is nothing automatic that needs to be done
if current_time is pd.NaT:
return
actions = self._get_actions(current_time, timestamp, self._actions)
    for ts, action in actions.items():
events = self.create_events(ts, action)
self.dispatch_events(events)
def pad(self, sid, date):
"""
Fill sid container with empty data through the specified date.
e.g. if the date is two days after the last date in the sid's existing
output, 2 x `minute_per_day` worth of zeros will be added to the
output.
    Parameters
    ----------
    sid : int
        The asset identifier for the data being written.
    date : datetime-like
        The date used to calculate how many slots to pad.
        The padding is done through the date, i.e. after the padding is
        done, `last_date_in_output_for_sid` will be equal to `date`.
    """
table = self._ensure_ctable(sid)
last_date = self.last_date_in_output_for_sid(sid)
tds = self._trading_days
if date <= last_date or date < tds[0]:
# No need to pad.
return
    if last_date is pd.NaT:  # NaT never compares equal, so test identity
# If there is no data, determine how many days to add so that
# desired days are written to the correct slots.
days_to_zerofill = tds[tds.slice_indexer(end=date)]
else:
days_to_zerofill = tds[tds.slice_indexer(
start=last_date + tds.freq,
end=date)]
self._zerofill(table, len(days_to_zerofill))
new_last_date = self.last_date_in_output_for_sid(sid)
assert new_last_date == date, "new_last_date={0} != date={1}".format(
new_last_date, date)
def __init__(self,
window,
items,
sids,
cap_multiple=2,
dtype=np.float64,
initial_dates=None):
self._pos = window
self._window = window
self.items = _ensure_index(items)
self.minor_axis = _ensure_index(sids)
self.cap_multiple = cap_multiple
self.dtype = dtype
if initial_dates is None:
        # initialize the buffer with NaT values
        self.date_buf = np.full(self.cap, np.datetime64('NaT'), dtype='M8[ns]')
elif len(initial_dates) != window:
raise ValueError('initial_dates must be of length window')
else:
self.date_buf = np.hstack(
(
initial_dates,
np.empty(
window * (cap_multiple - 1),
dtype='datetime64[ns]',
),
),
)
self.buffer = self._create_buffer()
def _update_dividends(self, asset_id, raw_data):
divs = raw_data.ex_dividend
df = pd.DataFrame({'amount': divs[divs != 0]})
df.index.name = 'ex_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
# we do not have this data in the WIKI dataset
df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT
self.dividends.append(df)
def last_date_in_output_for_sid(self, sid):
"""
Parameters
----------
sid : int
Asset identifier.
Returns
-------
out : pd.Timestamp
        The midnight of the last date written to the output for the
given sid.
"""
sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid))
if not os.path.exists(sizes_path):
return pd.NaT
with open(sizes_path, mode='r') as f:
sizes = f.read()
data = json.loads(sizes)
# use integer division so that the result is an int
# for pandas index later https://github.com/pandas-dev/pandas/blob/master/pandas/tseries/base.py#L247 # noqa
num_days = data['shape'][0] // self._minutes_per_day
if num_days == 0:
# empty container
return pd.NaT
return self._session_labels[num_days - 1]
def get_last_traded_dt(self, asset, dt):
"""
Get the latest minute on or before ``dt`` in which ``asset`` traded.
If there are no trades on or before ``dt``, returns ``pd.NaT``.
Parameters
----------
asset : catalyst.asset.Asset
The asset for which to get the last traded minute.
dt : pd.Timestamp
The minute at which to start searching for the last traded minute.
Returns
-------
last_traded : pd.Timestamp
The dt of the last trade for the given asset, using the input
dt as a vantage point.
"""
rf = self._roll_finders[asset.roll_style]
sid = (rf.get_contract_center(asset.root_symbol,
dt,
asset.offset))
if sid is None:
return pd.NaT
contract = rf.asset_finder.retrieve_asset(sid)
return self._bar_reader.get_last_traded_dt(contract, dt)
def _get_daily_spot_value(self, asset, column, dt):
reader = self._get_pricing_reader('daily')
if column == "last_traded":
last_traded_dt = reader.get_last_traded_dt(asset, dt)
if isnull(last_traded_dt):
return pd.NaT
else:
return last_traded_dt
elif column in OHLCV_FIELDS:
# don't forward fill
try:
return reader.get_value(asset, dt, column)
except NoDataOnDate:
return np.nan
elif column == "price":
found_dt = dt
while True:
try:
value = reader.get_value(
asset, found_dt, "close"
)
if not isnull(value):
if dt == found_dt:
return value
else:
# adjust if needed
return self.get_adjusted_value(
asset, column, found_dt, dt, "minute",
spot_value=value
)
else:
found_dt -= self.trading_calendar.day
except NoDataOnDate:
return np.nan
def assert_same(self, val1, val2):
try:
self.assertEqual(val1, val2)
except AssertionError:
if val1 is pd.NaT:
self.assertTrue(val2 is pd.NaT)
elif np.isnan(val1):
self.assertTrue(np.isnan(val2))
else:
raise
def test_day_before_assets_trading(self):
# use the day before self.bcolz_daily_bar_days[0]
minute = self.get_last_minute_of_session(
self.trading_calendar.previous_session_label(
self.equity_daily_bar_days[0]
)
)
bar_data = self.create_bardata(
simulation_dt_func=lambda: minute,
)
self.check_internal_consistency(bar_data)
self.assertFalse(bar_data.can_trade(self.ASSET1))
self.assertFalse(bar_data.can_trade(self.ASSET2))
self.assertFalse(bar_data.is_stale(self.ASSET1))
self.assertFalse(bar_data.is_stale(self.ASSET2))
for field in ALL_FIELDS:
for asset in self.ASSETS:
asset_value = bar_data.current(asset, field)
if field in OHLCP:
self.assertTrue(np.isnan(asset_value))
elif field == "volume":
self.assertEqual(0, asset_value)
elif field == "last_traded":
self.assertTrue(asset_value is pd.NaT)
def test_semi_active_day(self):
# on self.equity_daily_bar_days[0], only asset1 has data
bar_data = self.create_bardata(
simulation_dt_func=lambda: self.get_last_minute_of_session(
self.equity_daily_bar_days[0]
),
)
self.check_internal_consistency(bar_data)
self.assertTrue(bar_data.can_trade(self.ASSET1))
self.assertFalse(bar_data.can_trade(self.ASSET2))
# because there is real data
self.assertFalse(bar_data.is_stale(self.ASSET1))
# because there has never been a trade bar yet
self.assertFalse(bar_data.is_stale(self.ASSET2))
self.assertEqual(3, bar_data.current(self.ASSET1, "open"))
self.assertEqual(4, bar_data.current(self.ASSET1, "high"))
self.assertEqual(1, bar_data.current(self.ASSET1, "low"))
self.assertEqual(2, bar_data.current(self.ASSET1, "close"))
self.assertEqual(200, bar_data.current(self.ASSET1, "volume"))
self.assertEqual(2, bar_data.current(self.ASSET1, "price"))
self.assertEqual(self.equity_daily_bar_days[0],
bar_data.current(self.ASSET1, "last_traded"))
for field in OHLCP:
self.assertTrue(np.isnan(bar_data.current(self.ASSET2, field)),
field)
self.assertEqual(0, bar_data.current(self.ASSET2, "volume"))
self.assertTrue(
bar_data.current(self.ASSET2, "last_traded") is pd.NaT
)
def encode_as_pandas(obj):
"""Attempt to convert pandas.NaT"""
if not _pandas_imported:
raise NotEncodable
if obj is pandas.NaT:
return None
else:
raise NotEncodable
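A usage sketch for the encoder hook above (NotEncodable and _pandas_imported come from the surrounding project; the behavior shown assumes pandas imported successfully):

print(encode_as_pandas(pandas.NaT))  # None -- NaT serializes as a JSON null
# any other input raises NotEncodable, so the caller can fall through to
# another encoder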
def test_should_properly_handle_null_timestamp(self):
query = 'SELECT TIMESTAMP(NULL) AS null_timestamp'
df = gbq.read_gbq(query, project_id=_get_project_id(),
private_key=_get_private_key_path())
tm.assert_frame_equal(df, DataFrame({'null_timestamp': [NaT]}))