Python shutil 模块,which() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用shutil.which()。
def assert_installed(win=None, modules=None, binaries=None):
    """Check that the given Python modules and executables are available.

    :param win: Optional parent widget.  When truthy, missing dependencies
        are reported through a ``QMessageBox`` warning instead of an
        exception.
    :param modules: Iterable of module names, each looked up with
        ``find_spec``.  ``None`` (the default) means "no modules".
    :param binaries: Iterable of executable names, each looked up with
        ``which``.  ``None`` (the default) means "no binaries".
    :return: ``True`` when nothing is missing; ``False`` when something is
        missing and the problem was reported through ``win``.
    :raises ImportError: If something is missing and ``win`` is falsy.
    """
    # ``None`` defaults avoid sharing one mutable list between calls.
    checks = (
        (modules or (), 'modules', find_spec),
        (binaries or (), 'binaries', which),
    )
    missing = defaultdict(list)
    for items, what, func in checks:
        for item in items:
            if not func(item):
                missing[what].append(item)
    if not missing:
        return True

    singular = {'modules': 'module', 'binaries': 'binary'}
    parts = []
    for subject, names in missing.items():
        # Use the singular noun when exactly one item of that kind is missing.
        if len(names) == 1:
            subject = singular[subject]
        parts.append('%s "%s"' % (subject, '", "'.join(names)))
    msg = 'You are missing the %s for this.' % ' and '.join(parts)
    if win:
        # Imported lazily so PyQt5 is only required when a window is passed.
        from PyQt5.QtWidgets import QMessageBox
        QMessageBox.warning(win, ' ', msg)
    else:
        raise ImportError(msg)
    return False
def get_media_info(self, media_file):
    """Probe ``media_file`` with ffprobe and return basic stream information.

    :param media_file: Path of the media file to inspect.
    :return: The first video stream's ffprobe JSON entry (width, height,
        avg_frame_rate, duration), with ``duration`` filled in from the
        container format when the stream does not report one, and with the
        first audio stream's ``sample_rate`` added.
    :raises subprocess.CalledProcessError: If an ffprobe invocation fails.

    Logs an error and exits the process when ffprobe cannot be found.
    """
    try:
        ffprobe = shutil.which('ffprobe')
    except AttributeError:
        # python2: shutil has no which()
        from distutils.spawn import find_executable
        ffprobe = find_executable('ffprobe')
    if not ffprobe:
        logger.error('ffprobe is required')
        sys.exit()

    def probe(*options):
        # The command is built as an argument list so that a media path
        # containing spaces is passed to ffprobe as a single argument.
        argv = [ffprobe, '-v', 'error']
        argv.extend(options)
        argv.extend(['-of', 'default=noprint_wrappers=1',
                     '-print_format', 'json', media_file])
        return json.loads(
            subprocess.check_output(argv, universal_newlines=True))

    vjres = probe(
        '-select_streams', 'v',
        '-show_entries', 'stream=width,height,avg_frame_rate,duration',
    )['streams'][0]
    if not vjres.get('duration'):
        # Fall back to the container-level duration.
        vjres['duration'] = probe(
            '-select_streams', 'v', '-show_format_entry', 'duration',
        )['format']['duration']
    ajres = probe(
        '-select_streams', 'a', '-show_entries', 'stream=sample_rate',
    )['streams'][0]
    vjres['sample_rate'] = ajres['sample_rate']
    return vjres
def run_cmd(pl, cmd, stdin=None, strip=True):
    '''Execute a command and hand back what it wrote to stdout

    When the command cannot be started, the failure is logged to ``pl`` and
    None is returned.

    :param PowerlineLogger pl:
        Logger that receives the failure report.
    :param list cmd:
        Command (argument list) to execute; no shell is involved.
    :param str stdin:
        Text fed to the command's standard input, or None for no input.
    :param bool strip:
        Whether surrounding whitespace is removed from the result.
    '''
    try:
        proc = Popen(cmd, shell=False, stdout=PIPE, stdin=PIPE)
    except OSError as e:
        pl.exception('Could not execute command ({0}): {1}', e, cmd)
        return None

    # Input text is encoded before being written to the child's stdin.
    if stdin is None:
        payload = None
    else:
        payload = stdin.encode(get_preferred_output_encoding())
    raw_output, _ = proc.communicate(payload)
    text = raw_output.decode(get_preferred_input_encoding())
    if strip:
        return text.strip()
    return text
