我们从 Python 开源项目中提取了以下 14 个代码示例，用于说明如何使用 pandas.read_stata()。
def test_encoding(self):
    """Check latin-1 handling on read and on a write/read round trip."""
    # GH 4626, proper encoding handling
    raw = read_stata(self.dta_encoding)
    encoded = read_stata(self.dta_encoding, encoding="latin-1")
    result = encoded.kreis1849[0]

    # Only the reference value and the text type depend on the Python
    # major version; the two assertions themselves are shared.
    if compat.PY3:
        expected = raw.kreis1849[0]
        text_type = compat.string_types
    else:
        expected = raw.kreis1849.str.decode("latin-1")[0]
        text_type = unicode  # noqa
    self.assertEqual(result, expected)
    self.assertIsInstance(result, text_type)

    # Writing with the same encoding and reading back must be lossless.
    with tm.ensure_clean() as path:
        encoded.to_stata(path, encoding='latin-1', write_index=False)
        reread_encoded = read_stata(path, encoding='latin-1')
        tm.assert_frame_equal(encoded, reread_encoded)
def test_missing_value_conversion(self):
    """Check that ``convert_missing=True`` yields StataMissingValue objects.

    The expected frame is 27 rows by 5 columns, filled column by column
    from the sorted keys of ``StataMissingValue.MISSING_VALUES``, and is
    compared against the 113, 115 and 117 versions of the same file.
    """
    columns = ['int8_', 'int16_', 'int32_', 'float32_', 'float64_']
    sorted_keys = sorted(iterkeys(StataMissingValue(101).MISSING_VALUES))

    # Column j holds keys [27 * j, 27 * j + 27); row i picks the i-th of each.
    rows = [
        [StataMissingValue(sorted_keys[i + (j * 27)]) for j in range(5)]
        for i in range(27)
    ]
    expected = DataFrame(rows, columns=columns)

    # Parse every file first, then compare, as separate phases.
    sources = (self.dta17_113, self.dta17_115, self.dta17_117)
    parsed_frames = [read_stata(src, convert_missing=True) for src in sources]
    for parsed in parsed_frames:
        tm.assert_frame_equal(expected, parsed)
def test_dtype_conversion(self):
    """Compare read_stata dtypes with and without ``preserve_dtypes``.

    With the default settings the result is compared against the CSV
    reference after casting each column to a narrow dtype; with
    ``preserve_dtypes=False`` it is compared against the CSV reference
    with no casts applied.
    """
    narrow_dtypes = (
        ('byte_', np.int8),
        ('int_', np.int16),
        ('long_', np.int32),
        ('float_', np.float32),
        ('double_', np.float64),
    )

    expected = self.read_csv(self.csv15)
    for name, dtype in narrow_dtypes:
        expected[name] = expected[name].astype(dtype)
    expected['date_td'] = expected['date_td'].apply(
        datetime.strptime, args=('%Y-%m-%d',))

    preserved = read_stata(self.dta15_117, convert_dates=True)
    tm.assert_frame_equal(expected, preserved)

    widened = read_stata(self.dta15_117, convert_dates=True,
                         preserve_dtypes=False)

    # read_csv types are the same
    expected = self.read_csv(self.csv15)
    expected['date_td'] = expected['date_td'].apply(
        datetime.strptime, args=('%Y-%m-%d',))
    tm.assert_frame_equal(expected, widened)
def test_iterator(self):
    """Check the chunked-reading entry points against a full read.

    Covers ``read(5)`` and ``get_chunk(5)`` on an ``iterator=True`` reader,
    iteration and ``get_chunk()`` on a ``chunksize=5`` reader, and
    concatenating all ``chunksize=4`` chunks.
    """
    fname = self.dta3_117
    parsed = read_stata(fname)
    first_five = parsed.iloc[0:5, :]

    reader = read_stata(fname, iterator=True)
    tm.assert_frame_equal(first_five, reader.read(5))

    chunks = list(read_stata(fname, chunksize=5))
    tm.assert_frame_equal(first_five, chunks[0])

    reader = read_stata(fname, iterator=True)
    tm.assert_frame_equal(first_five, reader.get_chunk(5))

    reader = read_stata(fname, chunksize=5)
    tm.assert_frame_equal(first_five, reader.get_chunk())

    # GH12153
    from_chunks = pd.concat(read_stata(fname, chunksize=4))
    tm.assert_frame_equal(parsed, from_chunks)
def read_dta(self, file):
    """Read ``file`` with ``read_stata`` using ``convert_dates=True``.

    Returns whatever ``read_stata`` returns for that call.
    """
    # Legacy default reader configuration
    frame = read_stata(file, convert_dates=True)
    return frame
