Python os.path 模块,isfile() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.path.isfile()。
def test_uu_encoding(self):
    """
    Verify that a binary file can be UU-encoded; encoding is necessary
    prior to a post.
    """
    # The sample binary has to be present in the test data directory
    source_path = join(self.var_dir, 'joystick.jpg')
    assert isfile(source_path)

    # Encode it with a codec working out of the test directory
    codec = CodecUU(work_dir=self.test_dir)
    encoded = codec.encode(source_path)

    # The result must be an ASCII content object ...
    assert isinstance(encoded, NNTPAsciiContent) is True

    # ... that actually carries some data
    assert len(encoded) > 0
def test_hexdump(self):
    """Check that hexdump() converts binary content to hexadecimal in the
    standard, easy to read format stored in the reference file.
    """
    # Reference output to compare against
    reference_path = join(self.var_dir, 'hexdump.txt')
    assert isfile(reference_path)

    # One character for every code point from 0 through 255
    every_char = ''.join(chr(code) for code in range(256))

    with open(reference_path, 'r') as reference:
        expected = reference.read()

    # The content read back ends with a newline after the last line (even
    # if one isn't otherwise present); stripping all trailing whitespace
    # keeps the comparison simple.
    assert hexdump(every_char) == expected.rstrip()
def session(self, reset=False):
    """
    Return a database session, or False when the object is not loaded.

    The database handle is (re)created when none exists yet, or when an
    existing one has to be reset; a missing database file forces a reset.
    """
    if not self._loaded:
        return False

    # Without a database file on disk there is nothing to re-use
    reset = reset if isfile(self.db_path) else True

    if self._db and reset is True:
        # Drop the current handle so that it gets rebuilt below
        self._db = None

    if self._db is None:
        self._db = NNTPGetDatabase(engine=self.engine, reset=reset)

    # Hand out a session from the (possibly new) database
    return self._db.session()
def session(self, reset=False):
    """
    Return a database session, or False when the object is not loaded.

    The database handle is (re)created when none exists yet, or when an
    existing one has to be reset; a missing database file forces a reset.
    """
    if not self._loaded:
        return False

    # Without a database file on disk there is nothing to re-use
    reset = reset if isfile(self.db_path) else True

    if self._db and reset is True:
        # Drop the current handle so that it gets rebuilt below
        self._db = None

    if self._db is None:
        self._db = NNTPPostDatabase(engine=self.engine, reset=reset)

    # Hand out a session from the (possibly new) database
    return self._db.session()
def database_reset(ctx):
    """
    Reset the database based on the current configuration.

    Re-opens ``ctx['NNTPSettings']`` with ``reset=True``, calls
    ``__db_prep(ctx)`` and then removes every regular file found directly
    inside ``<base_dir>/cache/search``. A file that cannot be removed is
    logged as a warning and skipped.
    """
    logger.info('Resetting database ...')
    ctx['NNTPSettings'].open(reset=True)
    __db_prep(ctx)

    db_path = join(ctx['NNTPSettings'].base_dir, 'cache', 'search')
    logger.debug('Scanning %s for databases...' % db_path)
    with pushd(db_path, create_if_missing=True):
        for entry in listdir(db_path):
            db_file = join(db_path, entry)
            if not isfile(db_file):
                continue

            try:
                unlink(db_file)
            except OSError:
                # Removal is best effort. Only file-system errors are
                # tolerated here; a bare except would also swallow
                # KeyboardInterrupt and SystemExit.
                logger.warning('Failed to remove %s ...' % entry)
            else:
                logger.info('Removed %s ...' % entry)
def dirsize(src):
    """
    Return the combined size, in bytes, of the regular files located
    directly inside the directory ``src``. Sub-directories are not
    descended into.

    Returns 0 when ``src`` is not a directory, and None when the size
    can't be calculated because of an OS level error.
    """
    from os.path import join

    if not isdir(src):
        # Nothing to measure
        return 0

    try:
        # Full paths are built instead of changing the working directory,
        # which is process-wide state shared with every other thread.
        candidates = [join(src, name) for name in listdir(src)]
        return sum(getsize(item) for item in candidates if isfile(item))

    except (OSError, IOError):
        return None
def enumerate_backups_entities():
    """Yield the stored entity of every available backup."""
    if not isdir(Backuper.backups_dir):
        return

    # Each sub-directory of the backups directory may hold one backup
    for backup_name in listdir(Backuper.backups_dir):
        entity_path = path.join(Backuper.backups_dir, backup_name, 'entity.tlo')
        if not isfile(entity_path):
            # No entity.tlo file in this directory
            continue

        # Read the serialized entity and hand it to the caller
        with open(entity_path, 'rb') as stream:
            with BinaryReader(stream=stream) as reader:
                try:
                    yield reader.tgread_object()
                except TypeNotFoundError:
                    # Old user, the scheme got updated; ignore this entry
                    pass
