Python pandas 模块,read_stata() 实例源码
我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用pandas.read_stata()。
def test_encoding(self):
    """Check that ``encoding`` is honoured on read and on write (GH 4626).

    The file is read once with the default settings and once with
    ``encoding="latin-1"``; the first ``kreis1849`` value of the encoded
    read must be text equal to the (decoded, on Python 2) raw value. The
    encoded frame must also survive a ``to_stata``/``read_stata`` round trip.
    """
    raw = read_stata(self.dta_encoding)
    encoded = read_stata(self.dta_encoding, encoding="latin-1")
    result = encoded.kreis1849[0]

    # Only the expected value and text type differ between interpreters;
    # the assertions themselves are shared.
    if compat.PY3:
        expected = raw.kreis1849[0]
        text_type = compat.string_types
    else:
        expected = raw.kreis1849.str.decode("latin-1")[0]
        text_type = unicode  # noqa
    self.assertEqual(result, expected)
    self.assertIsInstance(result, text_type)

    with tm.ensure_clean() as path:
        encoded.to_stata(path, encoding='latin-1', write_index=False)
        reread_encoded = read_stata(path, encoding='latin-1')
        tm.assert_frame_equal(encoded, reread_encoded)
def test_missing_value_conversion(self):
    """Check ``convert_missing=True`` yields ``StataMissingValue`` objects.

    The expected frame is built from the sorted keys of
    ``StataMissingValue.MISSING_VALUES`` and compared with the 113, 115
    and 117 versions of the same fixture.
    """
    columns = ['int8_', 'int16_', 'int32_', 'float32_', 'float64_']
    smv = StataMissingValue(101)
    keys = sorted(iterkeys(smv.MISSING_VALUES))

    # The sorted keys are consumed as one run of 27 per column --
    # presumably Stata's '.' plus '.a' through '.z'; confirm against
    # StataMissingValue if this layout ever changes.
    codes_per_column = 27
    data = [[StataMissingValue(keys[i + j * codes_per_column])
             for j in range(len(columns))]
            for i in range(codes_per_column)]
    expected = DataFrame(data, columns=columns)

    for fname in (self.dta17_113, self.dta17_115, self.dta17_117):
        parsed = read_stata(fname, convert_missing=True)
        tm.assert_frame_equal(expected, parsed)
def test_dtype_conversion(self):
    """Check the effect of ``preserve_dtypes`` on the dtypes returned.

    With the default settings the frame must match the CSV reference
    after casting each column to the narrow dtype listed below; with
    ``preserve_dtypes=False`` it must match the CSV reference as read.
    """
    date_format = ('%Y-%m-%d',)
    narrow_dtypes = (('byte_', np.int8),
                     ('int_', np.int16),
                     ('long_', np.int32),
                     ('float_', np.float32),
                     ('double_', np.float64))

    expected = self.read_csv(self.csv15)
    for name, dtype in narrow_dtypes:
        expected[name] = expected[name].astype(dtype)
    expected['date_td'] = expected['date_td'].apply(datetime.strptime,
                                                    args=date_format)

    preserved = read_stata(self.dta15_117, convert_dates=True)
    tm.assert_frame_equal(expected, preserved)

    upcast = read_stata(self.dta15_117,
                        convert_dates=True,
                        preserve_dtypes=False)

    # read_csv types are the same
    expected = self.read_csv(self.csv15)
    expected['date_td'] = expected['date_td'].apply(datetime.strptime,
                                                    args=date_format)
    tm.assert_frame_equal(expected, upcast)
def test_iterator(self):
    """Check every chunked-reading entry point against a full read.

    Covers ``read``/``get_chunk`` on an ``iterator=True`` reader,
    iteration and ``get_chunk`` on a ``chunksize`` reader, and
    concatenation of all chunks (GH12153).
    """
    fname = self.dta3_117
    parsed = read_stata(fname)
    first_five = parsed.iloc[0:5, :]

    # Each reader is used as a context manager so its file handle is
    # released even when an assertion fails.
    with read_stata(fname, iterator=True) as itr:
        tm.assert_frame_equal(first_five, itr.read(5))

    with read_stata(fname, chunksize=5) as itr:
        chunks = list(itr)
    tm.assert_frame_equal(first_five, chunks[0])

    with read_stata(fname, iterator=True) as itr:
        tm.assert_frame_equal(first_five, itr.get_chunk(5))

    with read_stata(fname, chunksize=5) as itr:
        tm.assert_frame_equal(first_five, itr.get_chunk())

    # GH12153
    with read_stata(fname, chunksize=4) as itr:
        from_chunks = pd.concat(itr)
    tm.assert_frame_equal(parsed, from_chunks)
