Python fileinput 模块,filename() 实例源码
我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用fileinput.filename()。
def next(self):
if self.index + self.batch_size < len(self.sequence_info):
batch_info = self.sequence_info[self.index:self.index + self.batch_size]
filenames = [os.path.join(PoseNetInputProvider.BASE_DIR, filename) for filename in batch_info[:, 0]]
rgb_files = map(read_rgb_image, filenames)
mean = get_mean(rgb_files)
self.logger.info('Mean for current batch:{}'.format(mean))
rgb_files = [rgb_file - mean for rgb_file in rgb_files]
groundtruths = batch_info[:, 1:]
batch = PoseNetInputProvider.PoseNetBatch()
batch.rgb_files = rgb_files
batch.groundtruths = groundtruths
batch.rgb_filenames = filenames
self.index += self.batch_size
return batch
else:
raise StopIteration()
def test_zero_byte_files(self):
t1 = t2 = t3 = t4 = None
try:
t1 = writeTmp(1, [""])
t2 = writeTmp(2, [""])
t3 = writeTmp(3, ["The only line there is.\n"])
t4 = writeTmp(4, [""])
fi = FileInput(files=(t1, t2, t3, t4))
line = fi.readline()
self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3)
line = fi.readline()
self.assertFalse(line)
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4)
fi.close()
finally:
remove_tempfiles(t1, t2, t3, t4)
def test_zero_byte_files(self):
t1 = t2 = t3 = t4 = None
try:
t1 = writeTmp(1, [""])
t2 = writeTmp(2, [""])
t3 = writeTmp(3, ["The only line there is.\n"])
t4 = writeTmp(4, [""])
fi = FileInput(files=(t1, t2, t3, t4))
line = fi.readline()
self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3)
line = fi.readline()
self.assertFalse(line)
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4)
fi.close()
finally:
remove_tempfiles(t1, t2, t3, t4)
def test_zero_byte_files(self):
t1 = t2 = t3 = t4 = None
try:
t1 = writeTmp(1, [""])
t2 = writeTmp(2, [""])
t3 = writeTmp(3, ["The only line there is.\n"])
t4 = writeTmp(4, [""])
fi = FileInput(files=(t1, t2, t3, t4))
line = fi.readline()
self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3)
line = fi.readline()
self.assertFalse(line)
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4)
fi.close()
finally:
remove_tempfiles(t1, t2, t3, t4)
def get_args_parser():
parser = argparse.ArgumentParser(add_help=False)
parser.usage = """TEST_FILE [TOT_ERRORS] [-i] [-x=VERSION]"""
parser.add_argument('filename', metavar='TEST_FILE', type=str, help="Test filename (relative path).")
parser.add_argument('tot_errors', nargs='?', type=int, default=0, help="Total errors expected (default=0).")
parser.add_argument(
'-i', dest="inspect", action="store_true", default=False,
help="inspect using an observed custom schema class."
)
parser.add_argument(
"-v", dest="version", metavar='VERSION', type=xsd_version_number, default='1.0',
help="XSD version to use for schema (default is 1.0)."
)
return parser
def tests_factory(test_function_builder, pathname, label="validation", suffix="xml"):
tests = {}
test_num = 0
for line in fileinput.input(glob.iglob(pathname)):
line = line.strip()
if not line or line[0] == '#':
continue
test_args = test_line_parser.parse_args(get_test_args(line))
test_file = os.path.join(os.path.dirname(fileinput.filename()), test_args.filename)
if not os.path.isfile(test_file) or os.path.splitext(test_file)[1].lower() != '.%s' % suffix:
continue
if test_args.inspect:
schema_class = ObservedXMLSchema
else:
schema_class = xmlschema.XMLSchema
test_func = test_function_builder(test_file, schema_class, test_args.tot_errors, test_args.inspect)
test_name = os.path.join(os.path.dirname(sys.argv[0]), os.path.relpath(test_file))
test_num += 1
class_name = 'Test{0}{1:03}'.format(label.title(), test_num)
tests[class_name] = type(
class_name, (XMLSchemaTestCase,),
{'test_{0}_{1:03}_{2}'.format(label, test_num, test_name): test_func}
)
return tests
def t02():
"""??????????"""
for line in fileinput.input('data.txt'):
print(fileinput.filename(), '|', 'Line Number:',
fileinput.lineno(), '|: ', line)
def t06():
"""??fileinput?????"""
for line in fileinput.input(glob.glob("d*.txt"), inplace=1):
if fileinput.isfirstline():
print('-' * 20, 'Reading %s...' % fileinput.filename(), '-' * 20)
print(str(fileinput.lineno()) + ': ' + line.rstrip().upper())
def t08():
"""??fileinput?????grep???"""
import re
pattern = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
for line in fileinput.input('data.txt', backup='.bak', inplace=1):
if pattern.search(line):
print(fileinput.filename(), fileinput.filelineno(), line)
def __init__(self, filename):
self.filename = filename
self.message = self.__str__()
def __str__(self):
msg = QtGui.QApplication.translate("Exceptions", "No feature was created. The shapefile was deleted {}.\n", None, QtGui.QApplication.UnicodeUTF8)
return msg.format(self.filename)
def __init__(self, filename, msg):
self.filename = filename
self.message = self.__str__() + msg
def __str__(self):
msg = QtGui.QApplication.translate("Exceptions", "Error reading {}.\n", None, QtGui.QApplication.UnicodeUTF8)
return msg.format(self.filename)
def __init__(self):
self.config_provider = get_config_provider()
training_filenames = self.config_provider.training_filenames()
self.training_filenames = [os.path.join(self.BASE_DATA_DIR, filename) for filename in training_filenames]
self.logger = get_logger()
self.batch_size = len(self.training_filenames)
def get_training_batch(self):
self.logger.info('Going to create batch of images and ground truths')
images_batch = []
groundtruth_batch = []
sequence_length = 300
for filename in self.training_filenames:
self.logger.info('Creating input queue for training sample at:{}'.format(filename))
associations = np.loadtxt(os.path.join(filename, "associate.txt"), dtype="str", unpack=False)
groundtruth = np.loadtxt(os.path.join(filename, "groundtruth.txt"), dtype="str", unpack=False)
twist, rgb_filepaths, depth_filepaths = self._get_data(associations, groundtruth, sequence_length)
rgb_filepaths = [os.path.join(filename, filepath) for filepath in rgb_filepaths]
depth_filepaths = [os.path.join(filename, filepath) for filepath in depth_filepaths]
rgb_filepaths_tensor = tf.convert_to_tensor(rgb_filepaths)
depth_filepaths_tensor = tf.convert_to_tensor(depth_filepaths)
input_queue = tf.train.slice_input_producer([rgb_filepaths_tensor, depth_filepaths_tensor, twist], shuffle=False)
image, outparams = self.read_rgbd_data(input_queue)
images_batch.append(image)
groundtruth_batch.append(outparams)
# images, outparam_batch = tf.train.batch([image, outparams], batch_size=self.batch_size,
# num_threads=20, capacity=4 * self.batch_size)
return images_batch, groundtruth_batch
def __init__(self, filename_provider):
training_filenames = filename_provider()
self.sequence_dirs = [os.path.join(self.BASE_DATA_DIR, filename) for filename in training_filenames]
self.seq_dir_map = {}
for seq_dir in self.sequence_dirs:
associations = np.loadtxt(os.path.join(seq_dir, "associate.txt"), dtype="str", unpack=False)
groundtruth = np.loadtxt(os.path.join(seq_dir , "groundtruth.txt"), dtype="str", unpack=False)
if seq_dir not in self.seq_dir_map:
self.seq_dir_map[seq_dir] = {}
self.seq_dir_map[seq_dir]['associations'] = associations
self.seq_dir_map[seq_dir]['groundtruth'] = groundtruth
sequence_size = associations.shape[0]
twist = np.zeros((sequence_size, 6))
trans_old = np.zeros((4, 4))
for i in range(sequence_size):
quat = groundtruth[:, 1:][_find_label(groundtruth[:, 0], associations[i, 0])].astype(np.float32)
trans_new = _quat_to_transformation(quat)
if i > 0:
twist[i] = _trans_to_twist(trans_new)
else:
twist[i] = np.zeros(6)
trans_old = trans_new
self.seq_dir_map[seq_dir]['relpos'] = twist
def filename(self):
self.invocation_counts["filename"] += 1
return self.return_values["filename"]
def test_state_is_None(self):
"""Tests fileinput.filename() when fileinput._state is None.
Ensure that it raises RuntimeError with a meaningful error message
and does not modify fileinput._state"""
fileinput._state = None
with self.assertRaises(RuntimeError) as cm:
fileinput.filename()
self.assertEqual(("no active input()",), cm.exception.args)
self.assertIsNone(fileinput._state)
def test_state_is_not_None(self):
"""Tests fileinput.filename() when fileinput._state is not None.
Ensure that it invokes fileinput._state.filename() exactly once,
returns whatever it returns, and does not modify fileinput._state
to point to a different object."""
filename_retval = object()
instance = MockFileInput()
instance.return_values["filename"] = filename_retval
fileinput._state = instance
retval = fileinput.filename()
self.assertExactlyOneInvocation(instance, "filename")
self.assertIs(retval, filename_retval)
self.assertIs(fileinput._state, instance)
def do_test_use_builtin_open(self, filename, mode):
original_open = self.replace_builtin_open(self.fake_open)
try:
result = fileinput.hook_compressed(filename, mode)
finally:
self.replace_builtin_open(original_open)
self.assertEqual(self.fake_open.invocation_count, 1)
self.assertEqual(self.fake_open.last_invocation,
((filename, mode), {}))
def load_data(filenames):
""" ???????????
:param filenames: ??????
:return: Bunch ????. See:
http://scikit-learn.org/stable/datasets/index.html#datasets
"""
# ??????
data = []
# ???????????
target = []
# ????
target_names = {}
# ????????????HUM,???????
data_re = re.compile(r'(\w+),(.+)')
for line in fileinput.input(filenames):
match = data_re.match(line.decode('utf-8'))
if not match:
raise Exception("Invalid format in dataset {} at line {}"
.format(fileinput.filename(),
fileinput.filelineno()))
label, text = match.group(1), match.group(2)
if label not in target_names:
target_names[label] = len(target_names)
# ??????????`HUM`, `LOC`, etc.?
target.append(label)
# ?????????????{'HUM': 1, 'LOC': 2}?
# target.append(target_names[label])
data.append(text)
return Bunch(
data=numpy.array(data),
target=numpy.array(target),
target_names=numpy.array([k for k in target_names]),
)
def filename(self):
self.invocation_counts["filename"] += 1
return self.return_values["filename"]
def test_state_is_None(self):
"""Tests fileinput.filename() when fileinput._state is None.
Ensure that it raises RuntimeError with a meaningful error message
and does not modify fileinput._state"""
fileinput._state = None
with self.assertRaises(RuntimeError) as cm:
fileinput.filename()
self.assertEqual(("no active input()",), cm.exception.args)
self.assertIsNone(fileinput._state)
def test_state_is_not_None(self):
"""Tests fileinput.filename() when fileinput._state is not None.
Ensure that it invokes fileinput._state.filename() exactly once,
returns whatever it returns, and does not modify fileinput._state
to point to a different object."""
filename_retval = object()
instance = MockFileInput()
instance.return_values["filename"] = filename_retval
fileinput._state = instance
retval = fileinput.filename()
self.assertExactlyOneInvocation(instance, "filename")
self.assertIs(retval, filename_retval)
self.assertIs(fileinput._state, instance)
def do_test_use_builtin_open(self, filename, mode):
original_open = self.replace_builtin_open(self.fake_open)
try:
result = fileinput.hook_compressed(filename, mode)
finally:
self.replace_builtin_open(original_open)
self.assertEqual(self.fake_open.invocation_count, 1)
self.assertEqual(self.fake_open.last_invocation,
((filename, mode), {}))
def write_file(filename, data):
f = open(filename, 'w')
print "#"*50
print f
print "the filename is =>"+filename
print "#"*50
try:
f.write(data)
except Exception,e:
elog.exception(e)
finally:
f.close()
def delFile(filename):
if os.path.exists(filename) and os.path.isfile(filename):
os.remove(filename)
def filename(self):
self.invocation_counts["filename"] += 1
return self.return_values["filename"]
def test_state_is_None(self):
"""Tests fileinput.filename() when fileinput._state is None.
Ensure that it raises RuntimeError with a meaningful error message
and does not modify fileinput._state"""
fileinput._state = None
with self.assertRaises(RuntimeError) as cm:
fileinput.filename()
self.assertEqual(("no active input()",), cm.exception.args)
self.assertIsNone(fileinput._state)
def test_state_is_not_None(self):
"""Tests fileinput.filename() when fileinput._state is not None.
Ensure that it invokes fileinput._state.filename() exactly once,
returns whatever it returns, and does not modify fileinput._state
to point to a different object."""
filename_retval = object()
instance = MockFileInput()
instance.return_values["filename"] = filename_retval
fileinput._state = instance
retval = fileinput.filename()
self.assertExactlyOneInvocation(instance, "filename")
self.assertIs(retval, filename_retval)
self.assertIs(fileinput._state, instance)
def do_test_use_builtin_open(self, filename, mode):
original_open = self.replace_builtin_open(self.fake_open)
try:
result = fileinput.hook_compressed(filename, mode)
finally:
self.replace_builtin_open(original_open)
self.assertEqual(self.fake_open.invocation_count, 1)
self.assertEqual(self.fake_open.last_invocation,
((filename, mode), {}))
def __init__(self, filename):
self._filename = filename
def __init__(self,filename,numTerminalTags):
self.__filename = filename
(self.__processedSentenceTimeTuples,self.__processedSentenceOriginalSentenceMap) = self.__returnProcessedTuples(numTerminalTags)
#utils.print_ds(self.__processedSentenceTimeTuples)
def __init__(self,filename):
self.__filename = filename
def countFileLines(path, fileTypes, ignoreDirs=[]):
"""???????????????
Args:
path: ??
fileTypes: ???????????['.py', '.c', '.java']
ignoreDirs: ???????????['.git']
showDetails: ???????????????????
Returns:
count: ???
details: {filename: count}????????
"""
count = 0
details = {}
for root, dirs, filenames in os.walk(path):
# print root # ????
# print dirs # ????????
# print filenames # ????????
ignoreFlag = False
for eachDir in ignoreDirs:
if root.find(eachDir) >= 0:
ignoreFlag = True
break
if ignoreFlag:
continue
for fn in filenames:
typeFlag = False
for ft in fileTypes:
if fn.endswith(ft):
typeFlag = True
break
if not typeFlag:
continue
f = os.path.join(root, fn)
for each in fileinput.input(f):
# print each #???????
pass
details[fileinput.filename()] = fileinput.lineno()
count += fileinput.lineno()
return count, details