Python pathlib 模块,PurePath() 实例源码
我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用pathlib.PurePath()。
def sane_members(members, destination):
    """Yield the archive members that are safe to extract under ``destination``.

    Every member path, and for links every link target, is resolved to an
    absolute path and must land strictly below ``destination``.

    :param members: iterable of tar-member-like objects exposing ``path``,
        ``linkpath``, ``issym()`` and ``islnk()``.
    :param destination: directory the archive is going to be extracted into.
    :raises BadPathError: a member path resolves outside ``destination``.
    :raises BadLinkError: a link target is absolute, reserved, or resolves
        outside ``destination``.
    """
    # Canonicalise the destination once. ``resolve`` always returns an
    # absolute, symlink-free path, so comparing it against a relative or
    # symlinked destination would reject every member.
    root = realpath(destination)
    destination = PurePath(root)

    def resolve(path):
        return realpath(normpath(join(root, path)))

    for member in members:
        mpath = PurePath(resolve(member.path))
        # Check if mpath is under destination
        if destination not in mpath.parents:
            raise BadPathError("Bad path to outside destination directory: {}".format(mpath))
        if member.issym() or member.islnk():
            # Check link to make sure it resolves under destination
            lnkpath = PurePath(member.linkpath)
            if lnkpath.is_absolute() or lnkpath.is_reserved():
                raise BadLinkError("Bad link: {}".format(lnkpath))
            # A symlink target is relative to the directory that holds the
            # link, whereas a hard-link target is relative to the archive
            # root, so only symlinks are anchored at the member's parent.
            if member.issym():
                lnkpath = PurePath(member.path).parent / lnkpath
            # resolve the link to an absolute path
            lnkpath = PurePath(resolve(lnkpath))
            if destination not in lnkpath.parents:
                raise BadLinkError("Bad link to outside destination directory: {}".format(lnkpath))
        yield member
def load(self, root_folder):
    """Load every config file found below ``root_folder`` into ``self.data``.

    Each file is parsed as YAML and handed to ``get_config_item`` together
    with its path starting at the ``resources`` component. Items for which
    ``get_config_item`` returns ``None`` are logged and skipped.

    :param root_folder: directory that is walked recursively.
    :raises ValueError: if a file path has no ``resources`` component
        (raised by ``tuple.index``).
    """
    # config files under ROOT/resources/config
    # iterate through those files
    for subdir, _dirs, files in os.walk(root_folder):
        for name in files:
            # load this yaml file
            filepath = os.path.join(subdir, name)
            # Use a context manager so the handle is closed deterministically
            # instead of being left to the garbage collector.
            with open(filepath, 'r') as stream:
                # NOTE(review): yaml.load without an explicit Loader can build
                # arbitrary Python objects; confirm these files are trusted or
                # switch to a safe loader.
                data = yaml.load(stream)
            parts = PurePath(filepath).parts
            path = '/'.join(parts[parts.index('resources'):])
            config = get_config_item(data, path)
            if config is None:
                logger.error('Failed to load {0}'.format(path))
                continue
            # store this item
            self.data[config.data.type] = config.data
def flat(parts):
    """Flatten a nested key into a stream of relative path components.

    Lists and tuples are walked recursively; ``str`` and ``PurePath`` leaves
    are yielded unchanged.

    :raises ValueError: a leaf is an absolute path.
    :raises TypeError: a leaf is neither ``str``, ``PurePath``, list nor tuple.
    """
    if isinstance(parts, (list, tuple)):
        for item in parts:
            yield from flat(item)
        return

    if isinstance(parts, str):
        absolute = os.path.isabs(parts)
    elif isinstance(parts, PurePath):
        absolute = parts.is_absolute()
    else:
        raise TypeError(
            'Key must be relative path [str or Path]. But {}'.format(parts))

    if absolute:
        raise ValueError('Path must be relative. [{}]'.format(parts))
    yield parts