def read_dta(self, file):
    """Read a Stata file with the legacy default reader configuration.

    Parameters
    ----------
    file : path or file-like
        Passed straight through to ``read_stata``.

    Returns
    -------
    Whatever ``read_stata(file, convert_dates=True)`` returns.
    """
    return read_stata(file, convert_dates=True)
def test_read_empty_dta(self):
    """Check that a file with zero observations round-trips (GH 7369)."""
    empty_ds = DataFrame(columns=['unit'])
    with tm.ensure_clean() as path:
        empty_ds.to_stata(path, write_index=False)
        empty_ds2 = read_stata(path)
        tm.assert_frame_equal(empty_ds, empty_ds2)
def test_105(self):
    """Check the first three rows read from ``S4_EDUC1.dta``.

    Presumably exercises the format-105 reader, going by the test name.
    """
    # Data obtained from:
    # http://go.worldbank.org/ZXY29PVJ21
    dpath = os.path.join(self.dirpath, 'S4_EDUC1.dta')
    df = pd.read_stata(dpath)

    expected = pd.DataFrame(
        [[1, 1, 3, -2], [2, 1, 2, -2], [4, 1, 1, -2]],
        columns=["clustnum", "pri_schl", "psch_num", "psch_dis"])
    column_dtypes = (('clustnum', np.int16),
                     ('pri_schl', np.int8),
                     ('psch_num', np.int8),
                     ('psch_dis', np.float32))
    for name, dtype in column_dtypes:
        expected[name] = expected[name].astype(dtype)

    tm.assert_frame_equal(df.head(3), expected)
def test_drop_column(self):
    """Check the ``columns`` argument of ``read_stata``.

    A subset must be returned in the requested order (see PR 10757);
    duplicated or unknown column names must raise ``ValueError``.
    """
    expected = self.read_csv(self.csv15)
    # Only these three columns are ever selected below, so they are the
    # only ones that need casting to the dtypes the reader returns.
    for name, dtype in (('byte_', np.int8),
                        ('int_', np.int16),
                        ('long_', np.int32)):
        expected[name] = expected[name].astype(dtype)

    columns = ['byte_', 'int_', 'long_']
    expected = expected[columns]
    dropped = read_stata(self.dta15_117, convert_dates=True,
                         columns=columns)
    tm.assert_frame_equal(expected, dropped)

    # See PR 10757
    columns = ['int_', 'long_', 'byte_']
    expected = expected[columns]
    reordered = read_stata(self.dta15_117, convert_dates=True,
                           columns=columns)
    tm.assert_frame_equal(expected, reordered)

    # Keep only the call expected to raise inside each context.
    columns = ['byte_', 'byte_']
    with tm.assertRaises(ValueError):
        read_stata(self.dta15_117, convert_dates=True, columns=columns)

    columns = ['byte_', 'int_', 'long_', 'not_found']
    with tm.assertRaises(ValueError):
        read_stata(self.dta15_117, convert_dates=True, columns=columns)
def test_categorical_order(self):
    """Check that value labels become categories in the expected order.

    The expected frame is built directly from codes and labels, then
    compared with the 115 and 117 versions of the same fixture, both as
    whole frames and column by column on codes and categories.
    """
    # Directly construct using expected codes
    # Format is is_cat, col_name, labels (in order), underlying data
    spec = [(True, 'ordered', ['a', 'b', 'c', 'd', 'e'], np.arange(5)),
            (True, 'reverse', ['a', 'b', 'c', 'd', 'e'],
             np.arange(5)[::-1]),
            (True, 'noorder', ['a', 'b', 'c', 'd', 'e'],
             np.array([2, 1, 4, 0, 3])),
            (True, 'floating', ['a', 'b', 'c', 'd', 'e'],
             np.arange(0, 5)),
            (True, 'float_missing', ['a', 'd', 'e'],
             np.array([0, 1, 2, -1, -1])),
            (False, 'nolabel', [1.0, 2.0, 3.0, 4.0, 5.0], np.arange(5)),
            (True, 'int32_mixed', ['d', 2, 'e', 'b', 'a'],
             np.arange(5))]
    cols = []
    for is_cat, col, labels, codes in spec:
        if is_cat:
            cols.append((col, pd.Categorical.from_codes(codes, labels)))
        else:
            # Unlabelled column: the "labels" are the float data itself.
            cols.append((col, pd.Series(labels, dtype=np.float32)))
    expected = DataFrame.from_items(cols)

    # Read both file versions, ensure order is identical
    parsed_115 = read_stata(self.dta19_115)
    parsed_117 = read_stata(self.dta19_117)
    tm.assert_frame_equal(expected, parsed_115)
    tm.assert_frame_equal(expected, parsed_117)

    # Check identity of codes and categories in both versions
    for parsed in (parsed_115, parsed_117):
        for col in expected:
            if is_categorical_dtype(expected[col]):
                tm.assert_series_equal(expected[col].cat.codes,
                                       parsed[col].cat.codes)
                tm.assert_index_equal(expected[col].cat.categories,
                                      parsed[col].cat.categories)