#endregion
#region Backup exists and deletion
def backup_propic(self):
"""Back up the profile picture of ``self.entity`` as the current peer
profile picture, returning its path."""
# NOTE(review): the indentation of this block was lost in this copy, so
# the nesting of the copy step and of the final return cannot be
# recovered here -- confirm against the upstream file before relying on
# the exact control flow.
# Two paths are requested from the media handler: a versioned one
# (allow_multiple=True) and a generic one.
# TODO Maybe this should be another method, because when downloading media... We also have multiple versions
filename = self.media_handler.get_propic_path(self.entity, allow_multiple=True)
generic_filename = self.media_handler.get_propic_path(self.entity)
if filename: # User may not have a profile picture
if not isfile(filename):
# Only download the file if it doesn't exist yet
self.client.download_profile_photo(self.entity.photo,
file_path=filename,
add_extension=False)
# If we downloaded a new version, copy it to the "default" generic file
# (an existing generic file is removed first)
if isfile(generic_filename):
remove(generic_filename)
shutil.copy(filename, generic_filename)
# The user may not have a profile picture
return generic_filename
def storageindex(self):
    """Read every CSV file in ``self.indexdata`` and insert its rows into
    the ``self.index`` entry named after the file."""
    # Only regular files located directly inside the index data directory
    csv_names = [entry for entry in listdir(self.indexdata)
                 if isfile(join(self.indexdata, entry))]

    for csv_name in csv_names:
        # Parse the file with pandas
        frame = pd.read_csv(self.indexdata + "/" + csv_name)

        # Characters 2 to 7 of the part before the first dot are the name
        stem = csv_name.split('.')[0]
        name = stem[2:8]

        # Round-trip through JSON to obtain one plain dict per row
        records = json.loads(frame.T.to_json()).values()
        for record in records:
            record['date'] = datetime.datetime.strptime(record['date'], "%Y-%m-%d")

        print(name)
        self.index[name].insert_many(records)
#storage stock pool into database
def find_rain(src, paths=None):
    """
    Locate the ``.rn`` file that ``src`` refers to.

    Search directories:
      * ``src`` starting with ``/``: only ``src`` itself is tried.
      * ``src`` starting with ``.``: only the directories in ``paths``.
      * anything else: ``get_paths()`` followed by ``paths``.

    Within each directory the candidates are tried in this order:
    ``<src>.rn``, then ``<src>`` itself when it already ends in ``.rn``,
    then ``<src>/_pkg.rn`` when ``<src>`` is a directory.

    Args:
        src: non-empty name or path to look up.
        paths: optional list of extra directories to search.

    Returns:
        The path of the first match, or None when nothing was found.
    """
    # None replaces the former shared mutable ``[]`` default
    paths = [] if paths is None else paths

    if src[0] == '/':
        paths = ['']
    elif src[0] != '.':
        paths = get_paths() + paths

    for path in paths:
        base = join(path, src)
        if isfile(base + '.rn'):
            return base + '.rn'
        elif isfile(base) and src.endswith('.rn'):
            return base
        elif isdir(base) and isfile(join(base, '_pkg.rn')):
            return join(base, '_pkg.rn')

    return None
# find any file from a string
def find_name(src):
    """Derive module names from a source path.

    Returns a ``(qualified name, module name)`` tuple. The qualified name
    joins, with dots, the normalized names of every enclosing directory
    that contains a ``_pkg.rn`` file, followed by the module name unless
    ``src`` itself ends in ``_pkg.rn``.
    """
    directory, filename = os.path.split(os.path.abspath(src))
    stem, _ext = os.path.splitext(filename)

    # A package file takes the name of the directory that contains it
    if stem == '_pkg':
        stem = os.path.split(directory)[1]
    mname = normalize_name(stem)

    # Climb while the directory is a package, collecting the names with
    # the outermost package first
    parts = []
    while directory and os.path.isfile(join(directory, '_pkg.rn')):
        directory, package = os.path.split(directory)
        parts.insert(0, normalize_name(package))

    if not src.endswith('_pkg.rn'):
        parts.append(mname)

    return ('.'.join(parts), mname)
def clean_epub_directory():
    """Remove the first files (in sorted name order) of
    ``config.EPUB_DIRECTORY`` once it holds more than ``config.MAX_EPUB``
    entries. Files that cannot be removed are skipped."""
    entries = listdir(config.EPUB_DIRECTORY)
    if len(entries) <= config.MAX_EPUB:
        return

    entries.sort()
    quota = len(entries) - config.MAX_EPUB + 2
    removed = 0
    for entry in entries:
        target = path.join(config.EPUB_DIRECTORY, entry)
        if not path.isfile(target):
            continue
        if removed >= quota:
            break
        try:
            remove(target)
        except OSError:
            # Could not remove this one; try the next
            pass
        else:
            removed += 1
def loadConfig(self, config):
    """Overlay the settings of the config file onto ``config``.

    When the file returned by ``config.getConfigFile()`` does not exist a
    warning is logged and ``config`` is returned untouched. Otherwise the
    parser is seeded with the current values of ``config``, the file is
    read on top of them, and the resulting values are written back.
    Directory lists are stored as ``:`` separated strings.
    """
    configFile = config.getConfigFile()
    if not isfile(configFile):
        self.logger.warn("Config file %s does not exist. Using defaults." % configFile)
        return config

    self.logger.debug("Loading config from %s." % configFile)
    section = "main"
    loader = ConfigParser.SafeConfigParser(allow_no_value=True)
    loader.add_section(section)

    # Seed with the current values so options absent from the file keep them
    seed = [
        ("log_file", config.logFile),
        ("migration_dirs", join(config.migrationDirs, ":")),
        ("pre_migration_dirs", join(config.preMigrationDirs, ":")),
        ("post_migration_dirs", join(config.postMigrationDirs, ":")),
        ("state_dir", config.stateDir),
        ("run_dir", config.runDir),
    ]
    for option, value in seed:
        loader.set(section, option, value)

    loader.read(configFile)

    # Copy the merged values back onto the config object
    config.logFile = loader.get(section, "log_file")
    config.migrationDirs = split(loader.get(section, "migration_dirs"), ":")
    config.preMigrationDirs = split(loader.get(section, "pre_migration_dirs"), ":")
    config.postMigrationDirs = split(loader.get(section, "post_migration_dirs"), ":")
    config.stateDir = loader.get(section, "state_dir")
    config.runDir = loader.get(section, "run_dir")
    return config