async def test_key(loop):
    """Check that ``FileSystemStorage.raw_key`` flattens and validates keys.

    Declared as a coroutine because the body awaits ``storage.init()``;
    ``await`` inside a plain ``def`` is a syntax error.
    """
    with tempfile.TemporaryDirectory() as d:
        config = MergeDict(
            name='',
            path=d,
            executor=None,
        )
        context = Context({}, loop=loop)
        storage = FileSystemStorage(config, context=context, loop=loop)
        await storage.init()

        # Nested tuples and PurePath parts collapse into one relative path.
        assert str(storage.raw_key(('1', '3', ('4',)))).endswith('1/3/4')
        assert str(storage.raw_key((PurePath('1'), '3', ('4',)))).endswith('1/3/4')

        # Unsupported key types are rejected.
        with pytest.raises(TypeError):
            storage.raw_key(1)

        # Keys that escape the storage root or are absolute are rejected.
        with pytest.raises(ValueError):
            storage.raw_key('../..')
        with pytest.raises(ValueError):
            storage.raw_key('/abs/path')
        with pytest.raises(ValueError):
            storage.raw_key(PurePath('/abs/path'))
def set_model(self, dataset_id, model_path):
    """Store the model file path of a dataset in the database.

    The path is saved relative to ``self.bin_path``.

    :param int dataset_id: The id of the dataset
    :param str model_path: The path where binary file is located
    :returns: ``(True, None)`` when exactly one row was updated, otherwise
        ``(False, (400, message))``
    :rtype: tuple
    """
    # Strip the bin_path prefix from model_path
    relative = PurePath(model_path).relative_to(self.bin_path)
    cursor = self.execute_insertion(
        "UPDATE dataset SET binary_model=? WHERE id=? ;",
        str(relative),
        dataset_id,
    )
    updated = cursor.rowcount == 1
    cursor.close()
    if updated:
        return True, None
    return False, (400, "Failed when trying to save dataset on db")
def set_search_index(self, dataset_id, index_path):
    """Store the search index path of a dataset in the database.

    The path is saved relative to ``self.bin_path``.

    :param int dataset_id: The id of the dataset
    :param string index_path: The path on the filesystem of the index
    :returns: ``(True, None)`` when exactly one row was updated, otherwise
        ``(False, (400, message))``
    :rtype: tuple
    """
    # Strip the bin_path prefix from index_path
    stored_path = str(PurePath(index_path).relative_to(self.bin_path))
    statement = "UPDATE dataset SET binary_index=? WHERE id=? ;"
    result = self.execute_insertion(statement, stored_path, dataset_id)
    outcome = (
        (True, None)
        if result.rowcount == 1
        else (False, (400, "Failed when trying to save index on db"))
    )
    result.close()
    return outcome
def file_new(self, button):
    """Clear the editor for a new document and refresh the window title.

    If a file is open or the buffer has unsaved changes, the "not saved"
    dialog is shown first and the buffer is saved when the user answers YES.

    :param button: the widget that triggered the handler (unused).
    """
    if (not self.currentfile) and (not self.filechanged):
        self.textbuffer.set_text("")
    else:
        self.notsave.set_transient_for(self.window)
        self.response = self.notsave.run()
        if self.response == Gtk.ResponseType.YES:
            # NOTE(review): this branch keeps self.currentfile, so the title
            # stays on the previous file name - confirm that is intended.
            self.save(self.filesave)
            self.textbuffer.set_text("")
            self.notsave.hide()
        else:
            self.textbuffer.set_text("")
            self.currentfile = ""
            self.filechanged = False
            self.notsave.hide()
    self.path = pathlib.PurePath(self.currentfile)
    # The parts live on the PurePath object; the previous ``self.parts``
    # lookup referred to an attribute that is never set.
    if self.path.parts:
        self.window.set_title(self.path.parts[-1])
    else:
        self.window.set_title("Untitled document")