def test_categorical_sorting(self):
    """Check that sorting a categorical column orders by code.

    After ``sort_values("srh")`` the column must equal a categorical
    built from ascending codes, for both the 115 and 117 fixtures.
    """
    codes = [-1, -1, 0, 1, 1, 1, 2, 2, 3, 4]
    categories = ["Poor", "Fair", "Good", "Very good", "Excellent"]
    cat = pd.Categorical.from_codes(codes=codes, categories=categories)
    expected = pd.Series(cat, name='srh')

    for fname in (self.dta20_115, self.dta20_117):
        # Sort based on codes, not strings
        parsed = read_stata(fname).sort_values("srh")
        # Don't sort index: replace it with 0..n-1 so only the values
        # are compared.
        parsed.index = np.arange(parsed.shape[0])
        tm.assert_series_equal(expected, parsed["srh"])
def test_categorical_ordering(self):
    """Check that ``order_categoricals`` controls the ``ordered`` flag.

    Categorical columns must be ordered by default and unordered when
    ``order_categoricals=False``, for both the 115 and 117 fixtures.
    """
    parsed_115 = read_stata(self.dta19_115)
    parsed_117 = read_stata(self.dta19_117)
    parsed_115_unordered = read_stata(self.dta19_115,
                                      order_categoricals=False)
    parsed_117_unordered = read_stata(self.dta19_117,
                                      order_categoricals=False)

    # (frame, expected value of .cat.ordered)
    cases = ((parsed_115, True),
             (parsed_117, True),
             (parsed_115_unordered, False),
             (parsed_117_unordered, False))

    for col in parsed_115:
        if not is_categorical_dtype(parsed_115[col]):
            continue
        for frame, ordered in cases:
            tm.assert_equal(ordered, frame[col].cat.ordered)
def test_read_chunks_115(self):
    """Check chunked reads of the 115 fixtures against full reads.

    Every combination of file, chunk size (1, 2), ``convert_categoricals``
    and ``convert_dates`` is read whole and then in up to five chunks;
    each chunk must equal the matching slice of the full frame.
    """
    from itertools import product

    files_115 = [self.dta2_115, self.dta3_115, self.dta4_115,
                 self.dta14_115, self.dta15_115, self.dta16_115,
                 self.dta17_115, self.dta18_115, self.dta19_115,
                 self.dta20_115]

    # product() visits the combinations in the same order as four
    # nested loops would.
    combos = product(files_115, (1, 2), (False, True), (False, True))
    for fname, chunksize, convert_categoricals, convert_dates in combos:

        # Read the whole file, capturing any warnings it emits
        with warnings.catch_warnings(record=True):
            warnings.simplefilter("always")
            parsed = read_stata(
                fname,
                convert_categoricals=convert_categoricals,
                convert_dates=convert_dates)

        # Compare to what we get when reading by chunk; the context
        # manager releases the file handle even if an assertion fails.
        with read_stata(
                fname, iterator=True,
                convert_dates=convert_dates,
                convert_categoricals=convert_categoricals) as itr:
            pos = 0
            for _ in range(5):
                with warnings.catch_warnings(record=True):
                    warnings.simplefilter("always")
                    try:
                        chunk = itr.read(chunksize)
                    except StopIteration:
                        # File exhausted before five chunks were read
                        break
                from_frame = parsed.iloc[pos:pos + chunksize, :]
                tm.assert_frame_equal(
                    from_frame, chunk, check_dtype=False,
                    check_datetimelike_compat=True)
                pos += chunksize
def test_read_chunks_columns(self):
    """Check that ``columns`` is honoured when reading chunk by chunk.

    Up to five chunks of two rows are read with a column subset and
    compared with the matching slice of a full read of the same subset.
    """
    fname = self.dta3_117
    columns = ['quarter', 'cpi', 'm1']
    chunksize = 2

    parsed = read_stata(fname, columns=columns)

    # The context manager releases the file handle even if an
    # assertion fails part-way through.
    with read_stata(fname, iterator=True) as itr:
        pos = 0
        for _ in range(5):
            chunk = itr.read(chunksize, columns=columns)
            if chunk is None:
                break
            from_frame = parsed.iloc[pos:pos + chunksize, :]
            tm.assert_frame_equal(from_frame, chunk, check_dtype=False)
            pos += chunksize