def createProject( self, project ):
    """Create the Hg project wrapper for ``project``.

    Records the status, name and date columns of the table model as the
    visible columns, then builds a ``wb_hg_project.HgProject``.

    Returns the new project object, or None when the hg command line tool
    is not found or hglib reports a server error (both cases are logged).
    """
    model = self.table_view.table_model
    self.all_visible_table_columns = (model.col_status, model.col_name, model.col_date)

    hg_location = shutil.which( hglib.HGPATH )
    if hg_location is None:
        self.log.error( T_('Mercurial "hg" command line tool not found') )
        return None

    try:
        hg_project = wb_hg_project.HgProject( self.app, project, self )
    except hglib.error.ServerError as e:
        self.app.log.error( T_('Failed to add Hg repo %s') % (project.path,) )
        self.app.log.error( T_('hg error: %s') % (e,) )
        return None
    return hg_project
#------------------------------------------------------------
def find_editor():
    """Return the editor command to use.

    ``GIT_EDITOR`` and then ``EDITOR`` are consulted first; the first one
    that is set wins and is returned unchanged.  Otherwise a platform
    specific list of fallbacks is tried and the first one that exists is
    returned.  If nothing is found, ``error`` is called with a hint to set
    ``EDITOR``.
    """
    for var in ('GIT_EDITOR', 'EDITOR'):
        configured = os.environ.get(var)
        if configured is not None:
            return configured

    if sys.platform == 'win32':
        candidates = ['notepad.exe']
    else:
        candidates = ['/etc/alternatives/editor', 'nano']

    for candidate in candidates:
        # Absolute paths are used as-is; bare names are resolved on PATH.
        resolved = candidate if os.path.isabs(candidate) else shutil.which(candidate)
        if resolved and os.path.exists(resolved):
            return resolved

    error('Could not find an editor! Set the EDITOR environment variable.')
def check_executable(executable):
    """
    Report whether ``find_executable`` can locate an executable.

    The lookup and its outcome are logged at debug level.

    Arguments:
    executable -- the name of the executable (e.g. "echo")

    Returns:
    True if a location was found, False otherwise
    """
    log = logging.getLogger(__name__)
    log.debug("Checking executable '%s'...", executable)
    location = find_executable(executable)
    if location is None:
        log.debug("Executable '%s' not found", executable)
        return False
    log.debug("Executable '%s' found: '%s'", executable, location)
    return True
def _get_chromedriver_path() -> str:
    """Return the path where the chromedriver executable is expected.

    A chromedriver found on PATH takes precedence.  Otherwise the path is
    ``chromedriver`` inside ``$TRAVIS_BUILD_DIR`` when ``TRAVIS`` is set in
    the environment, or inside the directory two levels above this file.
    The fallback path is returned without checking that it exists.
    """
    on_path = shutil.which('chromedriver')
    if on_path:
        return on_path

    if 'TRAVIS' in os.environ:
        base_dir = os.environ['TRAVIS_BUILD_DIR']
    else:
        this_file = os.path.abspath(__file__)
        base_dir = os.path.dirname(os.path.dirname(this_file))
    return os.path.join(base_dir, 'chromedriver')