def save_as(self, button):
    """Ask for a target file name and write the buffer to it as UTF-8.

    When the dialog is confirmed with a non-empty file name, that file
    becomes the current file, the window title is updated and the change
    flag is cleared. The dialog is hidden in every case.

    :param button: the widget that triggered the handler (unused).
    """
    dialog = self.filesave
    dialog.set_transient_for(self.window)
    self.response = dialog.run()
    if self.response == Gtk.ResponseType.OK:
        self.file_to_save = dialog.get_filename()
        if self.file_to_save:
            self.currentfile = self.file_to_save
            self.path = pathlib.PurePath(self.currentfile)
            parts = self.path.parts
            title = parts[-1] if parts else "Untitled document"
            self.window.set_title(title)
            with codecs.open(self.file_to_save, 'w', encoding="utf-8") as out:
                buf = self.textbuffer
                text = buf.get_text(buf.get_start_iter(), buf.get_end_iter(), False)
                out.write(text)
            self.filechanged = False
    dialog.hide()
def __init__(self, pdb, path=True, pdb_id='', ignore_end=False):
    """Read PDB text, set up parser state and run ``parse_pdb_file``.

    :param pdb: a file path when ``path`` is true, otherwise the PDB text.
    :param path: whether ``pdb`` is a path to read from.
    :param pdb_id: identifier used when ``pdb`` is raw text; for a path the
        file stem is used instead.
    :param ignore_end: stored on the instance as ``ignore_end``.
    """
    # Record-type -> handler dispatch table.
    self.proc_functions = dict(
        ATOM=self.proc_atom,
        HETATM=self.proc_atom,
        ENDMDL=self.change_state,
        END=self.end,
    )
    if not path:
        pdb_str = pdb
        self.id = pdb_id
    else:
        pdb_path = pathlib.PurePath(pdb)
        with open(str(pdb_path), 'r') as handle:
            pdb_str = handle.read()
        self.id = pdb_path.stem
    self.pdb_lines = pdb_str.splitlines()
    self.new_labels = False
    self.state = 0
    self.pdb_parse_tree = None
    self.current_line = None
    self.ignore_end = ignore_end
    self.parse_pdb_file()
def _remove_slash_prefix(line):
    """Return ``line`` without a single leading ``/``, if it has one.

    For details on the glob matcher, see:
    https://docs.python.org/3/library/pathlib.html#pathlib.PurePath.match
    """
    if line.startswith('/'):
        return line[1:]
    return line
def _matches_patterns(path, patterns):
    """Return True if ``path`` matches at least one glob in ``patterns``.

    A ``TypeError`` raised while matching one pattern counts as "no match"
    for that pattern.
    """
    def _hit(pattern):
        try:
            return PurePath(path).match(pattern)
        except TypeError:
            return False

    return any(_hit(pattern) for pattern in patterns)
def handle_import(self, name, compilation, rule):
    """
    Re-implementation of the core Sass import mechanism, which looks for
    files using the staticfiles storage and staticfiles finders.

    Candidate file names are built from the optional ``_`` prefix and each
    of the compiler's dynamic extensions. The first candidate that
    ``get_file_and_storage`` finds is returned as a ``SourceFile``; when
    none is found the function returns ``None``.
    """
    requested = PurePath(name)
    extensions = list(compilation.compiler.dynamic_extensions)

    # Drop the extension only when it is one the compiler searches for.
    known_suffix = requested.suffix and requested.suffix in extensions
    basename = requested.stem if known_suffix else requested.name

    if requested.is_absolute():
        # Remove the beginning slash
        search_path = requested.relative_to('/').parent
    else:
        origin = rule.source_file.origin
        if origin:
            search_path = origin
            if requested.parent:
                search_path = os.path.normpath(str(origin / requested.parent))
        else:
            search_path = requested.parent

    for prefix, extension in product(('_', ''), extensions):
        candidate = PurePath(prefix + basename + extension)
        full_filename, storage = get_file_and_storage(str(search_path / candidate))
        if not full_filename:
            continue
        with storage.open(full_filename) as handle:
            return SourceFile.from_file(handle, origin=search_path, relpath=candidate)
    return None
