我们从Python开源项目中提取了以下33个代码示例，用于说明如何使用pathlib.PurePath()。
def sane_members(members, destination):
    """Yield only the archive members that stay inside *destination*.

    Each member must expose ``path``, ``issym()``, ``islnk()`` and, for
    links, ``linkpath``.  Members are yielded lazily, so a bad member aborts
    the iteration at the point where it is encountered.

    :param members: iterable of archive member objects
    :param destination: directory the archive is going to be unpacked into
    :raises BadPathError: a member's resolved path is not below *destination*
    :raises BadLinkError: a link target is absolute, reserved, or resolves
        outside *destination*
    """
    # Canonicalise the destination once.  Member paths are run through
    # realpath(), so comparing them against a relative or symlinked
    # destination would reject every member.
    root = realpath(destination)
    destination = PurePath(root)

    def resolve(path):
        """Return the canonical absolute location of *path* under the root."""
        return realpath(normpath(join(root, path)))

    for member in members:
        mpath = PurePath(resolve(member.path))

        # Check that mpath is under destination
        if destination not in mpath.parents:
            raise BadPathError(
                "Bad path to outside destination directory: {}".format(mpath))
        elif member.issym() or member.islnk():
            # Check the link to make sure it resolves under destination
            lnkpath = PurePath(member.linkpath)
            if lnkpath.is_absolute() or lnkpath.is_reserved():
                raise BadLinkError("Bad link: {}".format(lnkpath))

            # Resolve the link to an absolute path
            lnkpath = PurePath(resolve(lnkpath))
            if destination not in lnkpath.parents:
                raise BadLinkError(
                    "Bad link to outside destination directory: {}".format(lnkpath))

        yield member
def load(self, root_folder):
    """Walk *root_folder* and register every config file found below it.

    Every file is parsed as YAML, turned into a config item by
    ``get_config_item`` and stored in ``self.data`` keyed by the item's
    ``data.type``.  Files for which no config item is produced are logged
    and skipped.

    :param root_folder: directory to walk (config files are expected under
        ROOT/resources/config)
    :raises ValueError: if a file path has no ``resources`` component
    """
    for subdir, _dirs, files in os.walk(root_folder):
        for file in files:
            filepath = os.path.join(subdir, file)

            # Open the file in a context manager so the handle is closed
            # right after parsing instead of being left to the GC.
            # NOTE(review): yaml.load without an explicit Loader can build
            # arbitrary Python objects; confirm these files are trusted or
            # switch to yaml.safe_load.
            with open(filepath, 'r') as stream:
                data = yaml.load(stream)

            # Key the item by its path starting at the 'resources' component.
            parts = PurePath(filepath).parts
            path = '/'.join(parts[parts.index('resources'):])

            config = get_config_item(data, path)
            if config is None:
                logger.error('Failed to load {0}'.format(path))
                continue

            # store this item
            self.data[config.data.type] = config.data
def flat(parts):
    """Flatten a (possibly nested) key into its relative path pieces.

    Lists and tuples are descended into recursively; ``str`` and
    ``PurePath`` leaves are yielded unchanged.

    :raises ValueError: a leaf is an absolute path
    :raises TypeError: a leaf is neither str, PurePath, list nor tuple
    """
    if isinstance(parts, str):
        absolute = os.path.isabs(parts)
    elif isinstance(parts, PurePath):
        absolute = parts.is_absolute()
    elif isinstance(parts, (list, tuple)):
        for item in parts:
            yield from flat(item)
        return
    else:
        raise TypeError(
            'Key must be relative path [str or Path]. But {}'.format(parts))

    if absolute:
        raise ValueError('Path must be relative. [{}]'.format(parts))
    yield parts