# see https://www.spirulasystems.com/blog/2016/08/11/https-everywhere-unit-testing-for-chromium/ # noqa: E501
def updateCheck():
    """Offer to update this git checkout when its branch is behind.

    Does nothing when ``git`` is not on PATH.  Otherwise runs ``git fetch``
    and ``git status -uno`` in the directory containing this file; if the
    status output reports that the branch is behind, the user is asked
    whether to run ``git pull --rebase``, after which the current program
    is re-executed with the same arguments.
    """
    from pathlib import Path
    if not shutil.which("git"):
        return

    # check if new commits are available
    projectDir = str(Path(__file__).parent)
    subprocess.call(["git", "fetch"], cwd=projectDir)
    status = subprocess.check_output(["git", "status", "-uno"], cwd=projectDir)
    behindStart = status.find(b"Your branch is behind ")
    if behindStart <= 0:
        return

    # Trim the status text down to the "behind" sentence when the hint line
    # that follows it can be located.
    # NOTE(review): nesting was reconstructed from flattened source, and git
    # normally indents this hint with two spaces - confirm the literal.
    hintStart = status.find(b"\n (use \"git pull\" to update your local branch)")
    if hintStart > 0:
        status = status[behindStart:hintStart]
    statusUpdate("Current CheriBuild checkout can be updated: ", status.decode("utf-8"))
    answer = input("Would you like to update before continuing? y/[n] (Enter to skip) ")
    if answer.lower().startswith("y"):
        subprocess.check_call(["git", "pull", "--rebase"], cwd=projectDir)
        os.execv(sys.argv[0], sys.argv)
def extract_sdk_archives(cheriConfig, archives: "typing.List[SdkArchive]"):
    """Extract the SDK archives unless the SDK bin directory already exists.

    After extraction, ``ar`` and ``ranlib`` are symlinked from the host
    (as located by ``shutil.which``) when the SDK does not provide them.

    :param cheriConfig: configuration object providing ``sdkDir``,
        ``sdkBinDir`` and the ``FS`` helper
    :param archives: archives to extract; ``extract()`` is called on each
    """
    sdkBin = cheriConfig.sdkBinDir
    if sdkBin.is_dir():
        statusUpdate(sdkBin, "already exists, not extracting SDK archives")
        return

    cheriConfig.FS.makedirs(cheriConfig.sdkDir)
    for sdkArchive in archives:
        sdkArchive.extract()

    if not cheriConfig.sdkBinDir.exists():
        fatalError("SDK bin dir does not exist after extracting sysroot archives!")

    # Use the host ar/ranlib if they are missing
    # NOTE(review): nesting of the two calls below was reconstructed from
    # flattened source - confirm both belong inside the "missing" check.
    for tool in ("ar", "ranlib"):
        if (cheriConfig.sdkDir / "bin" / tool).exists():
            continue
        hostTool = Path(shutil.which(tool))
        cheriConfig.FS.createSymlink(hostTool, cheriConfig.sdkBinDir / tool, relative=False)
        cheriConfig.FS.createBuildtoolTargetSymlinks(cheriConfig.sdkBinDir / tool)
def __init__(self, config: CheriConfig):
    """Initialise configure/make state and pick the make command.

    Registers ``bear`` as a required tool when a compilation database is
    requested and needs it, and selects ``gmake`` or ``make`` depending on
    ``requiresGNUMake``, the platform and whether ``gmake`` is on PATH.
    """
    super().__init__(config)
    # set up the install/build/source directories (allowing overrides from config file)
    self.configureCommand = ""
    # non-assignable variables:
    self.make_args = MakeOptions()
    self.configureArgs = []  # type: typing.List[str]
    self.configureEnvironment = {}  # type: typing.Dict[str,str]
    if self.config.createCompilationDB and self.compileDBRequiresBear:
        self._addRequiredSystemTool("bear", installInstructions="Run `cheribuild.py bear`")
    self._lastStdoutLineCanBeOverwritten = False
    # Statement order is kept as in the original: this flag is set before
    # makeCommand is assigned.
    self._preventAssign = True

    if not self.requiresGNUMake:
        chosenMake = "make"
    elif IS_LINUX and not shutil.which("gmake"):
        statusUpdate("Could not find `gmake` command, assuming `make` is GNU make")
        chosenMake = "make"
    else:
        self._addRequiredSystemTool("gmake", homebrewPackage="make")
        chosenMake = "gmake"
    self.makeCommand = chosenMake