def compile(self, *paths):
    """Compile the given source paths in a single compilation.

    Absolute paths are made relative to ``/`` before being looked up with
    ``get_file_and_storage``.

    :param paths: paths of the source files to add.
    :returns: the result of running the compilation through
        ``call_and_catch_errors``.
    """
    compilation = self.make_compilation()
    for raw in paths:
        location = PurePath(raw)
        if location.is_absolute():
            location = location.relative_to('/')
        filename, storage = get_file_and_storage(str(location))
        with storage.open(filename) as handle:
            source = SourceFile.from_file(
                handle,
                origin=location.parent,
                relpath=PurePath(location.name),
            )
        compilation.add_source(source)
    return self.call_and_catch_errors(compilation.run)
def compile_string(self, string, filename=None):
    """Compile source text given as a string.

    :param string: the source text.
    :param filename: optional name; when given, its parent and base name
        are passed to ``SourceFile.from_file`` as ``origin`` and ``relpath``.
    :returns: the result of running the compilation through
        ``call_and_catch_errors``.
    """
    compilation = self.make_compilation()
    if filename is None:
        source = SourceFile.from_string(string)
    else:
        stream = StringIO(string)
        named = PurePath(filename)
        source = SourceFile.from_file(
            stream,
            origin=named.parent,
            relpath=PurePath(named.name),
        )
    compilation.add_source(source)
    return self.call_and_catch_errors(compilation.run)
def __path_to_object(self, path):
    """
    Map from paths to objects.

    Starting at ``self.dashboard``, descend one child per path component
    after the first. Returns the node reached, or the falsy value returned
    by ``get_child_by_name`` as soon as a component cannot be found.
    """
    current = self.dashboard
    components = pathlib.PurePath(path).parts
    for name in components[1:]:
        current = current.get_child_by_name(name)
        if not current:
            return current
    return current
def raw_key(self, *key):
    """Translate a (possibly nested) key into a path below ``self._path``.

    The key is flattened with ``flat``, normalised, passed through
    ``self.path_transform`` and joined onto ``self._path``.

    :raises ValueError: if the result is the storage root itself.
    """
    # NOTE(review): self._path appears to be a path type that offers
    # ``normpath`` and ``relative_to`` - confirm against its definition.
    joined = PurePath(*flat(key))
    rel = os.path.normpath(str(joined))
    target = self._path.joinpath(self.path_transform(rel)).normpath
    if target.relative_to(self._path) == '.':
        raise ValueError('Access denied: %s' % target)
    return target
def test_can_copy_file(self):
    """``Guest.copy_file`` creates the parent directory, then uploads the file."""
    class DummyGuest(Guest):
        name = 'dummy'

    guest = DummyGuest(FakeContainer())
    guest.lxd_container.files.put.return_value = True
    with tempfile.NamedTemporaryFile() as source:
        source.write(b'dummy file')
        source.flush()
        guest.copy_file(pathlib.Path(source.name), pathlib.PurePath('/a/b/c'))

    execute = guest.lxd_container.execute
    put = guest.lxd_container.files.put
    # One mkdir for the parent directory of the target ...
    assert execute.call_count == 1
    assert execute.call_args[0] == (['mkdir', '-p', '/a/b'], )
    # ... followed by one upload of the file content.
    assert put.call_count == 1
    assert put.call_args[0] == ('/a/b/c', b'dummy file')
def test_generate_random_dir_path(subdir_count: int):
    """The generated path has ``subdir_count`` components after the first."""
    generated = generate_random_dir_path(subdir_count)
    components = PurePath(generated).parts
    assert len(components[1:]) == subdir_count
    # (The randomness of the underlying UUIDs is deliberately not tested:
    # it is an implementation detail we do not want to rely upon.)
def set_content_image(self, list_image, des_dir):
    """Show the downloaded images in the content browser and notify the user.

    :param list_image: image locations; only the final path component of
        each is used.
    :param des_dir: directory the images were stored in.
    """
    browser = self.content_text_browser
    browser.clear()
    browser.setEnabled(True)
    for image in list_image:
        # Escape the path because it is interpolated into HTML attributes.
        location = html.escape(des_dir + '/' + PurePath(image).name)
        markup = "<img src='{}' title='store at : {}'/><br/>".format(location, location)
        browser.append(markup)
    self.__load_finished()
    message = 'All of your donwload images store at : {}'.format(des_dir)
    QMessageBox.information(self, 'Download Completed', message)
