The following 32 code examples, extracted from open-source Python projects, illustrate how to use pandas.read_fwf().
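Before the extracted examples, here is a minimal illustrative sketch (not taken from any of the projects below): read_fwf locates columns either from an explicit colspecs list of half-open (start, end) intervals, or from a widths list that pandas converts to such intervals. The sample data and column names are invented for illustration.

import pandas as pd
from io import StringIO

# Hypothetical fixed-width data: three columns of 4, 8, and 5 characters.
data = ("id  name    score\n"
        "1   alice    9.5\n"
        "2   bob     12.0\n")

# Option 1: explicit (start, end) column specifications.
df1 = pd.read_fwf(StringIO(data), colspecs=[(0, 4), (4, 12), (12, 17)])

# Option 2: equivalent widths; pandas derives the colspecs internally.
df2 = pd.read_fwf(StringIO(data), widths=[4, 8, 5])

assert df1.equals(df2)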
def parse(omni_fname, colspecs=COLSPECS, names=NAMES, na_values=NA_VALUES):
    """
    Parse the OMNI data record *omni_fname* and return a
    :class:`DataFrame`. To parse, use the fixed columns *colspecs*, the
    column identifiers *names*, and acceptable NaN column mapping
    *na_values*.
    """
    df = PD.read_fwf(omni_fname,
                     colspecs=colspecs,
                     header=None,
                     names=names,
                     na_values=na_values,
                     parse_dates={'date': [0, 1, 2, 3]},
                     date_parser=lambda x: datetime.strptime(x, '%Y %j %H %M'))
    df.set_index('date', inplace=True)
    return df
def zipfiles2dataframe(zip, rexpr):
    """
    Extract files in the zip that match the regular expression rexpr
    and load them as a data frame
    """
    dfs = []
    for f in zip.filelist:
        m = rexpr.search(f.filename)
        if not m:
            continue
        df = pd.read_fwf(StringIO(zip.read(f).decode('utf-8')),
                         header=None, skiprows=1)
        df.columns = ['event', 'tau']
        df['user'], df['session'] = m.groups()
        dfs.append(df)
    df = pd.concat(dfs).set_index(['user', 'session'])
    return df
def __init__(self, filename=TABLE_FILENAME):
    MS = SpectralTypeRelations.MainSequence()

    # Read in the table.
    colspecs = [[0, 7], [7, 14], [14, 21], [21, 28],
                [28, 34], [34, 40], [40, 47], [47, 55],
                [55, 63], [63, 70], [70, 78], [78, 86],
                [86, 94], [94, 103], [103, 110], [110, 116],
                [116, 122], [122, 130], [130, 137], [137, 144],
                [144, 151], [151, 158]]
    mam_df = pd.read_fwf(filename, header=20, colspecs=colspecs,
                         na_values=['...'])[:92]

    # Strip the * from the logAge column. Probably shouldn't but...
    mam_df['logAge'] = mam_df['logAge'].map(
        lambda s: s.strip('*') if isinstance(s, basestring) else s)

    # Convert everything to floats
    for col in mam_df.columns:
        mam_df[col] = pd.to_numeric(mam_df[col], errors='ignore')

    # Add the spectral type number for interpolation
    mam_df['SpTNum'] = mam_df['SpT'].map(MS.SpT_To_Number)

    self.mam_df = mam_df
def test_fwf_colspecs_None(self):
    # GH 7079
    data = """\
123456
456789
"""
    colspecs = [(0, 3), (3, None)]
    result = read_fwf(StringIO(data), colspecs=colspecs, header=None)
    expected = DataFrame([[123, 456], [456, 789]])
    tm.assert_frame_equal(result, expected)

    colspecs = [(None, 3), (3, 6)]
    result = read_fwf(StringIO(data), colspecs=colspecs, header=None)
    expected = DataFrame([[123, 456], [456, 789]])
    tm.assert_frame_equal(result, expected)

    colspecs = [(0, None), (3, None)]
    result = read_fwf(StringIO(data), colspecs=colspecs, header=None)
    expected = DataFrame([[123456, 456], [456789, 789]])
    tm.assert_frame_equal(result, expected)

    colspecs = [(None, None), (3, 6)]
    result = read_fwf(StringIO(data), colspecs=colspecs, header=None)
    expected = DataFrame([[123456, 456], [456789, 789]])
    tm.assert_frame_equal(result, expected)
def test_fwf_for_uint8(self):
    data = """1421302965.213420    PRI=3 PGN=0xef00      DST=0x17 SRC=0x28    04 154 00 00 00 00 00 127
1421302964.226776    PRI=6 PGN=0xf002               SRC=0x47    243 00 00 255 247 00 00 71"""
    df = read_fwf(StringIO(data),
                  colspecs=[(0, 17), (25, 26), (33, 37),
                            (49, 51), (58, 62), (63, 1000)],
                  names=['time', 'pri', 'pgn', 'dst', 'src', 'data'],
                  converters={
                      'pgn': lambda x: int(x, 16),
                      'src': lambda x: int(x, 16),
                      'dst': lambda x: int(x, 16),
                      'data': lambda x: len(x.split(' '))})

    expected = DataFrame([[1421302965.213420, 3, 61184, 23, 40, 8],
                          [1421302964.226776, 6, 61442, None, 71, 8]],
                         columns=["time", "pri", "pgn", "dst", "src", "data"])
    expected["dst"] = expected["dst"].astype(object)

    tm.assert_frame_equal(df, expected)
def test_fwf_compression(self):
    try:
        import gzip
        import bz2
    except ImportError:
        raise nose.SkipTest("Need gzip and bz2 to run this test")

    data = """1111111111
2222222222
3333333333""".strip()
    widths = [5, 5]
    names = ['one', 'two']
    expected = read_fwf(StringIO(data), widths=widths, names=names)
    if compat.PY3:
        data = bytes(data, encoding='utf-8')
    comps = [('gzip', gzip.GzipFile), ('bz2', bz2.BZ2File)]
    for comp_name, compresser in comps:
        with tm.ensure_clean() as path:
            tmp = compresser(path, mode='wb')
            tmp.write(data)
            tmp.close()
            result = read_fwf(path, widths=widths, names=names,
                              compression=comp_name)
            tm.assert_frame_equal(result, expected)
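A side note on the compression test above: in current pandas the compression argument defaults to 'infer', so an explicit compression name is only needed when the file extension does not give it away. A small hypothetical sketch (the file name sample_fwf.txt.gz is invented):

import gzip
import pandas as pd

# Write a tiny fixed-width file, gzip-compressed (hypothetical file name).
with gzip.open("sample_fwf.txt.gz", "wb") as fh:
    fh.write(b"12345abcde\n67890fghij\n")

# The .gz extension triggers compression='infer' (the default); passing
# compression='gzip' explicitly, as the test does, is equivalent.
df = pd.read_fwf("sample_fwf.txt.gz", widths=[5, 5], names=["one", "two"])
print(df)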
def parse_basis_set(self):
    # Find the basis set
    start = self.find(_re_bas_00, keys_only=True)[-1] + 3
    stopa = self.find_next(_re_bas_01, start=start, keys_only=True)
    stopb = self.find_next(_re_bas_02, start=start, keys_only=True)
    try:
        stop = min(stopa, stopb)
    except TypeError:
        stop = stopa
    # Grab everything
    df = pd.read_fwf(StringIO('\n'.join(self[start:stop])),
                     widths=[4, 2, 12, 4],
                     names=['n', 'L', 'alpha', 'symbol'])
    # Where atom types change
    idxs = [0] + df['n'][df['n'] == '---'].index.tolist() + [df.shape[0]]
    sets, shells = [], []
    for i, (start, stop) in enumerate(zip(idxs, idxs[1:])):
        sets.append(np.repeat(i - 1, stop - start))
        shells.append(np.arange(-1, stop - start - 1))
    df['set'] = np.concatenate(sets)
    df['shell'] = np.concatenate(shells)
    # Atom table basis set map
    basmap = df['symbol'].dropna()
    basmap = basmap[basmap.str.endswith(')')].str.strip(')')
    basmap = {val: df['set'][key] + 1 for key, val in basmap.to_dict().items()}
    # Discard the garbage
    drop = df['n'].str.strip().str.isnumeric().fillna(False)
    df.drop(drop[drop == False].index, inplace=True)
    df.drop('symbol', axis=1, inplace=True)
    # Clean up the series
    df['alpha'] = df['alpha'].astype(np.float64)
    df['n'] = df['n'].astype(np.int64)
    df['L'] = df['L'].str.lower().map(lmap)
    df['d'] = np.sqrt((2 * df['L'] + 1) / (4 * np.pi))
    df['r'] = df['n'] - (df['L'] + 1)
    df['frame'] = 0
    self.basis_set = BasisSet(df, gaussian=False, spherical=False)
    self.atom['set'] = self.atom['symbol'].map(basmap)
def parse_contribution(self):
    # MO contribution by percentage
    found = self.find(_re_con_00, keys_only=True)
    starts = [i + 3 for i in found]
    widths = [12, 6, 6, 6, 11, 6, 10, 12, 6, 6, 3]
    names = ['eV', 'occupation', 'vector', 'sym', '%', 'SFO', 'angmom',
             'eV(sfo)', 'occ(sfo)', 'atom', 'symbol']
    dfs = []
    # Prints for both spins
    for i, start in enumerate(starts):
        stop = start
        while self[stop].strip():
            stop += 1
        dfs.append(pd.read_fwf(StringIO('\n'.join(self[start:stop])),
                               delim_whitespace=True, widths=widths,
                               names=names))
        dfs[-1]['spin'] = i
    dfs = pd.concat(dfs).reset_index(drop=True)

    # Maybe a better way to do this
    def _snan(x):
        return np.nan if isinstance(x, str) and x.isspace() else x
    dfs = dfs.applymap(_snan)
    dfs.fillna(method='ffill', inplace=True)
    # Clean up
    dfs['symbol'] = dfs['symbol'].str.strip()
    dfs['angmom'] = dfs['angmom'].str.strip()
    dfs['angmom'].update(dfs['angmom'].map({'S': 'S:'}))
    dfs[['L', 'ml']] = dfs['angmom'].str.extract('(.*):(.*)', expand=True)
    dfs['%'] = dfs['%'].str.replace('%', '')
    dfs['%'].update(dfs['%'].map({" ******": np.inf}))
    dfs['%'] = dfs['%'].astype(np.float64)
    dfs['occupation'] = dfs['occupation'].astype(np.float64)
    dfs['vector'] = dfs['vector'].astype(np.int64) - 1
    dfs['eV'] = dfs['eV'].astype(np.float64)
    dfs['atom'] -= 1
    self.contribution = dfs
def _one_el(self, starts, step, ncol):
    func = pd.read_csv
    kwargs = {'header': None}
    if ncol == 1:
        func = pd.read_fwf
        kwargs['widths'] = [18] * 4
    else:
        kwargs['delim_whitespace'] = True
    return [func(StringIO('\n'.join(self[start:start + step])),
                 **kwargs).stack().values for start in starts]
def _read_ZLS_format_file(filepath):
    col_names = ['line_name', 'year', 'day', 'hour', 'minute', 'second',
                 'sensor', 'spring_tension', 'cross_coupling', 'raw_beam',
                 'vcc', 'al', 'ax', 've2', 'ax2', 'xacc2', 'lacc2',
                 'xacc', 'lacc', 'par_port', 'platform_period']
    col_widths = [10, 4, 3, 2, 2, 2, 8, 8, 7, 8, 8, 8, 8, 8, 8, 8, 8,
                  8, 8, 8, 6]
    time_columns = ['year', 'day', 'hour', 'minute', 'second']

    # read into dataframe
    df = pd.read_fwf(filepath, widths=col_widths, names=col_names)

    day_fmt = lambda x: '{:03d}'.format(x)
    time_fmt = lambda x: '{:02d}'.format(x)
    t = df['year'].map(str) + df['day'].map(day_fmt) + \
        df['hour'].map(time_fmt) + df['minute'].map(time_fmt) + \
        df['second'].map(time_fmt)

    # index by datetime
    df.index = pd.to_datetime(t, format='%Y%j%H%M%S')
    df.drop(time_columns, axis=1, inplace=True)
    return df
def get_diaginfo(diaginfo_file):
    """
    Read an output's diaginfo.dat file and parse into a DataFrame for
    use in selecting and parsing categories.

    Parameters
    ----------
    diaginfo_file : str
        Path to diaginfo.dat

    Returns
    -------
    DataFrame containing the category information.

    """
    widths = [rec.width for rec in diag_recs]
    col_names = [rec.name for rec in diag_recs]
    dtypes = [rec.type for rec in diag_recs]
    usecols = [name for name in col_names if name != '-']

    diag_df = pd.read_fwf(diaginfo_file, widths=widths, names=col_names,
                          dtypes=dtypes, comment="#", header=None,
                          usecols=usecols)
    diag_desc = {diag.name: diag.desc for diag in diag_recs
                 if diag.name != '-'}

    return diag_df, diag_desc
def test_1000_fwf(self):
    data = """
 1 2,334.0    5
10   13     10.
"""
    expected = [[1, 2334., 5],
                [10, 13, 10]]
    df = read_fwf(StringIO(data), colspecs=[(0, 3), (3, 11), (12, 16)],
                  thousands=',')
    tm.assert_almost_equal(df.values, expected)
def test_comment_fwf(self):
    data = """
  1   2.   4  #hello world
  5  NaN  10.0
"""
    expected = [[1, 2., 4],
                [5, np.nan, 10.]]
    df = read_fwf(StringIO(data), colspecs=[(0, 3), (4, 9), (9, 25)],
                  comment='#')
    tm.assert_almost_equal(df.values, expected)
def test_fwf_colspecs_is_list_or_tuple_of_two_element_tuples(self):
    with tm.assertRaisesRegexp(TypeError,
                               'Each column specification must be.+'):
        read_fwf(StringIO(self.data1), [('a', 1)])
def test_BytesIO_input(self):
    if not compat.PY3:
        raise nose.SkipTest(
            "Bytes-related test - only needs to work on Python 3")
    result = pd.read_fwf(BytesIO("שלום\nשלום".encode('utf8')),
                         widths=[2, 2], encoding='utf8')
    expected = pd.DataFrame([["של", "ום"]], columns=["של", "ום"])
    tm.assert_frame_equal(result, expected)

    data = BytesIO("שלום::1234\n562::123".encode('cp1255'))
    result = pd.read_table(data, sep="::", engine='python',
                           encoding='cp1255')
    expected = pd.DataFrame([[562, 123]], columns=["שלום", "1234"])
    tm.assert_frame_equal(result, expected)
def test_full_file(self):
    # File with all values
    test = '''index                             A    B    C
2000-01-03T00:00:00  0.980268513777    3  foo
2000-01-04T00:00:00   1.04791624281   -4  bar
2000-01-05T00:00:00  0.498580885705   73  baz
2000-01-06T00:00:00   1.12020151869    1  foo
2000-01-07T00:00:00  0.487094399463    0  bar
2000-01-10T00:00:00  0.836648671666    2  baz
2000-01-11T00:00:00  0.157160753327   34  foo'''
    colspecs = ((0, 19), (21, 35), (38, 40), (42, 45))
    expected = read_fwf(StringIO(test), colspecs=colspecs)

    tm.assert_frame_equal(expected, read_fwf(StringIO(test)))
def test_full_file_with_missing(self):
    # File with missing values
    test = '''index                             A    B    C
2000-01-03T00:00:00  0.980268513777    3  foo
2000-01-04T00:00:00   1.04791624281   -4  bar
                     0.498580885705   73  baz
2000-01-06T00:00:00   1.12020151869    1  foo
2000-01-07T00:00:00                     0  bar
2000-01-10T00:00:00  0.836648671666    2  baz
                                      34'''
    colspecs = ((0, 19), (21, 35), (38, 40), (42, 45))
    expected = read_fwf(StringIO(test), colspecs=colspecs)

    tm.assert_frame_equal(expected, read_fwf(StringIO(test)))
def test_full_file_with_spaces_and_missing(self):
    # File with spaces and missing values in columns
    test = '''
Account               Name    Balance     CreditLimit   AccountCreated
101                           10000.00                       1/17/1998
312     Gerard Butler         90.00       1000.00             8/6/2003
868                                                          5/25/1985
761     Jada Pinkett-Smith    49654.87    100000.00          12/5/2006
317     Bill Murray           789.65
'''.strip('\r\n')
    colspecs = ((0, 7), (8, 28), (30, 38), (42, 53), (56, 70))
    expected = read_fwf(StringIO(test), colspecs=colspecs)

    tm.assert_frame_equal(expected, read_fwf(StringIO(test)))
def test_messed_up_data(self):
    # Completely messed up file
    test = '''   Account          Name             Balance     Credit Limit   Account Created
       101                           10000.00                       1/17/1998
       312     Gerard Butler         90.00       1000.00

       761     Jada Pinkett-Smith    49654.87    100000.00          12/5/2006
  317          Bill Murray           789.65
'''.strip('\r\n')
    colspecs = ((2, 10), (15, 33), (37, 45), (49, 61), (64, 79))
    expected = read_fwf(StringIO(test), colspecs=colspecs)

    tm.assert_frame_equal(expected, read_fwf(StringIO(test)))
def test_multiple_delimiters(self):
    test = r'''
col1~~~~~col2  col3++++++++++++++++++col4
~~22.....11.0+++foo~~~~~~~~~~Keanu Reeves
  33+++122.33\\\bar.........Gerard Butler
++44~~~~12.01   baz~~Jennifer Love Hewitt
~~55       11+++foo++++Jada Pinkett-Smith
..66++++++.03~~~bar           Bill Murray
'''.strip('\r\n')
    colspecs = ((0, 4), (7, 13), (15, 19), (21, 41))
    expected = read_fwf(StringIO(test), colspecs=colspecs,
                        delimiter=' +~.\\')
    tm.assert_frame_equal(expected, read_fwf(StringIO(test),
                                             delimiter=' +~.\\'))
def test_variable_width_unicode(self):
    if not compat.PY3:
        raise nose.SkipTest(
            'Bytes-related test - only needs to work on Python 3')
    test = '''
שלום שלום
ום   שלל
של   ום
'''.strip('\r\n')
    expected = pd.read_fwf(BytesIO(test.encode('utf8')),
                           colspecs=[(0, 4), (5, 9)],
                           header=None, encoding='utf8')
    tm.assert_frame_equal(expected, read_fwf(
        BytesIO(test.encode('utf8')), header=None, encoding='utf8'))
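The six pandas tests above (test_full_file through test_variable_width_unicode) all exercise column inference: when neither colspecs nor widths is supplied, colspecs defaults to 'infer' and pandas detects column boundaries from the runs of delimiter characters (whitespace by default) in the first rows of the file. A minimal sketch with invented data:

import pandas as pd
from io import StringIO

text = ("a     b    c\n"
        "1   2.5  foo\n"
        "10  3.1  bar\n")

# No colspecs or widths given: the boundaries are inferred from the
# whitespace runs shared by the first rows of the file.
df = pd.read_fwf(StringIO(text))
print(df)   # two rows, columns a, b, c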
def test_bool_header_arg(self):
    # GH 6114
    data = """\
MyColumn
   a
   b
   a
   b"""
    for arg in [True, False]:
        with tm.assertRaises(TypeError):
            pd.read_csv(StringIO(data), header=arg)
        with tm.assertRaises(TypeError):
            pd.read_table(StringIO(data), header=arg)
        with tm.assertRaises(TypeError):
            pd.read_fwf(StringIO(data), header=arg)
def ReadFixedWidth(self, filename, **options):
    """Reads a fixed width ASCII file.

    filename: string filename

    returns: DataFrame
    """
    df = pandas.read_fwf(filename,
                         colspecs=self.colspecs,
                         names=self.names,
                         **options)
    return df
def parseVoyagerData(self, spacecraft, in_filename):
    '''
    Parse Voyager Data

    @param spacecraft: Voyager spacecraft (vy1 or vy2)
    @param in_filename: Input voyager data filename

    @return Pandas Dataframe of Voyager data
    '''
    def convert_date(year, day, hour):
        '''
        Convert to datetime

        @param year: Input year
        @param day: Input day
        @param hour: Input hour

        @return datetime
        '''
        return pd.to_datetime("{0:0>4}{1:0>3}{2:0>2}".format(year, day, hour),
                              format='%Y%j%H')

    # Voyager 1 has 3 less columns than Voyager 2
    if spacecraft == 'voyager1':
        field_widths = self.field_widths[:34]
        field_names = self.field_names[:34]
    else:
        field_widths = self.field_widths
        field_names = self.field_names

    # Parse the data
    data = pd.read_fwf(in_filename, widths=field_widths, header=None,
                       names=field_names)

    # Create date column
    data['Date'] = list(map(convert_date, data.loc[:, 'Year'],
                            data.loc[:, 'Day'], data.loc[:, 'Hour']))
    data.set_index('Date', inplace=True)

    return data
def parse_momatrix(self):
    dim = int(self[5])
    ndim = dim * dim
    found = self.find(_re_orb, _re_occ, _re_ens, keys_only=True)
    skips = found[_re_orb]
    start = skips[0]
    occs = [i + 1 for i in found[_re_occ]]
    ens = [i + 1 for i in found[_re_ens]]
    if not found[_re_ens]:
        ens = False
    ncol = len(self[start + 1].split())
    cols = 4 if ncol == 1 else ncol
    chnk = np.ceil(dim / cols).astype(np.int64)
    orbdx = np.repeat(range(dim), chnk)
    if len(occs) == 2:
        skips.insert(dim, skips[dim] - 1)
        orbdx = np.concatenate([orbdx, orbdx])
    skips = [i - skips[0] for i in skips]
    if ncol == 1:
        coefs = pd.read_fwf(StringIO('\n'.join(self[start:occs[0] - 2])),
                            skiprows=skips, header=None, widths=[18] * 4)
        if ens:
            ens = self._one_el(ens, chnk, ncol)
    else:
        coefs = self.pandas_dataframe(start, occs[0] - 2, ncol,
                                      **{'skiprows': skips})
        if ens:
            echnk = np.ceil(dim / len(self[ens[0] + 1].split())).astype(np.int64)
            ens = self._one_el(ens, echnk, ncol)
    occs = self._one_el(occs, chnk, ncol)
    coefs['idx'] = orbdx
    coefs = coefs.groupby('idx').apply(pd.DataFrame.stack).drop(
        'idx', level=2).values
    mo = {'orbital': np.repeat(range(dim), dim),
          'frame': 0,
          'chi': np.tile(range(dim), dim)}
    if ens:
        orb = {'frame': 0, 'group': 0}
    if len(occs) == 2:
        mo['coef'] = coefs[:len(coefs) // 2]
        mo['coef1'] = coefs[len(coefs) // 2:]
        self.occupation_vector = {'coef': occs[0], 'coef1': occs[1]}
        if ens:
            orb['occupation'] = np.concatenate(occs)
            orb['energy'] = np.concatenate(ens)
            orb['vector'] = np.concatenate([range(dim), range(dim)])
            orb['spin'] = np.concatenate([np.zeros(dim), np.ones(dim)])
    else:
        mo['coef'] = coefs
        self.occupation_vector = occs[0]
        if ens:
            orb['occupation'] = occs[0]
            orb['energy'] = ens[0]
            orb['vector'] = range(dim)
            orb['spin'] = np.zeros(dim)
    self.momatrix = pd.DataFrame.from_dict(mo)
    if ens:
        self.orbital = pd.DataFrame.from_dict(orb)
def parse_basis_set(self):
    """
    Parses the primitive exponents, coefficients and shell if BSSHOW
    specified in SEWARD.
    """
    found = self.find(_re_bas_0, _re_bas_1, _re_bas_2, keys_only=True)
    bmaps = [i + 1 for i in found[_re_bas_0]]
    atoms = [i + 2 for i in found[_re_bas_1]]
    alphs = [i + 1 for i in found[_re_bas_2]]
    widths = [11, 7, 8, 11, 10, 12]
    names = _re_bas_0.split()
    setmap, basmap = {}, []
    for seht, (start, atst) in enumerate(zip(bmaps, atoms)):
        stop = start
        while self[stop].strip():
            stop += 1
        while self[atst].strip():
            setmap[self[atst].split()[0]] = seht
            atst += 1
        basmap.append(pd.read_fwf(StringIO('\n'.join(self[start:stop])),
                                  widths=widths, header=None, names=names))
        basmap[-1]['set'] = seht
    self.atom['set'] = self.atom['tag'].map(setmap)
    basmap = pd.concat(basmap).reset_index(drop=True)
    basmap['Shell'] = basmap['Shell'].map(lmap)
    prims, pset, shell = [], 0, 0
    for start, seht, L, nprim, nbas in zip(alphs, basmap['set'],
                                           basmap['Shell'], basmap['nPrim'],
                                           basmap['nBasis']):
        if pset != seht:
            shell = 0
        # In case contraction coefficients overflow to next line
        neat = len(self[start].split()) == len(self[start + 1].split())
        if neat:
            block = self.pandas_dataframe(start, start + nprim, nbas + 2)
        else:
            stop = start + 2 * nprim
            most = self[start:stop:2]
            extr = self[start + 1:stop:2]
            ncols = len(most[0].split()) + len(extr[0].split())
            block = pd.read_csv(StringIO('\n'.join([i + j for i, j in
                                                    zip(most, extr)])),
                                delim_whitespace=True, names=range(ncols))
        alps = (pd.concat([block[1]] * nbas).reset_index(drop=True)
                  .str.replace('D', 'E').astype(np.float64))
        ds = block[list(range(2, nbas + 2))].unstack().reset_index(drop=True)
        pdf = pd.concat([alps, ds], axis=1)
        pdf.columns = ['alpha', 'd']
        pdf['L'] = L
        pdf['shell'] = np.repeat(range(shell, shell + nbas), nprim)
        pdf['set'] = seht
        prims.append(pdf)
        shell += nbas
        pset = seht
    prims = pd.concat(prims).reset_index(drop=True)
    prims['frame'] = 0
    self.basis_set = prims
def get_tracerinfo(tracerinfo_file):
    """
    Read an output's tracerinfo.dat file and parse into a DataFrame for
    use in selecting and parsing categories.

    Parameters
    ----------
    tracerinfo_file : str
        Path to tracerinfo.dat

    Returns
    -------
    DataFrame containing the tracer information.

    """
    widths = [rec.width for rec in tracer_recs]
    col_names = [rec.name for rec in tracer_recs]
    dtypes = [rec.type for rec in tracer_recs]
    usecols = [name for name in col_names if name != '-']

    tracer_df = pd.read_fwf(tracerinfo_file, widths=widths, names=col_names,
                            dtypes=dtypes, comment="#", header=None,
                            usecols=usecols)
    tracer_desc = {tracer.name: tracer.desc for tracer in tracer_recs
                   if tracer.name != '-'}

    # Process some of the information about which variables are hydrocarbons
    # and chemical tracers versus other diagnostics.
    def _assign_hydrocarbon(row):
        if row['C'] != 1:
            row['hydrocarbon'] = True
            row['molwt'] = C_MOLECULAR_WEIGHT
        else:
            row['hydrocarbon'] = False
        return row

    tracer_df = (
        tracer_df
        .apply(_assign_hydrocarbon, axis=1)
        .assign(chemical=lambda x: x['molwt'].astype(bool))
    )

    return tracer_df, tracer_desc
def _mag_ness_fromascii(probe, year, doy, try_download=True):
    """
    Read in a single day of 6 second magnetic field data.

    Data is read from original ascii files, and saved to a hdf file for
    faster access after the first read.

    Parameters
    ----------
    probe : int, string
        Helios probe to import data from. Must be 1 or 2.
    year : int
        Year
    doy : int
        Day of year

    Returns
    -------
    data : DataFrame
        6 second magnetic field data set
    """
    probe = _check_probe(probe)
    local_dir = _ness_localdir(probe, year)
    remote_url = ('ftp://spdf.sci.gsfc.nasa.gov/pub/data/helios/helios' +
                  probe + '/mag/6sec_ness/' + str(year) + '/')
    fname = _ness_fname(probe, year, doy) + '.asc'
    f = helper.load(fname, local_dir, remote_url, try_download=try_download)

    # Read in data
    headings = ['probe', 'year', 'doy', 'hour', 'minute', 'second',
                'naverage', 'Bx', 'By', 'Bz', '|B|',
                'sigma_Bx', 'sigma_By', 'sigma_Bz']
    colspecs = [(1, 2), (2, 4), (4, 7), (7, 9), (9, 11), (11, 13), (13, 15),
                (15, 22), (22, 29), (29, 36), (36, 42), (42, 48), (48, 54),
                (54, 60)]
    data = pd.read_fwf(f, names=headings, header=None, colspecs=colspecs)

    # Process data
    data['year'] += 1900
    # Convert date info to datetime
    data['Time'] = pd.to_datetime(data['year'], format='%Y') + \
        pd.to_timedelta(data['doy'] - 1, unit='d') + \
        pd.to_timedelta(data['hour'], unit='h') + \
        pd.to_timedelta(data['minute'], unit='m') + \
        pd.to_timedelta(data['second'], unit='s')
    data = data.drop(['year', 'doy', 'hour', 'minute', 'second'], axis=1)
    data = data.set_index('Time', drop=False)

    # Save data to a hdf store
    if use_hdf:
        _save_hdf(data, local_dir, _ness_fname(probe, year, doy))
    return(data)
def main():
    '''
    Parse a fixed-width rate4site output file into a CSV.
    '''
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='Extract rate values from raw rate4site output and write rates to a CSV.',
        epilog=textwrap.dedent('''\
        This script produces a CSV with the following columns:

        Column name     Description
        ===================================================================
        fasta_position  (Defined in Rate4Site file as POS column)
                        Site number, extracted from the alignment FASTA file
        fasta_aa        (Defined in Rate4Site file as SEQ column)
                        The amino acid in the reference sequence in one
                        letter code.
        r4s_rate        (Defined in Rate4Site file as SCORE column)
                        The conservation scores. lower value = higher
                        conservation.
        '''))
    parser.add_argument('rates', metavar='<r4s_rates>', type=str,
                        help='rate file output from rate4site')
    parser.add_argument('-o', metavar='<output file>', type=str,
                        help='name of output file')
    args = parser.parse_args()

    if args.o is None:
        outfile = 'extracted_' + \
            os.path.splitext(os.path.basename(args.rates))[0] + '.csv'
    else:
        outfile = args.o

    # Import r4s output as dataframe
    rates = pd.read_fwf(args.rates,
                        skiprows=13,        # Skip r4s header junk
                        skipfooter=2,       # Skip mean and std dev footer
                        widths=[5, 5, 9],   # Specify column widths
                        usecols=[0, 1, 2],  # Grab the first 3 columns
                        names=['fasta_position', 'fasta_aa', 'r4s_rate'])

    # Write dataframe to file
    rates.to_csv(outfile, index=False)
def test_fwf(self):
    data_expected = """\
2011,58,360.242940,149.910199,11950.7
2011,59,444.953632,166.985655,11788.4
2011,60,364.136849,183.628767,11806.2
2011,61,413.836124,184.375703,11916.8
2011,62,502.953953,173.237159,12468.3
"""
    expected = self.read_csv(StringIO(data_expected), header=None)

    data1 = """\
201158    360.242940   149.910199   11950.7
201159    444.953632   166.985655   11788.4
201160    364.136849   183.628767   11806.2
201161    413.836124   184.375703   11916.8
201162    502.953953   173.237159   12468.3
"""
    colspecs = [(0, 4), (4, 8), (8, 20), (21, 33), (34, 43)]
    df = read_fwf(StringIO(data1), colspecs=colspecs, header=None)
    tm.assert_frame_equal(df, expected)

    data2 = """\
2011 58   360.242940   149.910199   11950.7
2011 59   444.953632   166.985655   11788.4
2011 60   364.136849   183.628767   11806.2
2011 61   413.836124   184.375703   11916.8
2011 62   502.953953   173.237159   12468.3
"""
    df = read_fwf(StringIO(data2), widths=[5, 5, 13, 13, 7], header=None)
    tm.assert_frame_equal(df, expected)

    # From Thomas Kluyver: apparently some non-space filler characters can
    # be seen, this is supported by specifying the 'delimiter' character:
    # http://publib.boulder.ibm.com/infocenter/dmndhelp/v6r1mx/index.jsp?topic=/com.ibm.wbit.612.help.config.doc/topics/rfixwidth.html
    data3 = """\
201158~~~~360.242940~~~149.910199~~~11950.7
201159~~~~444.953632~~~166.985655~~~11788.4
201160~~~~364.136849~~~183.628767~~~11806.2
201161~~~~413.836124~~~184.375703~~~11916.8
201162~~~~502.953953~~~173.237159~~~12468.3
"""
    df = read_fwf(StringIO(data3), colspecs=colspecs, delimiter='~',
                  header=None)
    tm.assert_frame_equal(df, expected)

    with tm.assertRaisesRegexp(ValueError, "must specify only one of"):
        read_fwf(StringIO(data3), colspecs=colspecs, widths=[6, 10, 10, 7])

    with tm.assertRaisesRegexp(ValueError, "Must specify either"):
        read_fwf(StringIO(data3), colspecs=None, widths=None)