# Make sure that API is used properly
def getInterpreter(cmdline: "typing.Sequence[str]") -> "typing.Optional[typing.List[str]]":
    """
    Look up the interpreter named on the shebang line of a command's executable.

    :param cmdline: The command to check; only ``cmdline[0]`` is inspected
    :return: The shebang interpreter command split into a list, or None when
             the executable cannot be located or its first line is not a shebang
    """
    executable = Path(cmdline[0])
    print(executable, os.access(str(executable), os.X_OK), cmdline)
    if not executable.exists():
        # Not a usable path as given, so try to resolve it on PATH.
        resolved = shutil.which(str(executable))
        if resolved is None:
            # Path(None) would raise TypeError; a command that cannot be
            # located has no interpreter to report.
            return None
        executable = Path(resolved)
    statusUpdate(executable, "is not executable, looking for shebang:", end=" ")
    with executable.open("r", encoding="utf-8") as f:
        firstLine = f.readline()
    if firstLine.startswith("#!"):
        interpreter = shlex.split(firstLine[2:])
        statusUpdate("Will run", executable, "using", interpreter)
        return interpreter
    statusUpdate("No shebang found.")
    return None
def require_command(
    runner: Runner, command: str, message: Optional[str] = None
):
    """Exit with status 1 unless ``which`` can find ``command``.

    :param runner: Runner used to execute ``which <command>``.
    :param command: Name of the required executable.
    :param message: First line written to stderr on failure; defaults to
        "Please install <command>".
    :raises SystemExit: When running ``which`` raises CalledProcessError.
    """
    if message is None:
        message = "Please install " + command
    try:
        runner.get_output(["which", command])
    except CalledProcessError:
        stderr_lines = (
            message + "\n",
            '(Ran "which {}" to check in your $PATH.)\n'.format(command),
            "See the documentation at https://telepresence.io "
            "for more details.\n",
        )
        for line in stderr_lines:
            sys.stderr.write(line)
        raise SystemExit(1)
def kubectl_or_oc(server: str) -> str:
    """
    Pick the command-line tool to use: "kubectl" or "oc".

    "oc" is chosen only when it is installed and the server answers the
    ``/version/openshift`` endpoint without an HTTP error.

    :param server: The URL of the cluster API server.
    """
    if which("oc") is None:
        return "kubectl"

    # We've got oc, and possibly kubectl as well. We only want oc for OpenShift
    # servers, so check for an OpenShift API endpoint. Certificate and
    # hostname verification are switched off for this probe.
    unverified = ssl.create_default_context()
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    probe_url = server + "/version/openshift"
    try:
        with urlopen(probe_url, context=unverified) as response:
            response.read()
    except HTTPError:
        return "kubectl"
    return "oc"
def get_env_variables(runner: Runner, remote_info: RemoteInfo,
                      context: str) -> Dict[str, str]:
    """
    Build the environment variables for the local process.

    The result holds ``TELEPRESENCE_POD`` and ``TELEPRESENCE_CONTAINER``
    plus the remote environment with ``HOME``, ``PATH`` and ``HOSTNAME``
    removed.
    """
    # Get the environment:
    remote_env = _get_remote_env(
        runner, context, remote_info.namespace, remote_info.pod_name,
        remote_info.container_name
    )
    # Alpine, which we use for telepresence-k8s image, automatically sets these
    # HOME, PATH, HOSTNAME. The rest are from Kubernetes:
    for auto_set in ("HOME", "PATH", "HOSTNAME"):
        if auto_set in remote_env:
            del remote_env[auto_set]

    # Tell local process about the remote setup, useful for testing and
    # debugging:
    env = {}
    env["TELEPRESENCE_POD"] = remote_info.pod_name
    env["TELEPRESENCE_CONTAINER"] = remote_info.container_name
    env.update(remote_env)
    return env