def test_read_empty_dta(self):
    """Round-trip a frame with one column and no rows through a dta file."""
    zero_obs = DataFrame(columns=['unit'])
    # GH 7369, make sure can read a 0-obs dta file
    with tm.ensure_clean() as path:
        zero_obs.to_stata(path, write_index=False)
        reread = read_stata(path)
        tm.assert_frame_equal(zero_obs, reread)
def test_105(self):
    """Check the first three rows read from ``S4_EDUC1.dta``."""
    # Data obtained from:
    # http://go.worldbank.org/ZXY29PVJ21
    dpath = os.path.join(self.dirpath, 'S4_EDUC1.dta')
    df = pd.read_stata(dpath)

    expected = pd.DataFrame(
        [[1, 1, 3, -2],
         [2, 1, 2, -2],
         [4, 1, 1, -2]],
        columns=["clustnum", "pri_schl", "psch_num", "psch_dis"])

    # Cast each reference column to the dtype the comparison expects.
    column_dtypes = (
        ('clustnum', np.int16),
        ('pri_schl', np.int8),
        ('psch_num', np.int8),
        ('psch_dis', np.float32),
    )
    for name, dtype in column_dtypes:
        expected[name] = expected[name].astype(dtype)

    tm.assert_frame_equal(df.head(3), expected)
def test_drop_column(self):
    """Check the ``columns`` argument of read_stata.

    Covers selecting a subset, selecting the same subset in another
    order, and the ValueError raised for a repeated or unknown name.
    """
    narrow_dtypes = (
        ('byte_', np.int8),
        ('int_', np.int16),
        ('long_', np.int32),
        ('float_', np.float32),
        ('double_', np.float64),
    )
    expected = self.read_csv(self.csv15)
    for name, dtype in narrow_dtypes:
        expected[name] = expected[name].astype(dtype)
    expected['date_td'] = expected['date_td'].apply(
        datetime.strptime, args=('%Y-%m-%d',))

    subset = ['byte_', 'int_', 'long_']
    expected = expected[subset]
    dropped = read_stata(self.dta15_117, convert_dates=True, columns=subset)
    tm.assert_frame_equal(expected, dropped)

    # See PR 10757
    shuffled = ['int_', 'long_', 'byte_']
    expected = expected[shuffled]
    reordered = read_stata(self.dta15_117, convert_dates=True,
                           columns=shuffled)
    tm.assert_frame_equal(expected, reordered)

    # A repeated column name is rejected.
    repeated = ['byte_', 'byte_']
    with tm.assertRaises(ValueError):
        read_stata(self.dta15_117, convert_dates=True, columns=repeated)

    # So is a name that is not in the file.
    unknown = ['byte_', 'int_', 'long_', 'not_found']
    with tm.assertRaises(ValueError):
        read_stata(self.dta15_117, convert_dates=True, columns=unknown)
def test_categorical_order(self):
    """Check that categorical columns keep their category order when read.

    An expected frame is built from explicit codes and labels and compared
    against the 115 and 117 versions of the file; the codes and categories
    of every categorical column are then compared for the 115 version.
    """
    # Imported locally so this block does not rely on the module's
    # import section.
    from collections import OrderedDict

    # Directly construct using expected codes
    # Format is is_cat, col_name, labels (in order), underlying data
    expected = [(True, 'ordered', ['a', 'b', 'c', 'd', 'e'], np.arange(5)),
                (True, 'reverse', ['a', 'b', 'c', 'd', 'e'],
                 np.arange(5)[::-1]),
                (True, 'noorder', ['a', 'b', 'c', 'd', 'e'],
                 np.array([2, 1, 4, 0, 3])),
                (True, 'floating', ['a', 'b', 'c', 'd', 'e'],
                 np.arange(0, 5)),
                (True, 'float_missing', ['a', 'd', 'e'],
                 np.array([0, 1, 2, -1, -1])),
                (False, 'nolabel', [1.0, 2.0, 3.0, 4.0, 5.0], np.arange(5)),
                (True, 'int32_mixed', ['d', 2, 'e', 'b', 'a'],
                 np.arange(5))]
    cols = []
    for is_cat, col, labels, codes in expected:
        if is_cat:
            cols.append((col, pd.Categorical.from_codes(codes, labels)))
        else:
            cols.append((col, pd.Series(labels, dtype=np.float32)))

    # DataFrame.from_items was deprecated in pandas 0.23 and removed in 1.0.
    # from_dict on an OrderedDict is the documented replacement and keeps
    # the column order of ``cols``.
    expected = DataFrame.from_dict(OrderedDict(cols))

    # Read with and with out categoricals, ensure order is identical
    parsed_115 = read_stata(self.dta19_115)
    parsed_117 = read_stata(self.dta19_117)
    tm.assert_frame_equal(expected, parsed_115)
    tm.assert_frame_equal(expected, parsed_117)

    # Check identity of codes
    for col in expected:
        if is_categorical_dtype(expected[col]):
            tm.assert_series_equal(expected[col].cat.codes,
                                   parsed_115[col].cat.codes)
            tm.assert_index_equal(expected[col].cat.categories,
                                  parsed_115[col].cat.categories)
