我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.F_OK。
def _clean_check(cmd, target): """ Run the command to download target. If the command fails, clean up before re-raising the error. """ try: subprocess.check_call(cmd) except subprocess.CalledProcessError: if os.access(target, os.F_OK): os.unlink(target) raise
def __init__(self, dir, file_template=_default_file_template, truncate_slug_length=40, version_locations=None, sourceless=False, output_encoding="utf-8"): self.dir = dir self.file_template = file_template self.version_locations = version_locations self.truncate_slug_length = truncate_slug_length or 40 self.sourceless = sourceless self.output_encoding = output_encoding self.revision_map = revision.RevisionMap(self._load_revisions) if not os.access(dir, os.F_OK): raise util.CommandError("Path doesn't exist: %r. Please use " "the 'init' command to create a new " "scripts folder." % dir)
def write_script( scriptdir, rev_id, content, encoding='ascii', sourceless=False): old = scriptdir.revision_map.get_revision(rev_id) path = old.path content = textwrap.dedent(content) if encoding: content = content.encode(encoding) with open(path, 'wb') as fp: fp.write(content) pyc_path = util.pyc_file_from_path(path) if os.access(pyc_path, os.F_OK): os.unlink(pyc_path) script = Script._from_path(scriptdir, path) old = scriptdir.revision_map.get_revision(script.revision) if old.down_revision != script.down_revision: raise Exception("Can't change down_revision " "on a refresh operation.") scriptdir.revision_map.add_revision(script, _replace=True) if sourceless: make_sourceless(path)
def getTasksProcForce(self) : """ Get tasks list by bruteforcing /proc """ for i in range(1, 65535, 1) : if(os.access("/proc/" + str(i) + "/status", os.F_OK) == True): l = self.openProcTaskStatus("/proc/" + str(i) + "/status") if(l != []): self.tasks_procforce.map_tasks[int(l[0])] = [int(l[1]), int(l[2]), l[3], 0, 0] self.tasks_procforce.list_tasks.append(int(l[0])) if(os.access("/proc/" + str(i) + "/task", os.F_OK) == True): for srep in os.listdir("/proc/" + str(i) + "/task"): if(srep != l[0]): ll = self.openProcTaskStatus("/proc/" + str(i) + "/task/" + srep + "/status") self.tasks_procforce.map_tasks[int(ll[0])] = [int(ll[1]), int(ll[2]), ll[3], 0, 1] self.tasks_procforce.list_tasks.append(int(ll[0]))
def _clone_test_db(self, number, verbosity, keepdb=False): source_database_name = self.connection.settings_dict['NAME'] target_database_name = self.get_test_db_clone_settings(number)['NAME'] # Forking automatically makes a copy of an in-memory database. if not self.connection.is_in_memory_db(source_database_name): # Erase the old test database if os.access(target_database_name, os.F_OK): if keepdb: return if verbosity >= 1: print("Destroying old test database for alias %s..." % ( self._get_database_display_str(verbosity, target_database_name), )) try: os.remove(target_database_name) except Exception as e: sys.stderr.write("Got an error deleting the old test database: %s\n" % e) sys.exit(2) try: shutil.copy(source_database_name, target_database_name) except Exception as e: sys.stderr.write("Got an error cloning the test database: %s\n" % e) sys.exit(2)
def _popen(command, args): import os path = os.environ.get("PATH", os.defpath).split(os.pathsep) path.extend(('/sbin', '/usr/sbin')) for dir in path: executable = os.path.join(dir, command) if (os.path.exists(executable) and os.access(executable, os.F_OK | os.X_OK) and not os.path.isdir(executable)): break else: return None # LC_ALL to ensure English output, 2>/dev/null to prevent output on # stderr (Note: we don't have an example where the words we search for # are actually localized, but in theory some system could do so.) cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args) return os.popen(cmd)
def mountpoint_choosen(option): if option is None: return from Screens.ChoiceBox import ChoiceBox print "scanning", option (description, mountpoint, session) = option res = scanDevice(mountpoint) list = [ (r.description, r, res[r], session) for r in res ] if not list: from Screens.MessageBox import MessageBox if os.access(mountpoint, os.F_OK|os.R_OK): session.open(MessageBox, _("No displayable files on this medium found!"), MessageBox.TYPE_INFO, simple = True, timeout = 5) else: print "ignore", mountpoint, "because its not accessible" return session.openWithCallback(execute, ChoiceBox, title = _("The following files were found..."), list = list)
def on_debug_activate(self): dotfile = "/home/pi/dev/bossjones-github/scarlett_os/_debug/generator-player.dot" pngfile = "/home/pi/dev/bossjones-github/scarlett_os/_debug/generator-player-pipeline.png" # NOQA parent_dir = get_parent_dir(dotfile) mkdir_if_does_not_exist(parent_dir) if os.access(dotfile, os.F_OK): os.remove(dotfile) if os.access(pngfile, os.F_OK): os.remove(pngfile) if not fname_exists(dotfile): touch_empty_file(dotfile) if not fname_exists(pngfile): touch_empty_file(pngfile) Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "generator-player") os.system('/usr/bin/dot' + " -Tpng -o " + pngfile + " " + dotfile) # Gstreamer callbacks.
def set_imageinfo( memoryFilePath ): path = r'{}'.format( memoryFilePath ) path = os.path.abspath( path ) try: if os.access( path, os.F_OK): if os.access( path, os.R_OK ): cwd = os.getcwd() imageinfo = [ 'vol.py', '-f', '{}'.format( path ), 'imageinfo', \ '--output=text', \ '--output-file={}'.format( os.path.join( cwd, 'imageinfo.text' ))] return imageinfo else: print '\n[!] Error File Permissions: No Read Access for {}\n'.format( path ) else: print '\n[!] Error FilePath: Does not exist {}\n'.format( path ) except Exception as set_imageinfo_error: print '[!] EXCEPTION ERROR: < set_imageinfo > function' print set_imageinfo_error
def _set_cwd(self): try: cwd = os.getcwd() if not os.access(cwd, os.F_OK|os.R_OK): raise return cwd except: # we don't have access to the cwd, probably because of sudo. # Try and move to a neutral location to prevent errors for cwd in [os.path.expandvars('$HOME'), tempfile.gettempdir()]: try: if os.access(cwd, os.F_OK|os.R_OK): os.chdir(cwd) return cwd except: pass # we won't error here, as it may *not* be a problem, # and we don't want to break modules unnecessarily return None
def wrap_code(self, routine, helpers=[]): workdir = self.filepath or tempfile.mkdtemp("_sympy_compile") if not os.access(workdir, os.F_OK): os.mkdir(workdir) oldwork = os.getcwd() os.chdir(workdir) try: sys.path.append(workdir) self._generate_code(routine, helpers) self._prepare_files(routine) self._process_files(routine) mod = __import__(self.module_name) finally: sys.path.remove(workdir) CodeWrapper._module_counter += 1 os.chdir(oldwork) if not self.filepath: shutil.rmtree(workdir) return self._get_wrapped_function(mod)
def crear_repo(package_name): md5hash_pn = md5(package_name).hexdigest() first_pref = md5hash_pn[0:2] second_pref = md5hash_pn[2:4] workingdir = root_git_dir+"/"+first_pref+"/"+second_pref+"/" + package_name if not(os.access(root_git_dir+"/"+first_pref, os.F_OK)): os.mkdir(root_git_dir+"/"+first_pref) os.mkdir(root_git_dir+"/"+first_pref+"/"+second_pref) elif not(os.access(root_git_dir+"/"+first_pref+"/"+second_pref, os.F_OK)): os.mkdir(root_git_dir+"/"+first_pref+"/"+second_pref) repo = pygit2.init_repository(workingdir) dashed_package_name=package_name.replace('.','-').lower() myRemote = repo.remotes.create(package_name, gitlab_url+'/marvin/'+dashed_package_name+'.git') gl = Gitlab (gitlab_url, gitlab_token) gl.auth() p = gl.Project({'name': package_name, 'public':True}) p.save() return repo
def __init__(self, dir, file_template=_default_file_template, truncate_slug_length=40, version_locations=None, sourceless=False, output_encoding="utf-8", timezone=None): self.dir = dir self.file_template = file_template self.version_locations = version_locations self.truncate_slug_length = truncate_slug_length or 40 self.sourceless = sourceless self.output_encoding = output_encoding self.revision_map = revision.RevisionMap(self._load_revisions) self.timezone = timezone if not os.access(dir, os.F_OK): raise util.CommandError("Path doesn't exist: %r. Please use " "the 'init' command to create a new " "scripts folder." % dir)
def setTmpDir(self): """Set the tmpDir attribute to a reasonnable location for a temporary directory""" if os.name != 'nt': # On unix use /tmp by default self.tmpDir = os.environ.get("TMPDIR", "/tmp") self.tmpDir = os.environ.get("TMP", self.tmpDir) else: # On Windows use the current directory self.tmpDir = os.environ.get("TMPDIR", "") self.tmpDir = os.environ.get("TMP", self.tmpDir) self.tmpDir = os.environ.get("TEMP", self.tmpDir) if not os.path.isdir(self.tmpDir): self.tmpDir = "" elif not os.access(self.tmpDir, os.F_OK + os.W_OK): self.tmpDir = ""
def do_cleanup(self): """cleanup temporary files""" super(ModelIterationSample, self).do_cleanup() if self.temp: for i in ( self.transform, self.inv_transform, self.corr_transform, self.corr_mri, self.diff_bias, self.corr_bias, self.transform_grid, self.inv_transform_grid, self.corr_transform_grid0, self.corr_transform_grid1, ): if os.access(i, os.F_OK): os.unlink(i)
def _create_test_db(self, verbosity, autoclobber, keepdb=False): test_database_name = self._get_test_db_name() if keepdb: return test_database_name if not self.connection.is_in_memory_db(test_database_name): # Erase the old test database if verbosity >= 1: print("Destroying old test database '%s'..." % self.connection.alias) if os.access(test_database_name, os.F_OK): if not autoclobber: confirm = input( "Type 'yes' if you would like to try deleting the test " "database '%s', or 'no' to cancel: " % test_database_name ) if autoclobber or confirm == 'yes': try: os.remove(test_database_name) except Exception as e: sys.stderr.write("Got an error deleting the old test database: %s\n" % e) sys.exit(2) else: print("Tests cancelled.") sys.exit(1) return test_database_name
def execute(self): from ranger.container.file import File from os import access new_name = self.rest(1) if not new_name: return self.fm.notify('Syntax: rename <newname>', bad=True) if new_name == self.fm.thisfile.basename: return if access(new_name, os.F_OK): return self.fm.notify("Can't rename: file already exists!", bad=True) self.fm.rename(self.fm.thisfile, new_name) f = File(new_name) self.fm.thisdir.pointed_obj = f self.fm.thisfile = f
def _clone_test_db(self, number, verbosity, keepdb=False): source_database_name = self.connection.settings_dict['NAME'] target_database_name = self.get_test_db_clone_settings(number)['NAME'] # Forking automatically makes a copy of an in-memory database. if not self.is_in_memory_db(source_database_name): # Erase the old test database if os.access(target_database_name, os.F_OK): if keepdb: return if verbosity >= 1: print("Destroying old test database for alias %s..." % ( self._get_database_display_str(verbosity, target_database_name), )) try: os.remove(target_database_name) except Exception as e: sys.stderr.write("Got an error deleting the old test database: %s\n" % e) sys.exit(2) try: shutil.copy(source_database_name, target_database_name) except Exception as e: sys.stderr.write("Got an error cloning the test database: %s\n" % e) sys.exit(2)
def test_sftp_session(server): for uid in server.users: target_dir = tempfile.mkdtemp() target_fname = os.path.join(target_dir, "foo") assert not os.access(target_fname, os.F_OK) with server.client(uid) as c: sftp = c.open_sftp() sftp.put(__file__, target_fname, confirm=True) assert files_equal(target_fname, __file__) second_copy = os.path.join(target_dir, "bar") assert not os.access(second_copy, os.F_OK) sftp.get(target_fname, second_copy) assert files_equal(target_fname, second_copy) dir_contents = sftp.listdir(target_dir) assert len(dir_contents) == 2 assert "foo" in dir_contents assert "bar" in dir_contents with raises(IOError): sftp.listdir("/123_no_dir")
def _search_for_model(names, workdir): """ Check if a model file is available. :param names: list of model names to be searcher for, highest ranking first :type names: str :param workdir: the workdir where the model file is located :type name: str :type workdir: str :returns: the path to the model file or None if model file not found """ for name in names: path = os.path.join(workdir, name) if os.access(path, os.F_OK): return path return None
def test_generateScriptEnv(self): filename = "/tmp/test.sh" testFile = "/tmp/test.txt" cmds = """ X="${ENV_TEST}" touch "${X}" exit 0 """ generateScript(filename, cmds, {"ENV_TEST": testFile}) # check permissions self.assertTrue(os.access(filename, os.F_OK)) self.assertTrue(os.access(filename, os.X_OK | os.R_OK)) # execute the script and expect it to create the file status = subprocess.call([filename]) self.assertEqual(status, 0) # check that the testFile got created by the script self.assertTrue(os.access(testFile, os.F_OK)) # cleanup os.remove(filename) os.remove(testFile)
def execute(self): from ranger.container.file import File from os import access new_name = self.rest(1) tagged = {} old_name = self.fm.thisfile.relative_path for f in self.fm.tags.tags: if str(f).startswith(self.fm.thisfile.path): tagged[f] = self.fm.tags.tags[f] self.fm.tags.remove(f) if not new_name: return self.fm.notify('Syntax: rename <newname>', bad=True) if new_name == old_name: return if access(new_name, os.F_OK): return self.fm.notify("Can't rename: file already exists!", bad=True) if self.fm.rename(self.fm.thisfile, new_name): f = File(new_name) # Update bookmarks that were pointing on the previous name obsoletebookmarks = [b for b in self.fm.bookmarks if b[1].path == self.fm.thisfile] if obsoletebookmarks: for key, _ in obsoletebookmarks: self.fm.bookmarks[key] = f self.fm.bookmarks.update_if_outdated() self.fm.thisdir.pointed_obj = f self.fm.thisfile = f for t in tagged: self.fm.tags.tags[t.replace(old_name, new_name)] = tagged[t] self.fm.tags.dump()
def _select_config_file_path(): """ Return an openATTIC configuration pathname """ possible_paths = ("/etc/default/openattic", "/etc/sysconfig/openattic") for path in possible_paths: if os.access(path, os.F_OK) and os.access(path, os.R_OK | os.W_OK): return path raise CommandExecutionError( ("No openATTIC config file found in the following locations: " "{}".format(possible_paths)))
def init(config, directory, template='generic'): """Initialize a new scripts directory.""" if os.access(directory, os.F_OK): raise util.CommandError("Directory %s already exists" % directory) template_dir = os.path.join(config.get_template_directory(), template) if not os.access(template_dir, os.F_OK): raise util.CommandError("No such template %r" % template) util.status("Creating directory %s" % os.path.abspath(directory), os.makedirs, directory) versions = os.path.join(directory, 'versions') util.status("Creating directory %s" % os.path.abspath(versions), os.makedirs, versions) script = ScriptDirectory(directory) for file_ in os.listdir(template_dir): file_path = os.path.join(template_dir, file_) if file_ == 'alembic.ini.mako': config_file = os.path.abspath(config.config_file_name) if os.access(config_file, os.F_OK): util.msg("File %s already exists, skipping" % config_file) else: script._generate_template( file_path, config_file, script_location=directory ) elif os.path.isfile(file_path): output_file = os.path.join(directory, file_) script._copy_file( file_path, output_file ) util.msg("Please edit configuration/connection/logging " "settings in %r before proceeding." % config_file)