def _check_str_subclass(self, *args):
    """Build ``self.cls`` from ``str``-subclass arguments and check the result.

    Issue #21127: it should be possible to construct a PurePath object from
    a str subclass instance, and it then gets converted to a pure str
    object.
    """
    class StrSubclass(str):
        pass

    path_cls = self.cls
    wrapped = [StrSubclass(arg) for arg in args]
    built = path_cls(*wrapped)
    self.assertEqual(built, path_cls(*args))
    for component in built.parts:
        self.assertIs(type(component), str)
def setUp(self):
    """Load the example template named by ``self.template_name``.

    The file is looked up in ``examples/yaml`` next to this module and the
    parsed content is stored in ``self.template_dict``.
    """
    examples_dir = pl.PurePath(os.path.abspath(__file__)).parent / 'examples/yaml'
    template_path = '{}/{}'.format(examples_dir, self.template_name)
    with open(template_path, 'r') as template_file:
        # NOTE(review): yaml.load without an explicit Loader - fine for
        # trusted fixtures, confirm before reusing elsewhere.
        self.template_dict = yaml.load(template_file)
async def check_news(event, gh, *args, **kwargs):
    """Post a commit status saying whether the pull request has a news entry.

    Declared as a coroutine because the body uses ``async for`` and
    ``await``, which are syntax errors inside a plain ``def``.

    :param event: webhook event; ``event.data`` supplies ``number`` and
        ``pull_request``.
    :param gh: GitHub API client providing ``getiter`` and ``post``.
    """
    pr_number = event.data['number']
    pull_request = event.data['pull_request']
    # For some unknown reason there isn't any files URL in a pull request
    # payload.
    files_url = f'/repos/python/cpython/pulls/{pr_number}/files'
    # Could have collected all the filenames in an async set comprehension,
    # but doing it this way potentially minimizes the number of API calls.
    in_next_dir = file_found = False
    async for file_data in gh.getiter(files_url):
        filename = file_data['filename']
        if not filename.startswith(NEWS_NEXT_DIR):
            continue
        in_next_dir = True
        file_path = pathlib.PurePath(filename)
        if len(file_path.parts) != 5:  # Misc, NEWS.d, next, <subsection>, <entry>
            continue
        file_found = True
        if FILENAME_RE.match(file_path.name):
            status = create_status(util.StatusState.SUCCESS,
                                   description='News entry found in Misc/NEWS.d')
            break
    else:
        # The loop ended without a break, so no valid entry was found:
        # either the PR is exempt via label or the check fails.
        issue = await util.issue_for_PR(gh, pull_request)
        if util.skip("news", issue):
            status = SKIP_LABEL_STATUS
        else:
            if not in_next_dir:
                description = f'No news entry in {NEWS_NEXT_DIR} or "skip news" label found'
            elif not file_found:
                description = "News entry not in an appropriate directory"
            else:
                description = "News entry file name incorrectly formatted"
            status = create_status(util.StatusState.FAILURE,
                                   description=description,
                                   target_url=DEVGUIDE_URL)
    await gh.post(pull_request['statuses_url'], data=status)
def __init__(self):
    """Build the UI from ``simpletexteditor.glade`` and reset document state."""
    self.builder = Gtk.Builder()
    self.builder.add_from_file("simpletexteditor.glade")
    self.builder.connect_signals(self)

    # (attribute name, Glade object id) pairs, in lookup order.
    # notsave / notsavetwo are the message dialogs asking whether the user
    # wants to save the current text buffer.
    widgets = (
        ("window", "window"),
        ("textview", "textview"),
        ("textbuffer", "textbuffer"),
        ("about", "about_page"),
        ("fileopen", "fileopen"),
        ("filesave", "filesave"),
        ("fontchooser", "fontchooser"),
        ("notsave", "notsave"),
        ("notsavetwo", "notsavetwo"),
    )
    for attribute, object_id in widgets:
        setattr(self, attribute, self.builder.get_object(object_id))

    self.currentfile = ""
    self.path = pathlib.PurePath(self.currentfile)
    parts = self.path.parts
    self.window.set_title(parts[-1] if parts else "Untitled document")
    self.filechanged = False
    self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