def _record_filter(args, base_dir):
    """
    Save the filter provided.

    The entries of ``args``, minus the ``target`` key and every key that
    starts with ``__``, are stored under ``args['target']`` in the YAML
    file ``<base_dir>/.filter``. The file is created when missing; entries
    for other targets already in the file are kept. The filter as loaded
    from the file is pretty-printed before it is updated.
    """
    filter_file = '{}/.filter'.format(base_dir)
    if not isfile(filter_file):
        # Equivalent of `touch filter_file`
        open(filter_file, 'a').close()

    with open(filter_file) as filehandle:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary Python objects and is rejected by recent PyYAML
        # releases; consider yaml.safe_load if this file only ever holds
        # plain data.
        current_filter = yaml.load(filehandle)
    if current_filter is None:
        # An empty file loads as None
        current_filter = {}
    pprint.pprint(current_filter)

    # Filter a bunch of salt content and the target key before writing.
    # The key is compared by value: `is not` tests object identity and only
    # appeared to work because of string interning.
    rec_args = {k: v for k, v in args.items()
                if k != 'target' and not k.startswith('__')}
    current_filter[args['target']] = rec_args
    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
def __init__(self, title = None, pathfile = None, debug_mode = True, debug_level = 0):
    """Store the logging options, start the timers and, when ``title`` is
    given, log a start-up banner through ``self.log``."""
    self.path_file = pathfile
    self.debug_mode = debug_mode
    self.debug_level = debug_level

    # Three independent readings of the performance counter
    self.starttime = time.perf_counter()
    self.nowtime = time.perf_counter()
    self.lastcall = time.perf_counter()

    # The log file is not created here; the code that would write an
    # initial "-init log file-" line to it is disabled.
    if title is None:
        return

    started = datetime.datetime.now()
    headline = title + " program started the " + started.strftime("%d of %b %Y at %H:%M")
    self.log("=============================================================\n" +
             headline +
             "\n=============================================================")
def download_model(lang, paths):
"""Prepare the model folder for ``lang``: fetch ``<lang>.tar.gz`` when the
archive is not on disk and extract it into ``<user_config>/model/<lang>``.
"""
# NOTE(review): the indentation of this block was lost in this copy; it
# is not possible to tell here whether the download/extract steps are
# nested under the second isdir() check -- confirm against upstream.
model_folder = join(paths.user_config, 'model')
model_en_folder = join(model_folder, lang)
# Create the two folder levels one by one when they are missing
if not isdir(model_folder):
mkdir(model_folder)
if not isdir(model_en_folder):
mkdir(model_en_folder)
file_name = paths.model_dir + '.tar.gz'
# Only download when the archive is not already on disk
if not isfile(file_name):
import urllib.request
import shutil
url = 'https://github.com/MatthewScholefield/pocketsphinx-models/raw/master/' + lang + '.tar.gz'
# Stream the response body straight into the archive file
with urllib.request.urlopen(url) as response, open(file_name, 'wb') as file:
shutil.copyfileobj(response, file)
import tarfile
# NOTE(review): extractall() runs on a downloaded archive without any
# member filtering, and the archive is not closed if extraction raises.
tar = tarfile.open(file_name)
tar.extractall(path=model_en_folder)
tar.close()
def generate(self, name, data):
    """
    Translate the data into different formats
    Depending on the format, this can be accessed different ways

    The skill's vocab directory is checked before its formats directory;
    the first one holding ``<intent><extension>`` is used.

    Args:
        name (IntentName): full intent name
        data (dict): dict containing all data from the skill
    Returns:
        bool: Whether the data could be generated
    """
    locators = (self.rt.paths.skill_vocab, self.rt.paths.skill_formats)
    for locate_dir in locators:
        candidate = join(locate_dir(name.skill), name.intent + self._extension)
        if not isfile(candidate):
            continue

        with open(candidate, 'r') as handle:
            log.debug('Generating', self.__class__.__name__ + '...')
            self.generate_format(handle, data)
        return True

    return False
def create_file(file_path):
    """
    Create a file and create parent folder if missing

    Args:
        file_path: path of the file to create.

    Returns:
        True when the file was created, False when a regular file already
        exists at ``file_path``.
    """
    if path.isfile(file_path):
        # isfile() already implies that the path exists
        return False

    # Let the standard library work out the parent directory instead of
    # splitting on "/" by hand; the former per-prefix strip() mangled
    # directory names with leading or trailing whitespace.
    parent = path.dirname(file_path)
    if parent and not path.exists(parent):
        makedirs(parent)
        logging.debug("Creating directory %s " % parent)

    mknod(file_path)
    return True
def screenshot_area():
    """
    Take a screenshot of an area of the screen with gnome-screenshot;
    used for QR scanning.

    Returns the path of the screenshot file, or False when
    gnome-screenshot is not installed or no file was produced.
    """
    lookup_status = call(['which', 'gnome-screenshot'], stdout=PIPE, stderr=PIPE)
    if lookup_status != 0:
        logging.error(
            "Couldn't find gnome-screenshot, please install it first")
        return False

    file_name = path.join(GLib.get_tmp_dir(), NamedTemporaryFile().name)
    process = Popen(["gnome-screenshot", "-a", "-f", file_name],
                    stdout=PIPE, stderr=PIPE)
    _output, error = process.communicate()
    if error:
        # Report whatever the tool wrote to stderr
        error_lines = error.decode("utf-8").split("\n")
        logging.error("\n".join(error_lines))

    if path.isfile(file_name):
        return file_name

    logging.debug("The screenshot was not token")
    return False