async def test_key(loop):
    """Check how ``FileSystemStorage.raw_key`` builds and validates keys.

    Declared ``async`` because the body awaits ``storage.init()``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    with tempfile.TemporaryDirectory() as d:
        config = MergeDict(
            name='',
            path=d,
            executor=None,
        )
        context = Context({}, loop=loop)
        storage = FileSystemStorage(config, context=context, loop=loop)
        await storage.init()

        # Nested str / PurePath / tuple keys are flattened into one path.
        assert str(storage.raw_key(('1', '3', ('4',)))).endswith('1/3/4')
        assert str(storage.raw_key((PurePath('1'), '3', ('4',)))).endswith('1/3/4')

        # Unsupported key types are rejected.
        with pytest.raises(TypeError):
            storage.raw_key(1)

        # Keys escaping the storage root or given as absolute paths are rejected.
        with pytest.raises(ValueError):
            storage.raw_key('../..')
        with pytest.raises(ValueError):
            storage.raw_key('/abs/path')
        with pytest.raises(ValueError):
            storage.raw_key(PurePath('/abs/path'))
def set_model(self, dataset_id, model_path):
    """Store in the database the model file path of a dataset.

    The path is saved relative to ``self.bin_path``.

    :param int dataset_id: The id of the dataset
    :param str model_path: The path where the binary file is located
    :returns: ``(True, None)`` when exactly one row was updated, otherwise
        ``(False, (400, message))``
    :rtype: tuple
    """
    # Strip the bin_path prefix so only the relative part is stored
    relative = PurePath(model_path).relative_to(self.bin_path)

    sql = "UPDATE dataset SET binary_model=? WHERE id=? ;"
    cursor = self.execute_insertion(sql, str(relative), dataset_id)

    updated = cursor.rowcount == 1
    cursor.close()

    if updated:
        return True, None
    return False, (400, "Failed when trying to save dataset on db")
def set_search_index(self, dataset_id, index_path):
    """Store in the database the search index path of a dataset.

    The path is saved relative to ``self.bin_path``.

    :param int dataset_id: The id of the dataset
    :param string index_path: The path on the filesystem of the index
    :returns: ``(True, None)`` when exactly one row was updated, otherwise
        ``(False, (400, message))``
    :rtype: tuple
    """
    # Strip the bin_path prefix so only the relative part is stored
    relative = PurePath(index_path).relative_to(self.bin_path)

    sql = "UPDATE dataset SET binary_index=? WHERE id=? ;"
    cursor = self.execute_insertion(sql, str(relative), dataset_id)

    updated = cursor.rowcount == 1
    cursor.close()

    if updated:
        return True, None
    return False, (400, "Failed when trying to save index on db")
def file_new(self, button):
    """Handle the "new file" action: clear the buffer and reset the title.

    If a file is open or the buffer has unsaved changes, the ``notsave``
    dialog is shown first; answering YES saves through ``self.save`` before
    clearing, any other answer discards the buffer and forgets the file.

    :param button: the widget that triggered the signal (unused)
    """
    if (not self.currentfile) and (not self.filechanged):
        self.textbuffer.set_text("")
    else:
        self.notsave.set_transient_for(self.window)
        self.response = self.notsave.run()
        if self.response == Gtk.ResponseType.YES:
            self.save(self.filesave)
            self.textbuffer.set_text("")
            self.notsave.hide()
        else:
            self.textbuffer.set_text("")
            self.currentfile = ""
            self.filechanged = False
            self.notsave.hide()

    self.path = pathlib.PurePath(self.currentfile)
    # The path components live on self.path; the object itself has no
    # ``parts`` attribute, so the check must go through self.path.
    if self.path.parts:
        self.window.set_title(self.path.parts[-1])
    else:
        self.window.set_title("Untitled document")
def save_as(self, button):
    """Run the "save as" dialog and write the buffer to the chosen file.

    On confirmation the chosen file becomes the current file, the window
    title is updated to its last path component, the buffer text is written
    as UTF-8 and the changed flag is cleared.  The dialog is hidden
    afterwards whatever the answer was.

    :param button: the widget that triggered the signal (unused)
    """
    dialog = self.filesave
    dialog.set_transient_for(self.window)
    self.response = dialog.run()

    if self.response == Gtk.ResponseType.OK:
        self.file_to_save = dialog.get_filename()
        if self.file_to_save:
            self.currentfile = self.file_to_save
            self.path = pathlib.PurePath(self.currentfile)

            components = self.path.parts
            title = components[-1] if components else "Untitled document"
            self.window.set_title(title)

            with codecs.open(self.file_to_save, 'w', encoding="utf-8") as out:
                buf = self.textbuffer
                out.write(
                    buf.get_text(buf.get_start_iter(), buf.get_end_iter(), False)
                )
            self.filechanged = False

    self.filesave.hide()
def __init__(self, pdb, path=True, pdb_id='', ignore_end=False):
    """Read PDB text and immediately parse it.

    :param pdb: path to a PDB file when *path* is true, otherwise the PDB
        contents as a string
    :param path: whether *pdb* is a file path
    :param pdb_id: identifier to use when *pdb* is raw text (for a file the
        file name stem is used instead)
    :param ignore_end: stored on the instance as ``ignore_end``
    """
    # Record type -> handler method.
    self.proc_functions = {
        'ATOM': self.proc_atom,
        'HETATM': self.proc_atom,
        'ENDMDL': self.change_state,
        'END': self.end
    }

    if not path:
        raw_text = pdb
        self.id = pdb_id
    else:
        source = pathlib.PurePath(pdb)
        with open(str(source), 'r') as handle:
            raw_text = handle.read()
        self.id = source.stem

    self.pdb_lines = raw_text.splitlines()

    # Parser state, reset before every parse.
    self.new_labels = False
    self.state = 0
    self.pdb_parse_tree = None
    self.current_line = None
    self.ignore_end = ignore_end

    self.parse_pdb_file()
def _remove_slash_prefix(line):
    """Drop a single leading '/' from *line*, if there is one.

    For details on the glob matcher, see:
    https://docs.python.org/3/library/pathlib.html#pathlib.PurePath.match
    """
    if line.startswith('/'):
        return line[1:]
    return line
def _matches_patterns(path, patterns):
    """Return True if *path* matches any of the glob *patterns*.

    A pattern whose evaluation raises ``TypeError`` is treated as not
    matching.
    """
    def _hit(pattern):
        try:
            return PurePath(path).match(pattern)
        except TypeError:
            return False

    return any(_hit(pattern) for pattern in patterns)
def parts(my_path):
    """Return the components of *my_path* as a tuple of strings."""
    pure = PurePath(my_path)
    return pure.parts
def handle_import(self, name, compilation, rule):
    """
    Re-implementation of the core Sass import mechanism, which looks for
    files using the staticfiles storage and staticfiles finders.

    Candidate file names are built from the import's base name with an
    optional '_' prefix and each of the compiler's dynamic extensions; the
    first candidate that can be located is returned as a ``SourceFile``.
    Returns ``None`` implicitly when no candidate is found.
    """
    requested = PurePath(name)
    extensions = list(compilation.compiler.dynamic_extensions)

    # Drop the extension only when it is one the compiler would add itself.
    has_known_ext = requested.suffix and requested.suffix in extensions
    basename = requested.stem if has_known_ext else requested.name

    if requested.is_absolute():
        # Remove the beginning slash
        search_path = requested.relative_to('/').parent
    elif rule.source_file.origin:
        search_path = rule.source_file.origin
        # NOTE(review): PurePath defines no __bool__, so this test looks
        # like it is always true -- confirm the intent.
        if requested.parent:
            search_path = os.path.normpath(str(search_path / requested.parent))
    else:
        search_path = requested.parent

    for prefix, suffix in product(('_', ''), extensions):
        filename = PurePath(prefix + basename + suffix)
        candidate = str(search_path / filename)

        full_filename, storage = get_file_and_storage(candidate)
        if not full_filename:
            continue

        with storage.open(full_filename) as f:
            return SourceFile.from_file(f, origin=search_path, relpath=filename)
def compile(self, *paths):
    """Compile the given source files and return the compilation result.

    Each path is made relative (a leading '/' is stripped), located through
    ``get_file_and_storage`` and added to a fresh compilation as a
    ``SourceFile``.
    """
    compilation = self.make_compilation()

    for raw_path in paths:
        relative = PurePath(raw_path)
        if relative.is_absolute():
            relative = relative.relative_to('/')

        filename, storage = get_file_and_storage(str(relative))
        with storage.open(filename) as handle:
            source = SourceFile.from_file(
                handle,
                origin=relative.parent,
                relpath=PurePath(relative.name),
            )
        compilation.add_source(source)

    return self.call_and_catch_errors(compilation.run)
def compile_string(self, string, filename=None):
    """Compile source given as a string and return the compilation result.

    When *filename* is supplied the source is wrapped in a file-like object
    and registered with that file's parent directory as origin; otherwise
    it is registered as an anonymous string source.
    """
    compilation = self.make_compilation()

    if filename is None:
        source = SourceFile.from_string(string)
    else:
        buffer = StringIO(string)
        filename = PurePath(filename)
        source = SourceFile.from_file(
            buffer,
            origin=filename.parent,
            relpath=PurePath(filename.name),
        )

    compilation.add_source(source)
    return self.call_and_catch_errors(compilation.run)
def __path_to_object(self, path):
    """
    Map from paths to objects.

    Starting at ``self.dashboard``, descend one child per path component
    after the first.  The walk stops at the first falsy lookup result, and
    that result is what gets returned.
    """
    current = self.dashboard
    for name in pathlib.PurePath(path).parts[1:]:
        current = current.get_child_by_name(name)
        if not current:
            return current
    return current
def raw_key(self, *key):
    """Turn a (possibly nested) key into a path below ``self._path``.

    The key is flattened with ``flat``, joined, normalised, passed through
    ``self.path_transform`` and appended to ``self._path``.

    :raises ValueError: when the result, taken relative to ``self._path``,
        compares equal to '.'
    """
    joined = PurePath(*flat(key))
    rel = os.path.normpath(str(joined))
    transformed = self.path_transform(rel)

    # NOTE(review): self._path exposes a ``normpath`` attribute, so it is
    # presumably a project path type rather than a pathlib one -- confirm.
    path = self._path.joinpath(transformed).normpath

    if path.relative_to(self._path) == '.':
        raise ValueError('Access denied: %s' % path)
    return path
def test_can_copy_file(self):
    """copy_file creates the parent directory, then uploads the content."""
    class DummyGuest(Guest):
        name = 'dummy'

    guest = DummyGuest(FakeContainer())
    guest.lxd_container.files.put.return_value = True

    payload = b'dummy file'
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(payload)
        tmp.flush()  # the content must be on disk before copy_file reads it
        guest.copy_file(pathlib.Path(tmp.name), pathlib.PurePath('/a/b/c'))

    execute = guest.lxd_container.execute
    assert execute.call_count == 1
    assert execute.call_args[0] == (['mkdir', '-p', '/a/b'], )

    put = guest.lxd_container.files.put
    assert put.call_count == 1
    assert put.call_args[0] == ('/a/b/c', b'dummy file')
def test_generate_random_dir_path(subdir_count: int):
    """The generated path has exactly *subdir_count* components after the first."""
    generated = generate_random_dir_path(subdir_count)
    components = PurePath(generated).parts
    assert len(components[1:]) == subdir_count
    # (Not testing the randomness of underlying UUIDs,
    # since that is the implementation detail we do not want
    # to rely upon.)
def set_content_image(self, list_image, des_dir):
    """Show the downloaded images in the content browser and notify the user.

    :param list_image: image locations; only the final path component of
        each one is used
    :param des_dir: directory the images were stored in
    """
    browser = self.content_text_browser
    browser.clear()
    browser.setEnabled(True)

    img_tag = "<img src='{}' title='store at : {}'/><br/>"
    for image in list_image:
        # Escape the location before embedding it in the markup.
        full_path = html.escape(des_dir + '/' + PurePath(image).name)
        browser.append(img_tag.format(full_path, full_path))

    self.__load_finished()
    QMessageBox.information(
        self,
        'Download Completed',
        'All of your donwload images store at : {}'.format(des_dir),
    )
def _check_str_subclass(self, *args):
    """Check that a path built from ``str`` subclass instances holds plain strs.

    Issue #21127: it should be possible to construct a PurePath object from
    an str subclass instance, and it then gets converted to a pure str
    object.
    """
    class StrSubclass(str):
        pass

    path_cls = self.cls
    wrapped = [StrSubclass(arg) for arg in args]
    built = path_cls(*wrapped)

    self.assertEqual(built, path_cls(*args))
    for component in built.parts:
        self.assertIs(type(component), str)
def setUp(self):
    """Load the YAML template named by ``self.template_name``.

    The template is looked up in the ``examples/yaml`` directory next to
    this file and its parsed content is stored in ``self.template_dict``.
    """
    here = pl.PurePath(os.path.abspath(__file__)).parent
    examples_dir = here / 'examples/yaml'
    template_path = '{}/{}'.format(examples_dir, self.template_name)

    # NOTE(review): yaml.load is called without an explicit Loader; newer
    # PyYAML releases may warn about or reject that -- confirm the pinned
    # version.
    with open(template_path, 'r') as stream:
        self.template_dict = yaml.load(stream)
async def check_news(event, gh, *args, **kwargs):
    """Post a commit status saying whether the pull request has a news entry.

    Declared ``async`` because the body uses ``async for`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param event: webhook event; ``event.data`` supplies ``number`` and
        ``pull_request``
    :param gh: API client providing ``getiter`` and ``post``
    """
    pr_number = event.data['number']
    pull_request = event.data['pull_request']
    # For some unknown reason there isn't any files URL in a pull request
    # payload.
    files_url = f'/repos/python/cpython/pulls/{pr_number}/files'
    # Could have collected all the filenames in an async set comprehension,
    # but doing it this way potentially minimizes the number of API calls.
    in_next_dir = file_found = False
    async for file_data in gh.getiter(files_url):
        filename = file_data['filename']
        if not filename.startswith(NEWS_NEXT_DIR):
            continue
        in_next_dir = True
        file_path = pathlib.PurePath(filename)
        if len(file_path.parts) != 5:  # Misc, NEWS.d, next, <subsection>, <entry>
            continue
        file_found = True
        if FILENAME_RE.match(file_path.name):
            status = create_status(util.StatusState.SUCCESS,
                                   description='News entry found in Misc/NEWS.d')
            break
    else:
        # The loop finished without finding a valid entry: either the PR is
        # labelled to skip the check, or the status is a failure whose
        # description says how far the search got.
        issue = await util.issue_for_PR(gh, pull_request)
        if util.skip("news", issue):
            status = SKIP_LABEL_STATUS
        else:
            if not in_next_dir:
                description = f'No news entry in {NEWS_NEXT_DIR} or "skip news" label found'
            elif not file_found:
                description = "News entry not in an appropriate directory"
            else:
                description = "News entry file name incorrectly formatted"
            status = create_status(util.StatusState.FAILURE,
                                   description=description,
                                   target_url=DEVGUIDE_URL)

    await gh.post(pull_request['statuses_url'], data=status)
def __init__(self):
    """Build the editor UI from the Glade file and set the initial state."""
    self.builder = Gtk.Builder()
    self.builder.add_from_file("simpletexteditor.glade")
    self.builder.connect_signals(self)

    # (attribute name, Glade object id), looked up in this order.
    # notsave / notsavetwo are the message dialogs asking whether the user
    # wants to save the current textbuffer.
    widgets = (
        ("window", "window"),
        ("textview", "textview"),
        ("textbuffer", "textbuffer"),
        ("about", "about_page"),
        ("fileopen", "fileopen"),
        ("filesave", "filesave"),
        ("fontchooser", "fontchooser"),
        ("notsave", "notsave"),
        ("notsavetwo", "notsavetwo"),
    )
    for attribute, object_id in widgets:
        setattr(self, attribute, self.builder.get_object(object_id))

    self.currentfile = ""
    self.path = pathlib.PurePath(self.currentfile)
    components = self.path.parts
    self.window.set_title(components[-1] if components else "Untitled document")

    self.filechanged = False
    self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
def file_open(self, button):
    """Handle the "open file" action.

    If the buffer has unsaved changes the ``notsavetwo`` dialog is shown
    first and answering YES saves through ``self.save``.  Then the file
    chooser is run; on confirmation the chosen file becomes the current
    file, the window title is set to its last path component and its UTF-8
    content is loaded into the text buffer.

    :param button: the widget that triggered the signal (unused)
    """
    if self.filechanged:
        self.notsavetwo.set_transient_for(self.window)
        self.response = self.notsavetwo.run()
        if self.response == Gtk.ResponseType.YES:
            self.save(self.filesave)
        self.notsavetwo.hide()

    self.fileopen.set_transient_for(self.window)
    self.response = self.fileopen.run()
    if self.response == Gtk.ResponseType.OK:
        self.file_to_open = self.fileopen.get_filename()
        if self.file_to_open:
            self.currentfile = self.file_to_open
            self.path = pathlib.PurePath(self.currentfile)
            # Set the title once, guarded: indexing parts[-1] without the
            # check raises IndexError for a path with no components.
            if self.path.parts:
                self.window.set_title(self.path.parts[-1])
            else:
                self.window.set_title("Untitled document")
            with codecs.open(self.file_to_open, 'r', encoding="utf-8") as f:
                self.content_file = f.read()
            self.textbuffer.set_text(self.content_file)
    self.fileopen.hide()
def _extract_files(self, disk, files, path):
    """Extract *files* from the filesystem of *disk* below *path*.

    The files go into an ``extracted_files`` directory under *path*, which
    is created first.

    :returns: the ``(extracted, failed)`` pair produced by ``extract_files``
    """
    target_dir = str(PurePath(path, 'extracted_files'))
    makedirs(target_dir)

    filesystem = self.filesystems[disk]
    extracted, failed = extract_files(filesystem, files, target_dir)

    self.logger.info("Files extracted into %s.", target_dir)
    if failed:
        failures = '\n'.join(failed.values())
        self.logger.warning(
            "The following files could not be extracted: %s", failures)

    return extracted, failed
def fspath(path):
    """
    Return the string representation of the path.

    If str or bytes is passed in, it is returned unchanged.  This code comes
    from PEP 519, modified to support earlier versions of python.  This is
    required for python < 3.6.

    Raises TypeError for any other object that neither provides
    ``__fspath__`` nor is a ``pathlib.PurePath``.
    """
    string_types = (py.builtin.text, py.builtin.bytes)
    if isinstance(path, string_types):
        return path

    # Work from the object's type to match method resolution of other magic
    # methods.
    kind = type(path)
    try:
        return kind.__fspath__(path)
    except AttributeError:
        # An AttributeError raised from inside an existing __fspath__ is a
        # genuine error and must propagate.
        if hasattr(kind, '__fspath__'):
            raise

        # No __fspath__ at all: fall back to pathlib objects, when the
        # module can be imported.
        try:
            import pathlib
        except ImportError:
            pass
        else:
            if isinstance(path, pathlib.PurePath):
                return py.builtin.text(path)

        raise TypeError("expected str, bytes or os.PathLike object, not "
                        + kind.__name__)
def substitute_dir_in_path(path, olddir, newdir):
    """Replace every path component equal to *olddir* with *newdir*.

    :param path: the path to rewrite
    :param olddir: component name to look for (compared to whole components)
    :param newdir: replacement component
    :returns: the rewritten path as a string
    """
    pp = PurePath(path)
    parts = [newdir if part == olddir else part for part in pp.parts]
    if not parts:
        # '' and '.' have no components, and os.path.join() with no
        # arguments raises TypeError; return the normalised path instead.
        return str(pp)
    return os.path.join(*parts)
def globless_prefix(self):
    """
    Return shortest path prefix without glob quantifiers.

    Components are taken from ``self.parts`` up to, but excluding, the
    first one containing '*' or '?'.
    """
    prefix = []
    for component in self.parts:
        if '*' in component or '?' in component:
            break
        prefix.append(component)
    return pathlib.PurePath(*prefix)
def download_link(self, path, node, retry, dcb, ecb):
    """Recreate the symbolic link described by *node* on the local disk.

    The link is created at *path* with leading slashes stripped, i.e.
    relative to the current working directory.

    :param path: path of the link
    :param node: link node; its target is read with ``node.read_link()``
    :param retry: maximum number of attempts
    :param dcb: callback invoked as ``dcb(path, done, None, None, None)``
        before (``done=False``) and after (``done=True``) the link is made
    :param ecb: callback invoked with *path* when an attempt ends
    :returns: True if the link exists with the right target, False if all
        attempts failed
    :raises AssertionError: if *node* is not an internal link
    """
    disk_path_object = pathlib.Path(path.lstrip('/'))
    disk_path = str(disk_path_object)

    if self.vfs.is_internal_link(node):
        link_target_path = str(pathlib.PurePath(node.read_link()))
    else:
        # Raise explicitly: a bare ``assert False`` is stripped under
        # ``python -O`` and would leave link_target_path unbound.
        raise AssertionError('only internal links can be downloaded')

    # Nothing to do when the link already points at the right target.
    if disk_path_object.is_symlink():
        existing_link_target_path = os.readlink(disk_path)
        if existing_link_target_path == link_target_path:
            self.logger.info('???????????????? {}'
                             .format(disk_path))
            return True

    download_ok = False
    for i in range(retry):
        try:
            if i != 0:
                self.logger.error('???????? {}?????? {} ?'
                                  .format(disk_path, i + 1))
            try:
                dcb(path, False, None, None, None)
                disk_path_object.symlink_to(link_target_path)
                dcb(path, True, None, None, None)
                ecb(path)
                download_ok = True
                break
            except FileExistsError:
                # A stale link is in the way: replace it and try once more.
                if disk_path_object.is_symlink():
                    disk_path_object.unlink()
                dcb(path, False, None, None, None)
                disk_path_object.symlink_to(link_target_path)
                dcb(path, True, None, None, None)
                ecb(path)
                download_ok = True
                break
        except IOError as err:
            ecb(path)
            self.logger.error(err)

    if not download_ok:
        return False
    return True
def import_data(cls, type, view, view_type=None):
    """Import *view* as data of the given semantic *type*.

    :param type: semantic type, or a string parsed with
        ``qiime2.sdk.parse_type``
    :param view: the data to import; a ``str`` or ``pathlib.PurePath`` is
        treated as a filesystem path
    :param view_type: the view's type, or a string parsed with
        ``qiime2.sdk.parse_format``; inferred from *view* when ``None``
    :returns: whatever ``cls._from_view`` returns
    :raises qiime2.plugin.ValidationError: the path does not exist, or a
        file was given where the type's directory format is not a
        single-file format
    """
    # The ``type`` parameter shadows the builtin, so fetch the builtin
    # explicitly.  ``__builtins__`` may be either a dict or a module
    # depending on how the code is run; the ``builtins`` module is reliable.
    import builtins
    type_, type = type, builtins.type

    is_format = False
    if isinstance(type_, str):
        type_ = qiime2.sdk.parse_type(type_)

    if isinstance(view_type, str):
        view_type = qiime2.sdk.parse_format(view_type)
        is_format = True

    if view_type is None:
        if type(view) is str or isinstance(view, pathlib.PurePath):
            # A path was given: derive the format from the semantic type.
            is_format = True
            pm = qiime2.sdk.PluginManager()
            output_dir_fmt = pm.get_directory_format(type_)
            if pathlib.Path(view).is_file():
                if not issubclass(output_dir_fmt,
                                  model.SingleFileDirectoryFormatBase):
                    raise qiime2.plugin.ValidationError(
                        "Importing %r requires a directory, not %s"
                        % (output_dir_fmt.__name__, view))
                view_type = output_dir_fmt.file.format
            else:
                view_type = output_dir_fmt
        else:
            view_type = type(view)

    format_ = None
    md5sums = None
    if is_format:
        # Checksum the imported file(s) for the provenance record.
        path = pathlib.Path(view)
        if path.is_file():
            md5sums = {path.name: util.md5sum(path)}
        elif path.is_dir():
            md5sums = util.md5sum_directory(path)
        else:
            raise qiime2.plugin.ValidationError(
                "Path '%s' does not exist." % path)
        format_ = view_type

    provenance_capture = archive.ImportProvenanceCapture(format_, md5sums)
    return cls._from_view(type_, view, view_type, provenance_capture)
def _extract_from_wiki(self):
    """Fetch the selected part of a Wikipedia page on a background thread.

    The title, page part and language are read from the form widgets.
    With an empty title the content browser is cleared and disabled.
    Otherwise a worker thread is started which emits the result through
    one of its ``content_*_arrived`` signals, or ``error_occurred`` if
    anything fails.
    """
    title = self.title_line_edit.text()
    if title:
        page = self.page_combo_box.currentText()
        wikipedia.set_lang(self.lang_combo_box.currentText())
        self.load_progressbar.setMinimum(0)
        self.load_progressbar.setMaximum(0)

        class ProgressThread(QThread, QWidget):
            content_link_arrived = pyqtSignal([list])
            content_text_arrived = pyqtSignal(['QString'])
            content_image_arrived = pyqtSignal([list, 'QString'])
            error_occurred = pyqtSignal()
            valid_images = []

            def run(self):
                """Download the requested content and emit the matching signal."""
                try:
                    wiki = wikipedia.page(title=title)
                    # The template is opened but never read here.  Keep the
                    # open so a missing template is still reported as an
                    # error, but close the handle instead of leaking it.
                    with open('templates/template.html'):
                        pass
                    if page == 'Content':
                        self.content_text_arrived.emit(wiki.content)
                    elif page == 'Images':
                        print(wiki.images)
                        self.des_dir = Preferences.output_path + '/' + title
                        self.valid_images = []
                        if not os.path.exists(self.des_dir):
                            print(self.des_dir)
                            os.mkdir(self.des_dir)
                        for i in wiki.images:
                            # Only download images with an accepted extension.
                            if PurePath(i).suffix in Preferences.valid_image_formats:
                                print(i)
                                print(self.des_dir)
                                wget.download(i, out=self.des_dir)
                                self.valid_images.append(i)
                        self.content_image_arrived.emit(self.valid_images, self.des_dir)
                    elif page == 'Summary':
                        self.content_text_arrived.emit(wiki.summary)
                    elif page == 'Images Links':
                        self.content_link_arrived.emit(wiki.images)
                    elif page == 'References Links':
                        self.content_link_arrived.emit(wiki.references)
                except Exception:
                    # Any failure is reported to the UI.  Unlike a bare
                    # ``except:``, this lets SystemExit and
                    # KeyboardInterrupt propagate.
                    self.error_occurred.emit()

        self.progress_thread = ProgressThread()
        self.progress_thread.content_link_arrived.connect(self.set_content_link)
        self.progress_thread.content_text_arrived.connect(self.set_content_text)
        self.progress_thread.content_image_arrived.connect(self.set_content_image)
        self.progress_thread.error_occurred.connect(self.handle_error_occurred)
        self.progress_thread.start()
    else:
        self.content_text_browser.clear()
        self.content_text_browser.setEnabled(False)