def test_categorical_sorting(self):
    """Check that sorting the categorical ``srh`` column orders by code."""
    frames = [read_stata(src) for src in (self.dta20_115, self.dta20_117)]

    # Sort based on codes, not strings
    frames = [frame.sort_values("srh") for frame in frames]

    # Don't sort index
    for frame in frames:
        frame.index = np.arange(frame.shape[0])

    categories = ["Poor", "Fair", "Good", "Very good", "Excellent"]
    codes = [-1, -1, 0, 1, 1, 1, 2, 2, 3, 4]
    expected = pd.Series(
        pd.Categorical.from_codes(codes=codes, categories=categories),
        name='srh')

    for frame in frames:
        tm.assert_series_equal(expected, frame["srh"])
def test_categorical_ordering(self):
    """Check the ``ordered`` flag with and without ``order_categoricals``.

    Categorical columns read with default settings are expected to be
    ordered; those read with ``order_categoricals=False`` are expected
    not to be.
    """
    parsed_115 = read_stata(self.dta19_115)
    parsed_117 = read_stata(self.dta19_117)
    parsed_115_unordered = read_stata(self.dta19_115,
                                      order_categoricals=False)
    parsed_117_unordered = read_stata(self.dta19_117,
                                      order_categoricals=False)

    # (frame, expected value of .cat.ordered), checked in this order.
    checks = (
        (parsed_115, True),
        (parsed_117, True),
        (parsed_115_unordered, False),
        (parsed_117_unordered, False),
    )
    for col in parsed_115:
        if is_categorical_dtype(parsed_115[col]):
            for frame, is_ordered in checks:
                tm.assert_equal(is_ordered, frame[col].cat.ordered)
def test_read_chunks_115(self):
    """Compare chunked reads of the 115-format files with full reads.

    Every file is read with each combination of chunk size (1 or 2),
    ``convert_categoricals`` and ``convert_dates``. Up to five chunks are
    compared against the matching row slice of the full read.
    """
    files_115 = [self.dta2_115, self.dta3_115, self.dta4_115,
                 self.dta14_115, self.dta15_115, self.dta16_115,
                 self.dta17_115, self.dta18_115, self.dta19_115,
                 self.dta20_115]

    # Nesting order: chunksize, then convert_categoricals, then convert_dates.
    option_grid = [(size, cats, dates)
                   for size in (1, 2)
                   for cats in (False, True)
                   for dates in (False, True)]

    for fname in files_115:
        for chunksize, convert_categoricals, convert_dates in option_grid:
            # Read the whole file
            with warnings.catch_warnings(record=True):
                warnings.simplefilter("always")
                parsed = read_stata(
                    fname,
                    convert_categoricals=convert_categoricals,
                    convert_dates=convert_dates)

            # Compare to what we get when reading by chunk
            itr = read_stata(
                fname, iterator=True,
                convert_dates=convert_dates,
                convert_categoricals=convert_categoricals)

            # At most five chunks; ``pos`` is the row offset of each one.
            for pos in range(0, 5 * chunksize, chunksize):
                with warnings.catch_warnings(record=True):
                    warnings.simplefilter("always")
                    try:
                        chunk = itr.read(chunksize)
                    except StopIteration:
                        break
                from_frame = parsed.iloc[pos:pos + chunksize, :]
                tm.assert_frame_equal(
                    from_frame, chunk, check_dtype=False,
                    check_datetimelike_compat=True)
def test_read_chunks_columns(self):
    """Compare column-restricted chunked reads with a full read.

    Up to five chunks of two rows are read with ``columns`` passed to
    ``read`` and compared against the matching slice of a full read that
    used the same ``columns``.
    """
    fname = self.dta3_117
    columns = ['quarter', 'cpi', 'm1']
    chunksize = 2

    parsed = read_stata(fname, columns=columns)
    reader = read_stata(fname, iterator=True)

    # ``pos`` is the row offset of the chunk about to be read.
    for pos in range(0, 5 * chunksize, chunksize):
        chunk = reader.read(chunksize, columns=columns)
        if chunk is None:
            break
        from_frame = parsed.iloc[pos:pos + chunksize, :]
        tm.assert_frame_equal(from_frame, chunk, check_dtype=False)
def load(self, pth):
    """Read the Stata file at ``pth`` and return its data as an array.

    The file is opened in binary mode and parsed with ``read_stata``.
    The first column is dropped; the values of the remaining columns
    are returned.
    """
    with open(pth, 'rb') as fh:
        data = read_stata(fh)
    # DataFrame.as_matrix was deprecated in pandas 0.23 and removed in 1.0.
    # A positional slice plus .values returns the same values and works on
    # both old and new pandas.
    return data.iloc[:, 1:].values