def feed(self, url_list, offset=0, max_num=0):
    """Put URLs on ``self.out_queue``.

    ``url_list`` is either a list of URLs or the name of a file holding
    one URL per line. Feeding starts at index ``offset`` and covers at
    most ``max_num`` URLs; a ``max_num`` of 0 or less means "up to the end
    of the list".

    Raises:
        IOError: ``url_list`` is a string that does not name a file.
        TypeError: ``url_list`` is neither a string nor a list.
        ValueError: ``offset`` is outside of the list.
    """
    if isinstance(url_list, str):
        # A string names a file with one URL per line
        if not path.isfile(url_list):
            raise IOError('url list file {} not found'.format(url_list))
        with open(url_list, 'r') as fin:
            url_list = [line.rstrip('\n') for line in fin]
    elif not isinstance(url_list, list):
        raise TypeError('"url_list" can only be a filename or a str list')

    total = len(url_list)
    if offset < 0 or offset >= total:
        raise ValueError('"offset" exceed the list length')

    stop = min(total, offset + max_num) if max_num > 0 else total
    for url in url_list[offset:stop]:
        self.out_queue.put(url)
        self.logger.debug('put url to url_queue: {}'.format(url))
def rev_parse_manifest_path(self, cwd):
    """
    Search ``cwd`` and its parent directories for a package.json.

    A directory matches when it holds a ``package.json`` file as well as a
    ``node_modules/.bin/`` directory; the normalized path of that
    package.json is returned. The search moves up one directory at a time
    and gives up, returning None, once the parent is ``/`` or no longer
    differs from the current directory.
    """
    candidate = path.normpath(path.join(cwd, 'package.json'))
    has_manifest = path.isfile(candidate)
    if has_manifest and path.isdir(path.join(cwd, 'node_modules/.bin/')):
        return candidate

    # Not here: continue with the parent directory, if there is one
    up = path.normpath(path.join(cwd, '../'))
    if up == '/' or up == cwd:
        return None
    return self.rev_parse_manifest_path(up)
def clear_cache(force = False):
    """
    Empty the icon cache folder when it has grown past 5 MB, or
    unconditionally when ``force`` is set, by deleting and recreating it.
    The folder is created when it does not exist.
    """
    from os.path import getsize, join, isfile, exists
    from os import makedirs, listdir
    from sublime import cache_path
    from shutil import rmtree

    # Location of the icon cache
    icon_path = join(cache_path(), "GutterColor")

    # Maximum amount of space the icons may take up
    limit = 5242880 # 5 MB

    if exists(icon_path):
        # Only regular files directly inside the folder are counted
        icons = (join(icon_path, name) for name in listdir(icon_path))
        size = sum(getsize(icon) for icon in icons if isfile(icon))
        if force or (size > limit):
            rmtree(icon_path)

    if not exists(icon_path):
        makedirs(icon_path)
def fix_scheme_in_settings(settings_file,current_scheme, new_scheme, regenerate=False):
    """Point the given settings at the background-corrected color scheme.

    Nothing happens, and False is returned, unless the settings currently
    use ``current_scheme``. Otherwise the corrected scheme is generated
    when its file is missing or ``regenerate`` is set, the settings are
    switched to ``new_scheme`` and saved, and True is returned.
    """
    from os.path import join, normpath, isfile

    settings = load_settings(settings_file)
    if current_scheme != settings.get("color_scheme"):
        return False

    # new_scheme is given relative to "Packages/"; map it to a real path
    new_scheme_path = join(packages_path(), normpath(new_scheme[len("Packages/"):]))
    if not isfile(new_scheme_path) or regenerate:
        generate_scheme_fix(current_scheme, new_scheme_path)

    settings.set("color_scheme", new_scheme)
    save_settings(settings_file)
    return True
def PrepareDataList(BASE, length):
    """
    Build a list of ``[image, [a, b]]`` samples from ``BASE``.

    Args:
        BASE: indexable collection of ``(image path, text path)`` pairs.
        length: maximum number of entries of ``BASE`` to process.

    Returns:
        One ``[image, [a, b]]`` entry per processed pair. ``image`` is the
        picture read in RGB mode; ``a`` and ``b`` are the first two
        space-separated integers on the first line of the text file, or
        ``0, 0`` when the text file does not exist (the "no bird"
        situation).
    """
    samples = []
    for M in range(0, min(length, len(BASE))):
        img, text = BASE[M]
        image = misc.imread(img, mode='RGB')

        if isfile(text):
            # `with` closes the handle even when the content is malformed
            with open(text, 'r') as f:
                fields = f.readline().split(' ')
            r1 = [int(fields[0]), int(fields[1])]
        else:
            # No txt file: "no bird" situation
            r1 = [0, 0]

        samples.append([image, r1])
    return samples
# Random test and train list
def findAllModules(self, project_path):
    """
    Collect the module names declared in the project's package.json.

    Args:
        project_path: directory expected to contain ``package.json``.

    Returns:
        A list with the keys of the ``dependencies``, ``devDependencies``,
        ``peerDependencies`` and ``optionalDependencies`` sections, in
        that order. The list is empty when there is no package.json or
        when it cannot be parsed (the latter is reported through
        ``SimpleImport.log_error``).
    """
    modules = []
    manifest = path.join(project_path, 'package.json')
    if not path.isfile(manifest):
        return modules

    # `with` closes the file on every exit path, not just the expected ones
    with open(manifest) as packageJsonFile:
        try:
            packageJson = json.load(packageJsonFile)
            for key in [
                "dependencies",
                "devDependencies",
                "peerDependencies",
                "optionalDependencies"
            ]:
                if key in packageJson:
                    modules += packageJson[key].keys()
        except ValueError:
            # Report the directory that was actually read
            SimpleImport.log_error("Failed to load package.json at {0}".format(
                project_path
            ))

    return modules
