Python numpy module: genfromtxt() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use numpy.genfromtxt().
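Before the project snippets, a minimal sketch of the call itself (made-up inline data):
import numpy as np
from io import StringIO

# names=True promotes the first row to field names; dtype=None infers
# each column's type (floats here, a string for the label).
csv = StringIO("x,y,label\n1.0,2.0,a\n3.0,4.0,b")
table = np.genfromtxt(csv, delimiter=",", names=True, dtype=None, encoding="utf-8")
print(table["x"])          # [1. 3.]
print(table.dtype.names)   # ('x', 'y', 'label')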
def _get_data_dims(self, input_fname):
"""Briefly scan the data file for info"""
# raw data formatting is nsamps by nchans + counter
data = np.genfromtxt(input_fname, delimiter=',', comments='%',
skip_footer=1)
diff = np.abs(np.diff(data[:, 0]))
diff = np.mod(diff, 254) - 1
missing_idx = np.where(diff != 0)[0]
missing_samps = diff[missing_idx].astype(int)
nsamps, nchan = data.shape
# add the missing samples
nsamps += sum(missing_samps)
# remove the tracker column
nchan -= 1
del data
return nsamps, nchan
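The counter arithmetic above can be checked on synthetic data (a sketch; the 254 modulus simply mirrors the snippet):
import numpy as np

# a counter column that skips two samples between 3 and 6
counter = np.array([1, 2, 3, 6, 7])
diff = np.abs(np.diff(counter))
diff = np.mod(diff, 254) - 1          # 0 wherever no sample was dropped
missing = diff[diff != 0].astype(int)
print(missing.sum())                  # 2 dropped samples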
def get_adjacency_matrix(out_dir, sid, expt_id):
"Returns the adjacency matrix"
vec_path = pjoin(out_dir, sid, '{}_graynet.csv'.format(expt_id))
edge_vec = np.genfromtxt(vec_path)
matrix_size = np.int64( (1.0 + np.sqrt(1.0+8.0*len(edge_vec)))/2.0 )
edge_mat = np.zeros([matrix_size, matrix_size])
# make the matrix symmetric, as required by nilearn's plot_connectome:
# fill the upper triangle (diag +1), then mirror it into the lower triangle (diag -1)
upper_tri = np.triu_indices_from(edge_mat, +1)
lower_tri = np.tril_indices_from(edge_mat, -1)
edge_mat[upper_tri] = edge_vec
edge_mat[lower_tri] = edge_mat.T[lower_tri]
return edge_mat
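A quick round-trip check of this reconstruction (a sketch with a made-up 3x3 matrix):
import numpy as np

full = np.array([[0., 1., 2.],
                 [1., 0., 3.],
                 [2., 3., 0.]])
vec = full[np.triu_indices(3, 1)]     # condensed upper triangle: [1. 2. 3.]
size = int((1.0 + np.sqrt(1.0 + 8.0 * len(vec))) / 2.0)
mat = np.zeros((size, size))
mat[np.triu_indices(size, 1)] = vec
mat[np.tril_indices(size, -1)] = mat.T[np.tril_indices(size, -1)]
print(np.allclose(mat, full))         # True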
def _block2df(block,obstypes,svnames,svnum):
"""
input: block of text corresponding to one time increment INTERVAL of RINEX file
output: 3-D array of float64 data from the block. Future: consider whether Numpy, Pandas, or Xray is the best fit.
"""
nobs = len(obstypes)
stride=3
strio = BytesIO(block.encode())
barr = np.genfromtxt(strio, delimiter=(14,1,1)*5).reshape((svnum,-1), order='C')
data = barr[:,0:nobs*stride:stride]
lli = barr[:,1:nobs*stride:stride]
ssi = barr[:,2:nobs*stride:stride]
data = np.vstack(([data.T],[lli.T],[ssi.T])).T
return data
def _block2df(block,obstypes,svnames,svnum):
"""
input: block of text corresponding to one time increment INTERVAL of RINEX file
output: 3-D array of float64 data from block.
"""
nobs = len(obstypes)
stride=3
strio = BytesIO(block.encode())
barr = np.genfromtxt(strio, delimiter=(14,1,1)*5).reshape((svnum,-1), order='C')
data = barr[:,0:nobs*stride:stride]
lli = barr[:,1:nobs*stride:stride]
ssi = barr[:,2:nobs*stride:stride]
data = np.vstack(([data],[lli],[ssi])).T #3-D array: (obs, sv, [data, lli, ssi])
return data
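The delimiter=(14,1,1)*5 argument above reads fixed-width fields rather than splitting on a character: each observation is 14 characters wide, followed by two 1-character flags, five observations per line. A minimal sketch with a fabricated two-field record:
import numpy as np
from io import BytesIO

# two 5-character numeric fields, each followed by a 1-character flag
line = b"  1.5A  2.5B"
arr = np.genfromtxt(BytesIO(line), delimiter=(5, 1) * 2)
print(arr)   # [1.5 nan 2.5 nan] -- the flag columns fail float parsing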
def _read_horizons_file(self):
"""
reads standard output from JPL Horizons into self.data_lists
"""
# Read in the file
self._get_start_end()
data = np.genfromtxt(
self.file_properties['file_name'],
dtype=[('date', 'S17'), ('ra_dec', 'S23'), ('distance', 'f8'),
('foo', 'S23')],
delimiter=[18, 29, 18, 24], autostrip=True,
skip_header=self.file_properties['start_ind'] + 1,
skip_footer=(self.file_properties['line_count'] -
self.file_properties['stop_ind']))
# Fix time format
for (i, date) in enumerate(data['date']):
data['date'][i] = Utils.date_change(date)
self.data_lists = data
def test_skip_footer_with_invalid(self):
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
basestr = '1 1\n2 2\n3 3\n4 4\n5 \n6 \n7 \n'
# Footer too small to get rid of all invalid values
assert_raises(ValueError, np.genfromtxt,
TextIO(basestr), skip_footer=1)
a = np.genfromtxt(
TextIO(basestr), skip_footer=1, invalid_raise=False)
assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]))
#
a = np.genfromtxt(TextIO(basestr), skip_footer=3)
assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]))
#
basestr = '1 1\n2 \n3 3\n4 4\n5 \n6 6\n7 7\n'
a = np.genfromtxt(
TextIO(basestr), skip_footer=1, invalid_raise=False)
assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.], [6., 6.]]))
a = np.genfromtxt(
TextIO(basestr), skip_footer=3, invalid_raise=False)
assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.]]))
def test_commented_header(self):
# Check that names can be retrieved even if the line is commented out.
data = TextIO("""
#gender age weight
M 21 72.100000
F 35 58.330000
M 33 21.99
""")
# The # is part of the first name and should be deleted automatically.
test = np.genfromtxt(data, names=True, dtype=None)
ctrl = np.array([('M', 21, 72.1), ('F', 35, 58.33), ('M', 33, 21.99)],
dtype=[('gender', '|S1'), ('age', int), ('weight', float)])
assert_equal(test, ctrl)
# Ditto, but we should get rid of the first element
data = TextIO(b"""
# gender age weight
M 21 72.100000
F 35 58.330000
M 33 21.99
""")
test = np.genfromtxt(data, names=True, dtype=None)
assert_equal(test, ctrl)
def test_dtype_with_object(self):
# Test using an explicit dtype with an object
data = """ 1; 2001-01-01
2; 2002-01-31 """
ndtype = [('idx', int), ('code', object)]
func = lambda s: strptime(s.strip(), "%Y-%m-%d")
converters = {1: func}
test = np.genfromtxt(TextIO(data), delimiter=";", dtype=ndtype,
converters=converters)
control = np.array(
[(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))],
dtype=ndtype)
assert_equal(test, control)
ndtype = [('nest', [('idx', int), ('code', object)])]
try:
test = np.genfromtxt(TextIO(data), delimiter=";",
dtype=ndtype, converters=converters)
except NotImplementedError:
pass
else:
errmsg = "Nested dtype involving objects should be supported."
raise AssertionError(errmsg)
def test_replace_space(self):
# Test the 'replace_space' option
txt = "A.A, B (B), C:C\n1, 2, 3.14"
# Test default: replace ' ' by '_' and delete non-alphanum chars
test = np.genfromtxt(TextIO(txt),
delimiter=",", names=True, dtype=None)
ctrl_dtype = [("AA", int), ("B_B", int), ("CC", float)]
ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
assert_equal(test, ctrl)
# Test: no replace, no delete
test = np.genfromtxt(TextIO(txt),
delimiter=",", names=True, dtype=None,
replace_space='', deletechars='')
ctrl_dtype = [("A.A", int), ("B (B)", int), ("C:C", float)]
ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
assert_equal(test, ctrl)
# Test: no delete (spaces are replaced by _)
test = np.genfromtxt(TextIO(txt),
delimiter=",", names=True, dtype=None,
deletechars='')
ctrl_dtype = [("A.A", int), ("B_(B)", int), ("C:C", float)]
ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
assert_equal(test, ctrl)
def test_names_with_usecols_bug1636(self):
# Make sure we pick up the right names w/ usecols
data = "A,B,C,D,E\n0,1,2,3,4\n0,1,2,3,4\n0,1,2,3,4"
ctrl_names = ("A", "C", "E")
test = np.genfromtxt(TextIO(data),
dtype=(int, int, int), delimiter=",",
usecols=(0, 2, 4), names=True)
assert_equal(test.dtype.names, ctrl_names)
#
test = np.genfromtxt(TextIO(data),
dtype=(int, int, int), delimiter=",",
usecols=("A", "C", "E"), names=True)
assert_equal(test.dtype.names, ctrl_names)
#
test = np.genfromtxt(TextIO(data),
dtype=int, delimiter=",",
usecols=("A", "C", "E"), names=True)
assert_equal(test.dtype.names, ctrl_names)
def test_gft_using_filename(self):
# Test that we can load data from a filename as well as a file
# object
tgt = np.arange(6).reshape((2, 3))
if sys.version_info[0] >= 3:
# python 3k is known to fail for '\r'
linesep = ('\n', '\r\n')
else:
linesep = ('\n', '\r\n', '\r')
for sep in linesep:
data = '0 1 2' + sep + '3 4 5'
with temppath() as name:
with open(name, 'w') as f:
f.write(data)
res = np.genfromtxt(name)
assert_array_equal(res, tgt)
def get_image_data_and_labels(index_file, get_full_path=True, as_list=True):
if not os.path.exists(index_file):
print('Error, no index file at path', index_file)
return [], []
index_file_dir = os.path.dirname(index_file)
data = np.genfromtxt(index_file, dtype='str')
labels = data[:,1].astype(int)
if as_list:
im_data= list(data[:,0])
else:
im_data = data[:,0]
if get_full_path:
im_data_f = [join(index_file_dir,im) for im in im_data ]
if not as_list:
im_data_f = np.array(im_data_f)
else:
im_data_f = im_data
return im_data_f,labels
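dtype='str' keeps every column as text, which is what lets the snippet slice paths and labels apart before casting (a sketch with a made-up index file):
import numpy as np
from io import StringIO

idx = StringIO("img_000.png 0\nimg_001.png 1")
data = np.genfromtxt(idx, dtype='str')
print(data[:, 0])              # ['img_000.png' 'img_001.png']
print(data[:, 1].astype(int))  # [0 1]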
def main():
drawXtremIOCharts()
# data = np.genfromtxt('xtremPerfStats.csv', dtype=float, delimiter=',', names=True)
# print data.dtype.names
# iops = plot.figure()
# iopsInit = len(iops.axes)
# bw = plot.figure()
# bwInit = len(bw.axes)
# latency = plot.figure()
# latencyInit = len(latency.axes)
# xCpu = plot.figure()
# xCpuInit = len(xCpu.axes)
# for name in data.dtype.names:
# if re.search('iops', name):
# drawPlots(data,iops,name,"IOPs",iopsInit+1)
# if re.search('bandwidth', name):
# drawPlots(data,bw,name,"Bandwidth, MB/s", bwInit+1)
# if re.search('latency', name):
# drawPlots(data,latency,name,"Latency, MicroSec", latencyInit+1)
# if re.search('SC', name):
# drawPlots(data,xCpu,name,"% CPU Utilization", xCpuInit+1)
# plot.show()
def rave(dr=5, usecols=None):
"""
NAME:
rave
PURPOSE:
Load the RAVE data
INPUT:
dr= (5) data release
usecols= (sequence, optional) indices to read from RAVE data
OUTPUT:
data table
HISTORY:
2016-09-12 - Written - Bovy (UofT)
"""
filePath, ReadMePath= path.ravePath(dr=dr)
if not os.path.exists(filePath):
download.rave(dr=dr)
if dr == 4:
data= astropy.io.ascii.read(filePath,readme=ReadMePath)
elif dr == 5:
if usecols:
data= numpy.genfromtxt(filePath,delimiter=',', names=True, usecols=usecols)
else:
data= numpy.genfromtxt(filePath,delimiter=',', names=True)
return data
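usecols combined with names=True keeps only the requested columns while still reading their names from the header, as this sketch with fabricated columns shows:
import numpy as np
from io import StringIO

csv = StringIO("ra,dec,rv\n10.0,-5.0,33.2\n11.5,-4.8,30.1")
data = np.genfromtxt(csv, delimiter=',', names=True, usecols=(0, 2))
print(data.dtype.names)   # ('ra', 'rv')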
def __init__(self, **kwargs):
logging.info('Crossword __init__: Initializing crossword...')
logging.debug('kwargs: %s', kwargs)
# Reading kwargs
self.setup = kwargs
self.rows = int(kwargs.get('n', 5))
self.cols = int(kwargs.get('m', 5))
self.words_file = str(kwargs.get('word_file', 'lemma.num.txt'))
self.sort = bool(kwargs.get('sort', False))
self.maximize_len = bool(kwargs.get('maximize_len', False))
self.repeat_words = bool(kwargs.get('repeat_words', False))
logging.debug('Crossword __init__: n={}, m={}, fname={}'.format(self.rows, self.cols, self.words_file))
# Loading words
logging.debug('Crossword __init__: Started loading words from {}'.format(self.words_file))
arr = np.genfromtxt(self.words_file, dtype='str', delimiter=' ')
self.words = arr[np.in1d(arr[:, 3], ['v', 'n', 'adv', 'a'])][:, 2].tolist()
# Number of words loaded
logging.debug('Crossword __init__: Number of words loaded: {}'.format(len(self.words)))
self.words = list(set(x for x in self.words if len(x) <= self.rows and len(x) <= self.cols))
if self.sort:
self.words = sorted(self.words, key=len, reverse=self.maximize_len)
# After filter logging
logging.debug('Crossword __init__: Number of words after filter: {}, maxlen = {}'.format(len(self.words), len(
max(self.words, key=len))))
def test_gd():
'''
A gradient descent and linear regression example to solve y = mx + b equation
using gradient descent, m is slope, b is y-intercept
by Matt Nedrich
Source: http://spin.atomicobject.com/2014/06/24/gradient-descent-linear-regression/
'''
# read data
points = genfromtxt("data/spring.csv", delimiter=",")
# initial y-intercept guess
b0 = 0
# initial slope guess
m0 = 0
# number of iterations to perform the GD
n_iter = 1000
for i in range(n_iter):
# perform GD iterations
b0, m0 = step_gradient(b0, m0, points, 0.0001)
print("GD\ti=%d\tb=%f\tm=%f\te=%f\t(y=%f*x+%f)" %
(n_iter, b0, m0, compute_error(b0, m0, points), m0, b0))
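step_gradient and compute_error are not part of the snippet; a minimal sketch consistent with the usual least-squares gradient (names and signatures assumed from the call sites above):
import numpy as np

def compute_error(b, m, points):
    # mean squared error of y = m*x + b over the (x, y) rows (assumed helper)
    x, y = points[:, 0], points[:, 1]
    return np.mean((y - (m * x + b)) ** 2)

def step_gradient(b, m, points, learning_rate):
    # one gradient-descent step on the mean squared error (assumed helper)
    x, y = points[:, 0], points[:, 1]
    n = float(len(points))
    b_grad = -(2.0 / n) * np.sum(y - (m * x + b))
    m_grad = -(2.0 / n) * np.sum(x * (y - (m * x + b)))
    return b - learning_rate * b_grad, m - learning_rate * m_grad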
def create_LOFAR_configuration(antfile: str, meta: dict = None) -> Configuration:
""" Define from the LOFAR configuration file
:param antfile:
:param meta:
:return: Configuration
"""
antxyz = numpy.genfromtxt(antfile, skip_header=2, usecols=[1, 2, 3], delimiter=",")
nants = antxyz.shape[0]
assert antxyz.shape[1] == 3, "Antenna array has wrong shape %s" % (antxyz.shape,)
anames = numpy.genfromtxt(antfile, dtype='str', skip_header=2, usecols=[0], delimiter=",")
mounts = numpy.repeat('XY', nants)
location = EarthLocation(x=[3826923.9] * u.m, y=[460915.1] * u.m, z=[5064643.2] * u.m)
fc = Configuration(location=location, names=anames, mount=mounts, xyz=antxyz, frame='global',
diameter=35.0)
return fc
def getCytoRNADataFromCsv(dataPath, batchesPath, batch1, batch2, trainPct = 0.8):
data = genfromtxt(dataPath, delimiter=',', skip_header=0)
batches = genfromtxt(batchesPath, delimiter=',', skip_header=0)
source = data[batches == batch1]
target = data[batches == batch2]
n_source = source.shape[0]
p = np.random.permutation(n_source)
cutPt = int(n_source * trainPct)
source_train = source[p[:cutPt]]
source_test = source[p[cutPt:]]
n_target = target.shape[0]
p = np.random.permutation(n_target)
cutPt = int(n_target * trainPct)
target_train = target[p[:cutPt]]
target_test = target[p[cutPt:]]
return source_train, source_test, target_train, target_test
def test_l1l2path():
X_file = 'data_c/X_200_100.csv'
Y_file = 'data_c/Y_200_100.csv'
X = np.genfromtxt(X_file)
Y = np.genfromtxt(Y_file)
mu = 1e-3
tau_range = np.logspace(-2,0,3)
k_max = 10000
tolerance = 1e-4
pc = pplus.PPlusConnection(debug=False, workers_servers = ('127.0.0.1',))
pc.submit(l1l2path_job,
args=(X, Y, mu, tau_range, k_max, tolerance),
modules=('numpy as np', 'ctypes'))
result_keys = pc.collect()
print(result_keys)
print("Done")
def __init__(self,Hs,d,slope):
Hs = float(Hs)
d = float(d)
slope = float(slope)
battjes = genfromtxt("battjes.csv",delimiter=',') #import table with normalized wave heights from Battjes&Groenendijk 2000, Wave height distribution on shallow foreshores
if Hs/d >= 0.78:
self.Hs = 0.78*d
else:
self.Htr = (0.35+5.8*1/slope)*d
# Hrms equation .59 The Rock Manual (page 359)
self.Hrms = (0.6725 + 0.2025*(Hs/d))*Hs
# calculate the normalised Htr
HtrNorm = self.Htr / self.Hrms
#find nearest to self.Htr in column 1 of Battjes. Choose the value immediately next to it.
index = int(HtrNorm / 0.05) + 1
if index > 60:
index = 60
#extract the relevant wave heights from Battjes table.
self.Hs = battjes[index,3] * self.Hrms
self.H2Percent = battjes[index,5] * self.Hrms
self.H1Percent = battjes[index,6] * self.Hrms
self.Hmax = battjes[index,7] * self.Hrms
def getCalibMatrix(dataPath, frameNum):
# load calibration data
# P0, P1, P2, P3, Tr_velo_to_cam, Tr_imu_to_velo
pathCalib = dataPath + 'calib/{:0>6}.txt'.format(frameNum)
P_left = np.genfromtxt(pathCalib,dtype=None,usecols=range(1,13),skip_header=2,skip_footer=4).reshape(3,4) # 3x4
rect_3x3 = np.genfromtxt(pathCalib,dtype=None,usecols=range(1,10),skip_header=4,skip_footer=2).reshape(3,3) # 3x3
velo2cam_3x4 = np.genfromtxt(pathCalib,dtype=None,usecols=range(1,13),skip_header=5,skip_footer=1).reshape(3,4) # 3x4
rect = np.eye(4)
velo2cam = np.eye(4)
rect[:3,:3] =rect_3x3
velo2cam[:3, :3] = velo2cam_3x4[:3,:3]
velo2cam[:3, 3] = velo2cam_3x4[:3, 3]
return {'P_left':P_left,'rect':rect,'velo2cam':velo2cam}
def run(self, args, extra_args):
if args.output_file is not None:
output_file = os.path.realpath(args.output_file)
else:
output_file = os.path.realpath(args.input_protocol)
additional_files = []
if args.additional_files:
for file in args.additional_files:
additional_files.append(np.genfromtxt(file))
protocol = mdt.load_protocol(os.path.realpath(args.input_protocol))
context_dict = {name: protocol.get_column(name) for name in protocol.column_names}
exec(args.expr, {'np': np, 'files': additional_files}, context_dict)
for key in context_dict:
if is_scalar(context_dict[key]):
context_dict[key] = np.ones(protocol.length) * context_dict[key]
protocol = Protocol(context_dict)
mdt.write_protocol(protocol, output_file)
def with_added_column_from_file(self, name, file_name, multiplication_factor=1):
"""Create a copy of this protocol with the given column (loaded from a file) added to this protocol.
The given file can either contain a single value or one value per protocol line.
Args:
name (str): The name of the column to add.
file_name (str): The file to get the column from.
multiplication_factor (double): we might need to scale the data by a constant. For example,
if the data in the file is in ms we might need to scale it to seconds by multiplying with 1e-3
Returns:
self: for chaining
"""
columns = copy.copy(self._columns)
if name == 'g':
columns.update(get_g_columns(file_name))
for column_name in ('gx', 'gy', 'gz'):
columns[column_name] *= multiplication_factor
return Protocol(columns)
else:
data = np.genfromtxt(file_name)
data *= multiplication_factor
return self.with_new_column(name, data)
def load_edges(fpath, delimiter=None, has_header=False):
"""Load edges in CSV format as numpy ndarray of strings.
Args:
fpath (str): edges file
delimiter (str): alternative argument name for sep (default=None)
has_header (bool): True if has header row
Returns:
np.ndarray: array of edges
"""
if PANDAS_INSTALLED:
header = 'infer' if has_header else None
df = pd.read_csv(fpath, delimiter=delimiter, header=header)
edges = df.values
else:
logger.warning("Pandas not installed. Using numpy to load csv, which "
"is slower.")
header = 1 if has_header else 0
edges = np.genfromtxt(fpath, delimiter=delimiter, skip_header=header,
dtype=object)
return edges.astype('str')
def read_xyt_frame( n=1 ):
''' Load the xyt txt files:
x,y is the detector (x,y) coordinates
t is the time-encoder (when hitting the detector at that (x,y))
DATA_DIR is the data filefold path
DataPref is the data prefix
n is file number
the data name will be like: DATA_DIR/DataPref_0001.txt
return the histogram of the hitting event
'''
import numpy as np
ni = '%04d'%n
fp = DATA_DIR + DataPref + '%s.txt'%ni
data = np.genfromtxt( fp, skip_header=0)[:,2] #take the time encoder (genfromtxt uses skip_header, not skiprows)
td = np.histogram( data, bins= np.arange(11810) )[0] #do histogram
return td
def check_subjects(subjects_info):
"Ensure subjects are provided and their data exist."
if isinstance(subjects_info, str):
if not pexists(subjects_info):
raise IOError('path to subject list does not exist: {}'.format(subjects_info))
subjects_list = np.genfromtxt(subjects_info, dtype=str)
elif isinstance(subjects_info, collections.abc.Iterable):
if len(subjects_info) < 1:
raise ValueError('Empty subject list.')
subjects_list = subjects_info
else:
raise ValueError('Invalid value provided for subject list. \n '
'Must be a list of paths, or path to file containing list of paths, one for each subject.')
subject_id_list = np.atleast_1d(subjects_list)
num_subjects = subject_id_list.size
if num_subjects < 1:
raise ValueError('Input subject list is empty.')
num_digits_id_size = len(str(num_subjects))
max_id_width = max(map(len, subject_id_list))
return subject_id_list, num_subjects, max_id_width, num_digits_id_size
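np.atleast_1d matters here because genfromtxt collapses a one-line file to a 0-d array (a sketch):
import numpy as np
from io import StringIO

one = np.genfromtxt(StringIO("sub-001"), dtype=str)
print(one.shape)                 # () -- 0-d
print(np.atleast_1d(one).shape)  # (1,)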
def write_preprocessed_data(output_directory, cell_IDs, cell_stages, data, markers):
processed_data_path = path.join(output_directory, 'processed_data.tsv')
with open(processed_data_path, 'w') as f:
f.write('\t'.join(cell_IDs))
f.write('\n')
f.write('\t'.join(cell_stages))
f.write('\n')
np.savetxt(f, data.T, fmt = '%.6f', delimiter = '\t')
dataset = np.genfromtxt(processed_data_path, delimiter = '\t', dtype = str)
dataset = np.insert(dataset, 0, np.append(['Cell ID', 'Stage'],
markers), axis = 1)
with open(processed_data_path, 'w') as f:
np.savetxt(f, dataset, fmt = '%s', delimiter = '\t')
def read_file_to_np(self, file_name):
datatype = [('time',np.float32), ('ax',np.int16), ('ay',np.int16), ('az',np.int16),
('gx',np.int16), ('gy',np.int16), ('gz',np.int16),
('mx',np.int16), ('my',np.int16), ('mz',np.int16),
('time_diff', np.float32)]
data = np.genfromtxt(file_name, dtype=datatype, delimiter="\t")
data['time'] = data['time']-data['time'][0]
a = np.diff(data['time'])
time_diff_array = np.insert(a, 0, 0)
data['time_diff'] = time_diff_array
# scale the raw magnetometer axes by per-axis calibration factors
data['mx'] = data['mx'] * 1.18359375
data['my'] = data['my'] * 1.19140625
data['mz'] = data['mz'] * 1.14453125
return data
def get_overlapping_files(self, path, ra, dec, width):
"""
This function ...
:param path: path to the directory containing the images
:param ra:
:param dec:
:param width:
:return:
"""
# Generate the meta and then overlap file
meta_path, overlap_path = self.generate_meta_and_overlap_file(path, ra, dec, width)
# Load the overlap table
overlap_files = np.genfromtxt(overlap_path, skip_header=3, usecols=[32], dtype="S500")
# Return the names of the overlapping images
return overlap_files
# -----------------------------------------------------------------
def read_gpl(self):
dtype = [('waves',float),]+[('spec%i'%(i+1),float) for i in range(len(self.age))]
self.sed = np.genfromtxt(self.workdir+self.csp_output+'.spec',dtype=dtype)
age3, Q = np.genfromtxt(self.workdir+self.csp_output+'.3color', usecols=(0,5), unpack=True)
age4, M = np.genfromtxt(self.workdir+self.csp_output+'.4color', usecols=(0,6), unpack=True)
for x,age in zip(self.sed.dtype.names[1:],self.age):
self.sed[x] = self.sed[x] * 3.839e33
self.sed[x][self.sed["waves"] < 912.] = self.sed[x][self.sed["waves"] < 912.] * self.lyc_esc
log_age = np.log10(age*1e9)
diff = abs(age3 - log_age)
self.Q[x] = Q[diff == min(diff)][0]
diff = abs(age4 - log_age)
self.M_unnorm[x] = M[diff == min(diff)][0]
def main(opts):
vertices = np.genfromtxt('points.dat', delimiter=' ', skip_header=1)
npoints, dim = vertices.shape
assert dim == 3
faces = np.genfromtxt('indices.dat', delimiter=' ') # Generated from alpha_shape
# Create the mesh
cube = mesh.Mesh(np.zeros(faces.shape[0], dtype=mesh.Mesh.dtype))
for i, f in enumerate(faces):
for j in range(3):
cube.vectors[i][j] = vertices[int(f[j]),:]  # cast: genfromtxt yields float indices
# Write the mesh to file
cube.save(opts.new_file_name)
def read_array(filename):
''' Read array and convert to 2d np arrays '''
array = np.genfromtxt(filename, dtype=float)
if len(array.shape)==1:
array = array.reshape( -1, 1 )
return array
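The reshape guards against genfromtxt returning a 1-D array for a single-column file (a sketch with inline data):
import numpy as np
from io import StringIO

col = np.genfromtxt(StringIO("1\n2\n3"), dtype=float)
print(col.shape)                 # (3,) -- 1-D, not (3, 1)
print(col.reshape(-1, 1).shape)  # (3, 1)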
def file_to_array (filename, verbose=False):
''' Converts a file to a list of lists of strings.
It differs from np.genfromtxt in that the number of columns doesn't need to be constant'''
data =[]
with open(filename, "r") as data_file:
if verbose: print ("Reading {}...".format(filename))
lines = data_file.readlines()
if verbose: print ("Converting {} to correct array...".format(filename))
data = [lines[i].strip().split() for i in range (len(lines))]
del lines  # free the raw lines; only the parsed rows are needed
return data
def load_iris():
try:
# Load Iris dataset from the sklearn.datasets package
from sklearn import datasets
from sklearn import decomposition
# Load Dataset
iris = datasets.load_iris()
X = iris.data
y = iris.target
labels = iris.target_names
# Reduce components by Principal Component Analysis from sklearn
X = decomposition.PCA(n_components=3).fit_transform(X)
except ImportError:
# Load Iris dataset manually
path = os.path.join('data', 'iris', 'iris.data')
iris_data = np.genfromtxt(path, dtype='str', delimiter=',')
X = iris_data[:, :4].astype(dtype=float)
y = np.ndarray((X.shape[0],), dtype=int)
# Create target vector y and corresponding labels
labels, idx = [], 0
for i, label in enumerate(iris_data[:, 4]):
label = label.split('-')[1]
if label not in labels:
labels.append(label); idx += 1
y[i] = idx - 1
# Reduce components by implemented Principal Component Analysis
X = PCA(X, 3)[0]
return X, y, labels
def read_model_table(modelfile):
'''
This reads a downloaded TRILEGAL model file.
'''
infd = gzip.open(modelfile)
model = np.genfromtxt(infd,names=True)
infd.close()
return model
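The explicit gzip.open is optional: given a file name ending in .gz, genfromtxt decompresses it itself (a sketch; 'model.dat.gz' is a placeholder path):
import numpy as np

# numpy's datasource layer transparently gunzips *.gz paths
model = np.genfromtxt('model.dat.gz', names=True)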
def test_stats2():
"""Test stats2 func from fluxpart.util"""
data = "7 8 4\n6 1 3\n10 6 6\n6 7 3\n8 2 4"
dtype = [('v0', int), ('v1', int), ('v2', int)]
arr = np.genfromtxt(io.BytesIO(data.encode()), dtype=dtype)
ans = stats2(arr)
npt.assert_allclose(ans.ave_v0, 37 / 5)
npt.assert_allclose(ans.ave_v1, 24 / 5)
npt.assert_allclose(ans.ave_v2, 4)
npt.assert_allclose(ans.var_v0, 14 / 5)
npt.assert_allclose(ans.var_v1, 97 / 10)
npt.assert_allclose(ans.var_v2, 3 / 2)
npt.assert_allclose(ans.cov_v0_v1, 3 / 5)
npt.assert_allclose(ans.cov_v0_v2, 2)
npt.assert_allclose(ans.cov_v1_v0, ans.cov_v0_v1)
npt.assert_allclose(ans.cov_v1_v2, 1)
npt.assert_allclose(ans.cov_v2_v0, ans.cov_v0_v2)
npt.assert_allclose(ans.cov_v2_v1, ans.cov_v1_v2)
data = "7 8 4\n6 1 3\n10 6 6\n6 7 3\n8 2 4"
dtype = [('v0', int), ('v1', int), ('v2', int)]
arr = np.genfromtxt(io.BytesIO(data.encode()), dtype=dtype)
ans = stats2(arr, names=('v0', 'v2'))
npt.assert_allclose(ans.ave_v0, 37 / 5)
npt.assert_allclose(ans.ave_v2, 4)
npt.assert_allclose(ans.var_v0, 14 / 5)
npt.assert_allclose(ans.var_v2, 3 / 2)
npt.assert_allclose(ans.cov_v0_v2, 2)
npt.assert_allclose(ans.cov_v2_v0, ans.cov_v0_v2)
assert not hasattr(ans, 'ave_v1')
assert not hasattr(ans, 'var_v1')
assert not hasattr(ans, 'cov_v0_v1')
assert not hasattr(ans, 'cov_v1_v0')
assert not hasattr(ans, 'cov_v1_v2')
assert not hasattr(ans, 'cov_v2_v1')
def merge_results(sol,files):
model = get_model_type(sol)
save_where = '/Batch results/'
working_path = getcwd().replace("\\", "/")+"/"
save_path = working_path+save_where
print("\nChecking for longest csv file")
lengths = []
for f in files:
to_merge_temp = working_path+"/Results/%s/INV_%s-%s_%s.csv" %(f,sol.model,model,f)
headers_temp = np.genfromtxt(to_merge_temp, delimiter=",", dtype=str, skip_footer=1)
lengths.append(len(headers_temp))
to_merge_max = working_path+"/Results/%s/INV_%s-%s_%s.csv" %(files[lengths.index(max(lengths))],sol.model,model,files[lengths.index(max(lengths))])
headers = np.genfromtxt(to_merge_max, delimiter=",", dtype=str, skip_footer=1)
print("\nMerging csv files")
if not path.exists(save_path):
makedirs(save_path)
# to_merge = working_path+"/Results/%s/INV_%s_%s.csv" %(files[0],model,files[0])
# headers = np.genfromtxt(to_merge, delimiter=",", dtype=str, skip_footer=1)
merged_inv_results = np.zeros((len(files), len(headers)))
merged_inv_results.fill(np.nan)
for i, f in enumerate(files):
to_add = np.loadtxt(working_path+"/Results/%s/INV_%s-%s_%s.csv" %(f,sol.model,model,f), delimiter=",", skiprows=1)
merged_inv_results[i][:to_add.shape[0]] = to_add
rows = np.array(files, dtype=str)[:, np.newaxis]
hd = ",".join(["ID"] + list(headers))
np.savetxt(save_path+"Merged_%s-%s_%s_TO_%s.csv" %(sol.model,model,files[0],files[-1]), np.hstack((rows, merged_inv_results)), delimiter=",", header=hd, fmt="%s")
print("Batch file successfully saved in:\n", save_path)