def setUpClass(self):
    """Prepare the environment and a fresh temporary directory for the tests.

    Points ``PASSWORD_STORE_BIN`` at the ``pass`` executable found on PATH,
    drops ``GPG_AGENT_INFO``, sets ``GNUPGHOME`` to ``./gnupg`` and recreates
    an empty per-class directory below ``self.tmp``.
    """
    # Getting pass-import module
    super(TestPass, self).setUpClass()

    # Pass binary
    os.environ['PASSWORD_STORE_BIN'] = shutil.which("pass")

    # GPG Config
    os.environ.pop('GPG_AGENT_INFO', None)
    os.environ['GNUPGHOME'] = os.path.join(os.getcwd(), 'gnupg')

    # Tests directories: named after the class name minus its first eight
    # characters, lower-cased.
    test_dir = os.path.join(self.tmp, self.__name__[8:].lower())
    self.tmp = test_dir
    shutil.rmtree(test_dir, ignore_errors=True)
    os.makedirs(test_dir, exist_ok=True)
def generate_iso(self, cleanup=True):
    """Generate a CD-ROM ISO image with the first available ISO tool.

    A temporary file is created inside ``self.tmp_dir`` and filled by
    ``genisoimage`` or, failing that, ``mkisofs`` - whichever is found on
    PATH first.

    :param cleanup: When true, ``self.cdrom_dir_tmp`` is cleaned up after
        the image has been generated.
    :return: Path of the generated ISO file.
    :raises Exception: If neither tool is available on PATH.
    """
    self.cdrom_iso_tmp = NamedTemporaryFile(delete=False, dir=self.tmp_dir.name)
    cdrom_iso = self.cdrom_iso_tmp.name
    # NOTE(review): the original comment here says the file is made r/w by
    # everyone so it can be removed even when qemu takes ownership, but no
    # chmod happens in this method - confirm where that is done.
    tools = {
        "genisoimage": self.__genisoimage,
        "mkisofs": self.__mkisofs
    }
    # The ``None`` default matters: without it next() raises StopIteration
    # when no tool is installed and the check below is never reached.
    available = next(
        (tool for tool in tools if shutil.which(tool) is not None), None)
    # generate iso
    if available is None:
        raise Exception('Cannot find tools for creating ISO images')
    tools[available](cdrom_iso)
    logging.debug('ISO generated at %s', cdrom_iso)
    # cleanup
    if cleanup:
        self.cdrom_dir_tmp.cleanup()
    return cdrom_iso
def assemble(outfile, compiler, toolchain, flags=()):
    """Compile ``outfile`` to assembly and return the generated filename.

    The assembly file is named after ``outfile`` with its extension replaced
    by ``-<toolchain>.s`` (toolchain lower-cased).  The ``include`` directory
    two levels above the compiler executable is added to the include path,
    and the elapsed time is printed.
    """
    print('Assembling ' + toolchain)
    stem = os.path.splitext(outfile)[0]
    asmfile = stem + '-' + toolchain.lower() + '.s'

    # <prefix>/bin/<compiler> -> <prefix>/include
    compiler_path = shutil.which(compiler)
    prefix = os.path.dirname(os.path.dirname(compiler_path))
    cmd = (
        [compiler, '-I' + os.path.join(prefix, 'include'), '-fPIC', '-O0']
        + list(flags)
        + ['-S', '-o', asmfile, '-c', outfile]
    )
    print('Running command:\n $ ' + ' '.join(cmd))

    started = time.time()
    subprocess.check_call(cmd)
    finished = time.time()
    print('{0} assembled in {1:.3} seconds'.format(toolchain, finished - started))
    return asmfile