def getProjectSettings(self):
# Reads the project's settings file (SimpleImportCommand.SETTINGS_FILE
# inside self.project_path), when it exists, and returns the settings
# stored under the current interpreter syntax.
# NOTE(review): the indentation of this block was lost in this copy; the
# nesting of `return settings_on_file` and of the `else` that follows it
# cannot be recovered here -- confirm against upstream before editing.
SETTINGS_FILE = SimpleImportCommand.SETTINGS_FILE
if path.isfile(path.join(self.project_path, SETTINGS_FILE)):
with open(path.join(self.project_path, SETTINGS_FILE)) as raw_json:
try:
settings_json = json.load(raw_json)
if self.interpreter.syntax in settings_json:
settings = settings_json[self.interpreter.syntax]
# "$path" holds a list of entries; the last item of an entry is a dict
# of settings and the items before it are regex alternatives that are
# matched against the start of self.view_relpath.
if "$path" in settings:
settings_on_file = {}
for match in settings["$path"]:
# A single-item entry is applied without any pattern check
if len(match) == 1:
settings_on_file.update(match[0])
else:
pattern = '|'.join(match[:-1])
if re.search("^{0}".format(pattern), self.view_relpath):
settings_on_file.update(match[-1])
return settings_on_file
else:
return settings_json[self.interpreter.syntax]
except ValueError:
# json.load raises ValueError on malformed JSON
print("Failed to load .simple-import.json at {0}".format(self.project_path))
def download_component(component_name):
    """
    Download the component called ``component_name`` unless the file it
    is stored in already exists.

    The component is looked up by name in ``components``; its destination
    is resolved relative to the grandparent directory of this file.
    Progress and failures are reported with ``print``; nothing is
    returned.
    """
    try:
        component = [component for component in components if
                     component["name"] == component_name][0]
    except (IndexError, KeyError):
        # Only a failed lookup means "does not exist". The former bare
        # except also reported interrupts and unrelated errors this way.
        message = "Error: Component with name '{}' does not exist.".format(
            component_name)
        print(message)
        return

    try:
        folder = dirname(dirname(__file__))
        file_name = join(folder, join(*component["destination"]))
        if isfile(file_name):
            print(
                "Component '{}' is already existed.".format(component["name"]))
        else:
            print("Start download component '{}'".format(component["name"]))
            print(file_name)
            download_file(component["url"], file_name)
            print("Finish download compoent '{}'".format(component["name"]))
    except Exception as e:
        print(e)
        print("Cannot download component '{}'".format(component["name"]))
def clean_list(self):
    '''
    Validate the items to back up.

    Every entry of ``self.backup_itens`` that is an existing file or
    directory is appended, as an absolute path, to ``self.backup_list``;
    other entries are reported and left out. An empty ``backup_itens``
    is logged and raises BaseException.
    '''
    # <backup_itens> must not be empty
    if self.backup_itens == []:
        msg = "After version 0.0.4 <backup_itens> cannot be empty"
        self.log.update_log(msg)
        raise BaseException(msg) from None

    # Keep the items that exist on disk
    for item in self.backup_itens:
        full_path = path.abspath(item)
        if path.isfile(full_path) or path.isdir(full_path):
            self.backup_list.append(full_path)
            continue
        # NOTE(review): this goes through the name `log`, not `self.log`
        # as above -- confirm that is intended.
        log.update_log("Invalid item. It'll be wiped from backup list: <%s>" % item, 'INFO')
def renameCB(self, newname):
# Callback that renames the entry /boot/<self.oldname> to
# /boot/<newname> and reports failures in a MessageBox.
# NOTE(review): the indentation of this block was lost in this copy; the
# nesting of the if/elif/else branches below cannot be recovered here --
# confirm against upstream before editing.
# Ignore empty names, the reserved name 'bootname' and unchanged names
if newname and newname != 'bootname' and newname != self.oldname:
# Only rename when the target is free and the source is a regular file
if not path.exists('/boot/%s' %newname) and path.isfile('/boot/%s' %self.oldname):
# NOTE(review): both names are interpolated into a shell command line
ret = system("mv -fn '/boot/%s' '/boot/%s'" %(self.oldname,newname))
# A non-zero exit status means that mv failed
if ret:
self.session.open(MessageBox, _('Rename failed!'), MessageBox.TYPE_ERROR)
else:
# /boot/bootname is read as a single "<key>=<name>" line; it is
# rewritten when it refers to the entry that was renamed
bootname = self.readlineFile('/boot/bootname').split('=')
if len(bootname) == 2 and bootname[1] == self.oldname:
self.writeFile('/boot/bootname', '%s=%s' %(bootname[0],newname))
self.getCurrent()
return
elif self.bootname == self.oldname:
self.getCurrent()
return
# Show the new name in the list and in the config label
self.list[self.selection] = newname
self["config"].setText(_("Select Image: %s") %newname)
else:
# Work out which of the two preconditions failed
if not path.exists('/boot/%s' %self.oldname):
self.getCurrent()
txt = _("File not found - rename failed!")
else:
txt = _("Name already exists - rename failed!")
self.session.open(MessageBox, txt, MessageBox.TYPE_ERROR)
def get_host_file(_path=None):
    """
    Return the Ansible hosts file to use.

    A truthy ``_path`` is returned as is, which lets callers pass ``None``
    through. Otherwise the most common locations are tried: a ``hosts``
    file in the current working directory (returned as an absolute path),
    then ``/etc/ansible/hosts``. When neither exists two warnings are
    logged and None is returned.
    """
    if _path:
        return _path

    # Nothing was passed in: look in the current working directory first
    if os.path.isfile('hosts'):
        logger.info(
            'found and loaded the hosts file from the current working directory: %s',
            os.getcwd()
        )
        return path.abspath('hosts')

    # Otherwise fall back to /etc/ansible/hosts
    system_hosts = '/etc/ansible/hosts'
    if path.isfile(system_hosts):
        return system_hosts

    logger.warning('unable to find an Ansible hosts file to work with')
    logger.warning('tried locations: %s, %s', os.getcwd(), '/etc/ansible/hosts')