def file_open(self, button):
    """Ask for a file, load it into the buffer and update the window title.

    If the buffer has unsaved changes, the user is first asked whether to
    save them. The open dialog is hidden in every case.

    :param button: the widget that triggered the handler (unused).
    """
    if self.filechanged:
        self.notsavetwo.set_transient_for(self.window)
        self.response = self.notsavetwo.run()
        if self.response == Gtk.ResponseType.YES:
            self.save(self.filesave)
        self.notsavetwo.hide()
    self.fileopen.set_transient_for(self.window)
    self.response = self.fileopen.run()
    if self.response == Gtk.ResponseType.OK:
        self.file_to_open = self.fileopen.get_filename()
        if self.file_to_open:
            self.currentfile = self.file_to_open
            self.path = pathlib.PurePath(self.currentfile)
            # The title is set once, behind the empty-parts guard; the
            # former extra unguarded call duplicated this work.
            if self.path.parts:
                self.window.set_title(self.path.parts[-1])
            else:
                self.window.set_title("Untitled document")
            with codecs.open(self.file_to_open,
                             'r',
                             encoding="utf-8") as f:
                self.content_file = f.read()
            self.textbuffer.set_text(self.content_file)
    self.fileopen.hide()
def _extract_files(self, disk, files, path):
    """Extract ``files`` from ``disk`` into ``<path>/extracted_files``.

    :returns: the ``(extracted, failed)`` pair produced by
        ``extract_files``; failures are also logged as a warning.
    """
    target = str(PurePath(path, 'extracted_files'))
    makedirs(target)
    extracted, failed = extract_files(self.filesystems[disk], files, target)
    self.logger.info("Files extracted into %s.", target)
    if failed:
        unextracted = '\n'.join(failed.values())
        self.logger.warning(
            "The following files could not be extracted: %s", unextracted)
    return extracted, failed
def _check_str_subclass(self, *args):
    """Build ``self.cls`` from ``str``-subclass arguments and check the result.

    Issue #21127: it should be possible to construct a PurePath object from
    a str subclass instance, and it then gets converted to a pure str
    object.
    """
    class StrSubclass(str):
        pass

    path_cls = self.cls
    wrapped = [StrSubclass(arg) for arg in args]
    built = path_cls(*wrapped)
    self.assertEqual(built, path_cls(*args))
    for component in built.parts:
        self.assertIs(type(component), str)
def fspath(path):
    """
    Return the string representation of the path.

    If str or bytes is passed in, it is returned unchanged. Otherwise the
    type's ``__fspath__`` is used, with a fallback for ``pathlib.PurePath``
    objects that do not provide it.

    This code comes from PEP 519, modified to support earlier versions of
    python. This is required for python < 3.6.
    """
    text_type, bytes_type = py.builtin.text, py.builtin.bytes
    if isinstance(path, text_type) or isinstance(path, bytes_type):
        return path

    # Work from the object's type to match method resolution of other magic
    # methods.
    kind = type(path)
    try:
        return kind.__fspath__(path)
    except AttributeError:
        if hasattr(kind, '__fspath__'):
            # __fspath__ exists, so the error came from inside it.
            raise
        try:
            import pathlib
        except ImportError:
            pass
        else:
            if isinstance(path, pathlib.PurePath):
                return text_type(path)

        raise TypeError("expected str, bytes or os.PathLike object, not "
                        + kind.__name__)