def run_pandoc(text='', args=None):
    """
    Low level helper that invokes the Pandoc executable.

    ``text`` is sent to Pandoc's standard input as UTF-8 and ``args`` are
    appended to the command line.  Returns Pandoc's standard output decoded
    as UTF-8.  Raises OSError when the executable cannot be found and
    IOError (carrying stderr) when Pandoc exits with a non-zero status.
    """
    extra_args = [] if args is None else args

    pandoc_path = which('pandoc')
    if not (pandoc_path is not None and os.path.exists(pandoc_path)):
        raise OSError("Path to pandoc executable does not exists")

    proc = Popen([pandoc_path] + extra_args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate(input=text.encode('utf-8'))
    if proc.returncode != 0:
        raise IOError(err)
    return out.decode('utf-8')
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
def check_executable(executable: str):
    """Exit with status 1 unless ``executable`` can be found on the PATH.

    When the executable is missing, an explanatory message is printed
    before exiting; otherwise the function simply returns.
    """
    if shutil.which(executable) is not None:
        return
    print('`{0}` is not found on your PATH.\n'
          'Make sure that `{0}` is installed on your system '
          'and available on the PATH.'.format(executable))
    sys.exit(1)
def extract_zip(data: bytes, path: Path) -> None:
    """Extract zipped chromium data into ``path`` and make it executable.

    On mac the data is written to ``chrome.zip`` and unpacked with the
    external ``unzip`` tool; elsewhere the ``zipfile`` module is used.

    Raises OSError when ``unzip`` is needed but not installed, and IOError
    when the chromium executable is missing after extraction.
    """
    # On mac zipfile module cannot extract correctly, so use unzip instead.
    if curret_platform() == 'mac':
        import subprocess
        import shutil
        archive = path / 'chrome.zip'
        if not path.exists():
            path.mkdir(parents=True)
        with archive.open('wb') as fh:
            fh.write(data)
        if not shutil.which('unzip'):
            zip_path = archive
            raise OSError('Failed to automatically extract chrome.zip.'
                          f'Please unzip {zip_path} manually.')
        subprocess.run(['unzip', str(archive)], cwd=str(path))
        # The archive is removed only once the executable is really there.
        if chromium_excutable().exists() and archive.exists():
            archive.unlink()
    else:
        with ZipFile(BytesIO(data)) as zf:
            zf.extractall(str(path))

    exec_path = chromium_excutable()
    if not exec_path.exists():
        raise IOError('Failed to extract chromium.')
    # Add execute permission for user, group and others.
    executable_bits = stat.S_IXOTH | stat.S_IXGRP | stat.S_IXUSR
    exec_path.chmod(exec_path.stat().st_mode | executable_bits)
    logger.warning(f'chromium extracted to: {path}')
def und_tool_exists() -> bool:
    """Return True when an ``und`` executable can be found on the PATH."""
    location = shutil.which('und')
    return location is not None
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
def as_tuple(self, value):
    """Return ``value`` as a tuple if it is a list, otherwise unchanged."""
    return tuple(value) if isinstance(value, list) else value
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
    """od.__setitem__(i, y) <==> od[i]=y"""
    # A key that is not yet present gets a fresh [prev, next, key] link that
    # is spliced in just before the root sentinel, i.e. at the end of the
    # list; the inherited dict is then updated in every case.
    if key not in self:
        sentinel = self.__root
        tail = sentinel[0]
        link = [tail, sentinel, key]
        tail[1] = link
        sentinel[0] = link
        self.__map[key] = link
    dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
    """od.__delitem__(y) <==> del od[y]"""
    # The dict entry goes first (raising KeyError for a missing key); the
    # key's link is then unhooked by joining its neighbours to each other.
    dict_delitem(self, key)
    before, after, _ = self.__map.pop(key)
    before[1] = after
    after[0] = before
def fromkeys(cls, iterable, value=None):
    """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.

    Every key is mapped to ``value`` (None unless given).
    """
    result = cls()
    for item in iterable:
        result[item] = value
    return result
def valid_ident(s):
    """Return True if ``s`` matches IDENTIFIER, else raise ValueError."""
    if IDENTIFIER.match(s) is None:
        raise ValueError('Not a valid Python identifier: %r' % s)
    return True
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.