def _generate_sparse_format_file(self, feature_file):
    """Return ``(size, path)`` of the 'sparse_format' variant of
    ``feature_file``, generating that file first when it does not exist.

    The variant is written to a temporary file and moved into place once
    complete.
    """
    sparse_file = insert_modifier_in_filename(feature_file, 'sparse_format')

    if not path.isfile(sparse_file):
        self.logger.info('Generating a sparse version of the feature file (zeros replaced with empty columns '
                         'which the ranker knows how to deal with)')
        temp_file = get_temp_file(sparse_file)
        with smart_file_open(temp_file, 'w') as outfile:
            writer = csv.writer(outfile)
            with smart_file_open(feature_file) as infile:
                # NOTE(review): each row is copied without its second
                # column; nothing here replaces zeros as the log message
                # says -- confirm against upstream.
                for row in csv.reader(infile):
                    writer.writerow(row[:1] + row[2:])
        move(temp_file, sparse_file)
        self.logger.info('Done generating file: %s' % sparse_file)
    else:
        self.logger.info("Re-using previously generated sparse format file: %s" % sparse_file)

    return self._get_file_size(sparse_file), sparse_file
def _drop_answer_id_col_from_feature_file(self, train_file_location):
    """Return the path of the 'no_aid' variant of the training file, which
    is the same CSV without its second column (the answer id).

    The variant is generated, through a temporary file, only when it does
    not exist yet.
    """
    file_without_aid = insert_modifier_in_filename(train_file_location, 'no_aid')

    if not path.isfile(file_without_aid):
        self.logger.info('Generating a version of the feature file without answer id (which is what ranker'
                         ' training expects')
        temp_file = get_temp_file(file_without_aid)
        with smart_file_open(temp_file, 'w') as outfile:
            writer = csv.writer(outfile)
            with smart_file_open(train_file_location) as infile:
                # Keep the first column and everything after the second
                for row in csv.reader(infile):
                    writer.writerow(row[:1] + row[2:])
        move(temp_file, file_without_aid)
        self.logger.info('Done generating file: %s' % file_without_aid)
    else:
        self.logger.info('Found a previously generated version of the training file without answer id column, '
                         're-using it: %s' % file_without_aid)

    return file_without_aid
def str_is_existing_path(path: str) -> str:
    """
    Return ``path`` unchanged when it names an existing file or directory;
    raise ArgumentTypeError otherwise.

    >>> import tempfile
    >>> with tempfile.TemporaryFile() as f:
    ...     str_is_existing_path(f.name) == f.name
    True
    >>> with tempfile.TemporaryDirectory() as path:
    ...     str_is_existing_path(path) == path
    True
    >>> str_is_existing_path('')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file or directory.
    >>> str_is_existing_path('/non/existing/dir')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file or directory.
    """
    if not (isfile(path) or isdir(path)):
        raise ArgumentTypeError("Given path is not an existing file or directory.")
    return path
def str_is_existing_file(path: str) -> str:
    """
    Return ``path`` unchanged when it names an existing file; raise
    ArgumentTypeError otherwise.

    >>> import tempfile
    >>> with tempfile.TemporaryFile() as f:
    ...     str_is_existing_file(f.name) == f.name
    True
    >>> str_is_existing_file('/home')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    >>> str_is_existing_file('')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    >>> str_is_existing_file('/non/existing/file')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    """
    if not isfile(path):
        raise ArgumentTypeError("Given path is not an existing file.")
    return path
def ensure_file(name, url=None, force=False, logger=logging.getLogger(), postprocess=None):
    """
    Make sure the requested file exists in the cache, downloading it when
    it is missing or when a download is forced.

    Args:
        name (str): name of the file.
        url (str): url to download the file from, if it doesn't exist.
        force (bool): whether to force the download, regardless of the existence of the file.
        logger (logging.Logger): logger to log results.
        postprocess (function): a function that, if given, will be applied after the file is downloaded. The function has the signature ``f(fname)``

    Returns:
        str: file name of the downloaded file.

    Raises:
        Exception: a download is needed but no ``url`` was given.
    """
    fname = Embedding.path(name)

    # Already cached and no refresh requested
    if path.isfile(fname) and not force:
        return fname

    if not url:
        raise Exception('{} does not exist!'.format(fname))

    logger.critical('Downloading from {} to {}'.format(url, fname))
    Embedding.download_file(url, fname)
    if postprocess:
        postprocess(fname)
    return fname