def substitute_dir_in_path(path, olddir, newdir):
    """Return ``path`` with every component equal to ``olddir`` replaced.

    :param path: the path to rewrite.
    :param olddir: component name to look for (compared to whole components).
    :param newdir: replacement component.
    :returns: the rewritten path as a string; an empty ``path`` yields the
        normalised form of the empty path (``'.'``).
    """
    pp = PurePath(path)
    parts = [newdir if part == olddir else part for part in pp.parts]
    if not parts:
        # os.path.join() needs at least one argument; an empty path has no
        # components, so return its normalised string form instead.
        return str(pp)
    return os.path.join(*parts)
def globless_prefix(self):
    """Return the leading components of ``self.parts`` that contain no
    ``*`` or ``?`` glob quantifier, as a ``PurePath``."""
    components = self.parts
    # Index of the first component holding a quantifier, or the total
    # length when there is none.
    cut = next(
        (index for index, part in enumerate(components)
         if '*' in part or '?' in part),
        len(components),
    )
    return pathlib.PurePath(*components[:cut])
def download_link(self, path, node, retry, dcb, ecb):
    # Recreate the link described by ``node`` as a local symlink at ``path``
    # (taken relative to the current directory once leading slashes are
    # stripped). Returns True when the link already exists with the same
    # target or was created, False when all ``retry`` attempts failed.
    #
    # ``dcb`` is called as dcb(path, False, ...) before and
    # dcb(path, True, ...) after creating the link; ``ecb(path)`` is called
    # after each attempt. NOTE(review): they look like progress / end
    # callbacks - confirm against the callers.
    disk_path_object = pathlib.Path(path.lstrip('/'))
    disk_path = str(disk_path_object)
    if self.vfs.is_internal_link(node):
        link_target_path = str(pathlib.PurePath(node.read_link()))
    else:
        # Only internal links are supported here.
        assert False
    if disk_path_object.is_symlink():
        existing_link_target_path = os.readlink(disk_path)
        if existing_link_target_path == link_target_path:
            # Nothing to do: the link already points at the wanted target.
            # NOTE(review): the log text seems to have lost its original
            # non-ASCII characters.
            self.logger.info('???????????????? {}' \
                             .format(disk_path))
            return True
    download_ok = False
    for i in range(retry):
        try:
            if i != 0:
                self.logger.error('???????? {}?????? {} ?' \
                                  .format(disk_path, i + 1))
            try:
                dcb(path, False, None, None, None)
                disk_path_object.symlink_to(link_target_path)
                dcb(path, True, None, None, None)
                ecb(path)
                download_ok = True
                break
            except FileExistsError:
                # Something is already at the target location: drop a stale
                # symlink, then try once more within this attempt.
                if disk_path_object.is_symlink():
                    disk_path_object.unlink()
                dcb(path, False, None, None, None)
                disk_path_object.symlink_to(link_target_path)
                dcb(path, True, None, None, None)
                ecb(path)
                download_ok = True
                break
        except IOError as err:
            # The attempt failed; report it and fall through to the next one.
            ecb(path)
            self.logger.error(err)
    if not download_ok:
        return False
    return True