def __init__(self, nb_classes, resnet_layers, input_shape, weights):
    """Instantiate a PSPNet.

    The model is loaded from ``weights/keras/<weights>.json`` and
    ``weights/keras/<weights>.h5`` when both files exist; otherwise it is
    built from scratch and given the npy weights.
    """
    self.input_shape = input_shape

    keras_dir = join("weights", "keras")
    json_path = join(keras_dir, weights + ".json")
    h5_path = join(keras_dir, weights + ".h5")

    cached = isfile(json_path) and isfile(h5_path)
    if not cached:
        print("No Keras model & weights found, import from npy weights.")
        self.model = layers.build_pspnet(nb_classes=nb_classes,
                                         resnet_layers=resnet_layers,
                                         input_shape=self.input_shape)
        self.set_npy_weights(weights)
        return

    print("Keras model & weights found, loading...")
    with open(json_path, 'r') as file_handle:
        self.model = model_from_json(file_handle.read())
    self.model.load_weights(h5_path)
def run_makefile(make_directory):
    """
    Run ``make`` in a given directory.

    Args:
        make_directory: directory where the Makefile is located.

    Raises:
        InternalException: the Makefile is missing, or running make failed.
    """
    makefile = path.join(make_directory, "Makefile")
    if not path.isfile(makefile):
        raise InternalException(makefile + " does not exist.")

    local_shell = spur.LocalShell()
    try:
        local_shell.run(["make", "-C", make_directory])
    except Exception as error:
        # Surface any failure as the project's own exception type
        raise InternalException(str(error))
def analyze_problems():
    """
    Check the sanity of the inserted problems.

    For every problem this verifies that its grader file exists and that
    every pid in its weightmap resolves to a problem.

    Returns:
        A list of error strings describing the problems.
    """
    grader_missing_error = "{}: Missing grader at '{}'."
    unknown_weightmap_pid = "{}: Has weightmap entry '{}' which does not exist."

    errors = []
    for problem in get_all_problems():
        grader_file = join(grader_base_path, problem["grader"])
        if not isfile(grader_file):
            errors.append(grader_missing_error.format(problem["name"], problem["grader"]))

        for pid in problem["weightmap"].keys():
            if safe_fail(get_problem, pid=pid) is None:
                errors.append(unknown_weightmap_pid.format(problem["name"], pid))

    return errors
def get_generator(pid):
    """
    Load the generator module of a problem.

    Args:
        pid: the problem pid
    Returns:
        The loaded module
    Raises:
        InternalException: the generator file does not exist.
    """
    generator_path = get_generator_path(pid)
    if path.isfile(generator_path):
        # The module is named after the path minus its last three characters
        module_name = generator_path[:-3]
        return imp.load_source(module_name, generator_path)

    raise InternalException("Could not find {}.".format(generator_path))
def getgraphsfromdir(self, path=None):
    """Get the graph files located directly inside a directory.

    ``path`` defaults to the repository path. Only regular files whose
    format can be guessed are included.

    Returns:
        A dict mapping each file name to its guessed format.
    """
    if path is None:
        path = self.getRepoPath()

    graphfiles = {}
    for entry in listdir(path):
        if not isfile(join(path, entry)):
            continue
        detected = guess_format(entry)
        if detected is not None:
            graphfiles[entry] = detected
    return graphfiles
def _parse(self):
    """
    Parse the history file at ``self.path``.

    Returns a list of tuples (datetime string, set of
    distributions/diffs, list of comments), one per ``==> ... <==``
    separator line. Lines starting with ``#`` are comments of the current
    entry; other non-blank lines are added to its set. A missing file
    gives an empty list.
    """
    res = []
    if not isfile(self.path):
        return res

    header = re.compile(r'==>\s*(.+?)\s*<==')
    with open(self.path, 'r') as f:
        raw_lines = f.read().splitlines()

    for raw in raw_lines:
        text = raw.strip()
        if not text:
            continue

        found = header.match(text)
        if found:
            # A separator opens a new entry
            res.append((found.group(1), set(), []))
            continue

        # Everything else belongs to the most recent entry
        _stamp, dists, comments = res[-1]
        if text.startswith('#'):
            comments.append(text)
        else:
            dists.add(text)
    return res
def delete(self, filename=''):
    """Delete the given file or directory below ``self.path``. When no
    filename is passed the current directory is removed. A target that
    does not exist is silently ignored.
    """
    self._raise_if_none()
    fn = path_join(self.path, filename)
    try:
        if not isfile(fn):
            removedirs(fn)
        else:
            remove(fn)
    except OSError as why:
        # Only "no such file or directory" is tolerated
        if why.errno != errno.ENOENT:
            raise why
def delete(self, filename=''):
    """Delete the given file or directory below ``self.path``. When no
    filename is passed the current directory is removed. A target that
    does not exist is silently ignored.
    """
    self._raise_if_none()
    fn = path_join(self.path, filename)
    try:
        if not isfile(fn):
            removedirs(fn)
        else:
            remove(fn)
    except OSError as why:
        # Only "no such file or directory" is tolerated
        if why.errno != errno.ENOENT:
            raise why
def test_decoding_uuenc_single_part(self):
    """
    Decode a single UUEncoded message and compare the result with the
    expected file.
    """
    # Encoded input
    source_path = join(self.var_dir, 'uuencoded.tax.jpg.msg')
    assert isfile(source_path)

    # Expected decoded content
    expected_path = join(self.var_dir, 'uudecoded.tax.jpg')
    assert isfile(expected_path)

    codec = CodecUU(work_dir=self.test_dir)

    # Feed the encoded file to the decoder
    with open(source_path, 'r') as encoded_stream:
        article = codec.decode(encoded_stream)

    # The decoder must produce binary content ...
    assert isinstance(article, NNTPBinaryContent)

    # ... that reports itself as (structurally) okay
    assert article.is_valid() is True

    with open(expected_path, 'r') as expected_stream:
        expected = expected_stream.read()

    # The decoded content must equal the reference
    assert expected == article.getvalue()
def test_partial_download(self):
    """
    Check the handling of a download that is explicitly ordered to abort
    after only some content was retrieved; a way of 'peeking' at it.
    """
    # Encoded input
    source_path = join(self.var_dir, 'uuencoded.tax.jpg.msg')
    assert isfile(source_path)

    # Expected decoded content
    expected_path = join(self.var_dir, 'uudecoded.tax.jpg')
    assert isfile(expected_path)

    # Restrict the codec so content may be no larger than 10 bytes
    codec = CodecUU(work_dir=self.test_dir, max_bytes=10)

    with open(source_path, 'r') as encoded_stream:
        article = codec.decode(encoded_stream)

    # Binary content is still expected ...
    assert isinstance(article, NNTPBinaryContent)

    # ... but after an early exit it must not be considered valid
    assert article.is_valid() is False

    with open(expected_path, 'r') as expected_stream:
        expected = expected_stream.read()

    # Not everything was decoded, yet what was processed has to match the
    # beginning of the expected result.
    partial = len(article.getvalue())
    assert expected[0:partial] == article.getvalue()
def test_nzbfile_generation(self):
    """
    Exercise the creation of an NZB file from manually added segments.
    """
    target_nzb = join(self.tmp_dir, 'test.nzbfile.nzb')
    payload = join(self.var_dir, 'uudecoded.tax.jpg')
    assert isfile(target_nzb) is False

    nzbobj = NNTPnzb()

    # Put together a fake article
    segpost = NNTPSegmentedPost(basename(payload))
    content = NNTPBinaryContent(payload)
    article = NNTPArticle('testfile', groups='newsreap.is.awesome')

    # The segment tracker is not marked as complete yet; the flag gets
    # toggled when segments are added manually or an NZB-File is parsed.
    assert nzbobj._segments_loaded is None

    # content -> article -> segmented post -> nzb object
    article.add(content)
    segpost.add(article)
    nzbobj.add(segpost)

    # .add() was called, so the flag has to be set now
    assert nzbobj._segments_loaded is True

    # Write the file and make sure it shows up on disk
    assert nzbobj.save(target_nzb) is True
    assert isfile(target_nzb) is True
def test_bad_files(self):
    """
    Check how an NZB object behaves when it is given a file that does
    not exist.
    """
    missing_nzb = join(self.var_dir, 'missing.file.nzb')
    assert not isfile(missing_nzb)

    nzbobj = NNTPnzb(nzbfile=missing_nzb)

    # Neither valid nor identifiable
    assert nzbobj.is_valid() is False
    assert nzbobj.gid() is None

    # And it holds no entries
    assert len(nzbobj) == 0
def test_decoding_01(self):
    """
    Decode a stream opened on a readable file; this test specifically
    focuses on var/group.list
    """
    codec = CodecGroups()

    source_path = join(self.var_dir, 'group.list')
    assert isfile(source_path)

    with open(source_path, 'r') as stream:
        # The codec always answers True (it expects more), but the
        # content can be retrieved at any time
        assert codec.decode(stream) is True

    # The decoded value lives on the codec
    assert isinstance(codec.decoded, NNTPMetaContent)
    assert isinstance(codec.decoded.content, list)

    # Every line parsed from group.list should be valid
    assert len(codec.decoded.content) == codec._total_lines

    # A reset leaves the same structure behind, emptied
    codec.reset()
    assert isinstance(codec.decoded, NNTPMetaContent)
    assert isinstance(codec.decoded.content, list)
    assert len(codec.decoded.content) == 0
    assert len(codec.decoded.content) == codec._total_lines
def test_decoding_03(self):
    """
    Decode a stream opened on a readable file; this test specifically
    focuses on var/headers.test03.msg
    """
    codec = CodecHeader()

    source_path = join(self.var_dir, 'headers.test03.msg')
    assert isfile(source_path)

    with open(source_path, 'r') as stream:
        # Decoding stops by itself once the headers are complete
        assert isinstance(codec.decode(stream), NNTPHeader)

        # The next line read is the first one after the end of headers
        # delimiter; it is compared with its white space stripped
        assert stream.readline().strip() == 'First Line without spaces'

    assert len(codec) == 10

    # With the 10 lines processed the line count has to be 10 as well
    assert codec._lines == 10

    assert codec.is_valid() == True
def test_is_valid(self):
    """
    Exercise the key combinations that lead to the different return
    values of is_valid()
    """
    codec = CodecHeader()

    source_path = join(self.var_dir, 'headers.test03.msg')
    assert isfile(source_path)

    # Nothing has been processed yet
    assert codec.is_valid() is None

    # Populate the codec with some keys
    with open(source_path, 'r') as stream:
        # Decoding stops by itself once the headers are complete
        assert isinstance(codec.decode(stream), NNTPHeader)

    # The keys read from the file are fine
    assert codec.is_valid() is True

    for flag in ( 'DMCA', 'Removed', 'Cancelled', 'Blocked' ):
        bad_key = 'X-%s' % flag

        # Intentionally add a bad key, which has to make validation fail
        codec[bad_key] = 'True'
        assert codec.is_valid() is False

        # Removing the key makes the headers valid again
        del codec[bad_key]
        assert codec.is_valid() is True