def import_data(cls, type, view, view_type=None):
    """Import ``view`` as data of the given semantic ``type``.

    :param type: semantic type, either an object or a string that is parsed
        with ``qiime2.sdk.parse_type``.
    :param view: the data to import; a ``str`` or ``pathlib.PurePath`` is
        treated as a filesystem path.
    :param view_type: optional view type; a string is parsed with
        ``qiime2.sdk.parse_format``. When omitted it is derived from
        ``view``.
    :returns: whatever ``cls._from_view`` returns.
    :raises qiime2.plugin.ValidationError: a single file was given where a
        directory is required, or the path does not exist.
    """
    # The parameter shadows the builtin, so fetch the real ``type`` from the
    # ``builtins`` module. Subscripting ``__builtins__`` only works while it
    # happens to be a dict, which is a CPython implementation detail.
    import builtins
    type_, type = type, builtins.type

    is_format = False
    if isinstance(type_, str):
        type_ = qiime2.sdk.parse_type(type_)

    if isinstance(view_type, str):
        view_type = qiime2.sdk.parse_format(view_type)
        is_format = True

    if view_type is None:
        if type(view) is str or isinstance(view, pathlib.PurePath):
            # A path was given: pick the format from the directory format
            # registered for the semantic type.
            is_format = True
            pm = qiime2.sdk.PluginManager()
            output_dir_fmt = pm.get_directory_format(type_)
            if pathlib.Path(view).is_file():
                if not issubclass(output_dir_fmt,
                                  model.SingleFileDirectoryFormatBase):
                    raise qiime2.plugin.ValidationError(
                        "Importing %r requires a directory, not %s"
                        % (output_dir_fmt.__name__, view))
                view_type = output_dir_fmt.file.format
            else:
                view_type = output_dir_fmt
        else:
            view_type = type(view)

    format_ = None
    md5sums = None
    if is_format:
        # Record checksums of the imported file(s) for provenance.
        path = pathlib.Path(view)
        if path.is_file():
            md5sums = {path.name: util.md5sum(path)}
        elif path.is_dir():
            md5sums = util.md5sum_directory(path)
        else:
            raise qiime2.plugin.ValidationError(
                "Path '%s' does not exist." % path)
        format_ = view_type

    provenance_capture = archive.ImportProvenanceCapture(format_, md5sums)
    return cls._from_view(type_, view, view_type, provenance_capture)
def _extract_from_wiki(self):
    """Fetch the selected part of a Wikipedia page in a background thread.

    Reads the title, page kind and language from the form widgets, starts a
    ``ProgressThread`` and wires its signals to the matching ``set_content_*``
    / error handlers. With an empty title the content browser is cleared and
    disabled instead.
    """
    title = self.title_line_edit.text()
    if title:
        page = self.page_combo_box.currentText()
        wikipedia.set_lang(self.lang_combo_box.currentText())
        # min == max == 0 puts the progress bar into "busy" mode.
        self.load_progressbar.setMinimum(0)
        self.load_progressbar.setMaximum(0)

        class ProgressThread(QThread, QWidget):
            content_link_arrived = pyqtSignal([list])
            content_text_arrived = pyqtSignal(['QString'])
            content_image_arrived = pyqtSignal([list, 'QString'])
            error_occurred = pyqtSignal()
            valid_images = []

            def run(self):
                try:
                    wiki = wikipedia.page(title=title)
                    # The template is opened but never read. The open is kept
                    # so a missing template is still reported as an error,
                    # but the handle is now closed straight away.
                    # NOTE(review): confirm whether this check is needed.
                    with open('templates/template.html'):
                        pass
                    if page == 'Content':
                        self.content_text_arrived.emit(wiki.content)
                    elif page == 'Images':
                        print(wiki.images)
                        self.des_dir = Preferences.output_path + '/' + title
                        self.valid_images = []
                        if not os.path.exists(self.des_dir):
                            print(self.des_dir)
                            os.mkdir(self.des_dir)
                        for i in wiki.images:
                            if PurePath(i).suffix in Preferences.valid_image_formats:
                                print(i)
                                print(self.des_dir)
                                wget.download(i, out=self.des_dir)
                                self.valid_images.append(i)
                        self.content_image_arrived.emit(self.valid_images, self.des_dir)
                    elif page == 'Summary':
                        self.content_text_arrived.emit(wiki.summary)
                    elif page == 'Images Links':
                        self.content_link_arrived.emit(wiki.images)
                    elif page == 'References Links':
                        self.content_link_arrived.emit(wiki.references)
                except Exception:
                    # Report any failure to the UI. Catching Exception rather
                    # than using a bare except lets SystemExit and
                    # KeyboardInterrupt propagate.
                    self.error_occurred.emit()

        self.progress_thread = ProgressThread()
        self.progress_thread.content_link_arrived.connect(self.set_content_link)
        self.progress_thread.content_text_arrived.connect(self.set_content_text)
        self.progress_thread.content_image_arrived.connect(self.set_content_image)
        self.progress_thread.error_occurred.connect(self.handle_error_occurred)
        self.progress_thread.start()
    else:
        self.content_text_browser.clear()
        self.content_text_browser.setEnabled(False)