Python subprocess 模块,STARTUPINFO 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.STARTUPINFO。
def runCmd(cmd,cmd_timeout=300):
''' run command without showing console window on windows - return stdout and stderr as strings '''
startupinfo = None
output = ""
output_err = ""
debug_log("runCmd: {}".format(cmd))
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
try:
proc = subprocess.Popen(cmd,bufsize=-1,startupinfo=startupinfo,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=None,shell=False,universal_newlines=False)
except SubprocessError as e:
proc = None
debug_log("exception in runCmd: {}".format(e),logging.ERROR)
if proc is not None:
try:
outputb, output_errb = proc.communicate()
output = outputb.decode('utf-8','replace')
output_err = output_errb.decode('utf-8','replace')
except subprocess.TimeoutExpired(timeout=cmd_timeout):
proc.kill()
debug_log("runCmd: Process killed due to timeout",logging.WARNING)
else:
debug_log("runCmd: Proc was none",logging.WARNING)
return output,output_err
def run_spotty(self, arguments=None):
'''On supported platforms we include spotty binary'''
try:
args = [
self.__spotty_binary,
"-u", self.username,
"-p", self.password,
"--disable-discovery" # for now, disable dns discovery
]
if arguments:
args += arguments
if not "-n" in args:
args += ["-n", self.playername]
# if self.__cache_path:
# args += ["-c", self.__cache_path, "--enable-audio-cache"]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(args, startupinfo=startupinfo, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=0)
except Exception as exc:
log_exception(__name__, exc)
return None
def run_command(self):
args = ['sfdx', 'force:apex:test:run', '-r', 'human',
'-l', 'RunSpecifiedTests', '-n', self.class_name]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if err is None or err == '':
printErr = out
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:apex:test:run', '-r', 'human']
if not self.test_org is None and len(self.test_org) > 0:
args.push('-u')
args.push(self.input)
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if err is None or err == '':
printErr = out
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:source:push']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if not err is None and not err == '':
printErr = out
else:
printer.write('\nError pushing source')
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:source:pull']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if not err is None and not err == '':
printErr = out
else:
printer.write('\nError pulling source')
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:org:create', '-f',
self.def_file, '-a', 'ScratchOrg', '-s']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nScratch org created')
else:
printer.write('\nError creating scratch org')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:auth:web:login', '-d', '-s', '-a', 'DevHub']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nDevHub authorized')
else:
printer.write('\nError authorizing Dev Hub:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:visualforce:component:create',
'-n', self.page_name,'-l', self.page_label, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nVisaulforce Component created')
file = os.path.join(self.class_dir, self.page_name + '.component')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Visualforce Component:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:visualforce:page:create',
'-n', self.page_name,'-l', self.page_label, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nVisaulforce page created')
file = os.path.join(self.class_dir, self.page_name + '.page')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Visualforce page:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:component:create',
'-n', self.cmp_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Component created')
file = os.path.join(self.class_dir, self.cmp_name, self.cmp_name + '.cmp')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Component:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:test:create',
'-n', self.event_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Test created')
file = os.path.join(self.class_dir, self.event_name + '.resource')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Test:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:interface:create',
'-n', self.event_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Interface created')
file = os.path.join(self.class_dir, self.event_name, self.event_name + '.intf')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Interface:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:event:create',
'-n', self.event_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Event created')
file = os.path.join(self.class_dir, self.event_name, self.event_name + '.evt')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Event:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:app:create',
'-n', self.app_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning App created')
file = os.path.join(self.class_dir, self.app_name, self.app_name + '.app')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning App:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:project:upgrade', '-f']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nProject upgraded')
else:
printer.write('\nError upgrading project:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:project:create', '-n', self.project_name,
'-t', self.template, '-d', self.project_path]
if self.namespace is not None and self.namespace != '':
args.push('-s')
args.push(self.namespace)
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, startupinfo=startupinfo)
p.wait()
out,err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nProject created')
else:
printer.write('\nError creating project:')
printer.write('\n' + str(out, 'UTF-8'))
def run_command(self):
args = ['sfdx', 'force:apex:execute', '-f', self.file_path]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nFinished running apex')
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if err is None or err == '':
printErr = out
printer.write('\nError running apex')
printer.write('\n' + str(printErr, 'utf-8'))
def prepare_subprocess_args(self):
popen_args = {
'args': self.get_php_cmd(),
'env': self.get_env(),
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
'stderr': subprocess.PIPE,
}
# Prevent cmd.exe window popup on Windows.
if is_windows():
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
popen_args['startupinfo'] = startupinfo
return popen_args
def startBbcomm():
CREATE_NEW_CONSOLE = 0x00000010
try:
info = subprocess.STARTUPINFO()
info.dwFlags = 1
info.wShowWindow = 0
subprocess.Popen(["c:/blp/api/bbcomm.exe"],
creationflags=CREATE_NEW_CONSOLE,
startupinfo=info)
except FileNotFoundError:
try:
info = subprocess.STARTUPINFO()
info.dwFlags = 1
info.wShowWindow = 0
subprocess.Popen(["c:/blp/dapi/bbcomm.exe"],
creationflags=CREATE_NEW_CONSOLE,
startupinfo=info)
except FileNotFoundError:
pass
def __init__(self, commands, context, cwd):
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.__proc = subprocess.Popen(commands,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
cwd=cwd)
self.__eof = False
self.__context = context
self.__queue_out = Queue()
self.__thread = Thread(target=self.enqueue_output)
self.__thread.start()
def start_server(server_binary_args, working_dir, env):
debug("starting " + str(server_binary_args))
si = None
if os.name == "nt":
si = subprocess.STARTUPINFO() # type: ignore
si.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW # type: ignore
try:
process = subprocess.Popen(
server_binary_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=working_dir,
env=env,
startupinfo=si)
return Client(process, working_dir)
except Exception as err:
sublime.status_message("Failed to start LSP server {}".format(str(server_binary_args)))
exception_log("Failed to start server", err)
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
def spawn(args, **kwargs):
"""Spawn a subprocess and return it back
"""
if 'cwd' not in kwargs:
kwargs['cwd'] = os.path.dirname(os.path.abspath(__file__))
kwargs['bufsize'] = -1
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs['startupinfo'] = startupinfo
try:
return subprocess.Popen(args, **kwargs)
except Exception as error:
msg = (
'Your operating system denied the spawn of {0} process: {1}'
).format(args[0], error)
logging.error(msg)
raise RuntimeError(msg)
def start(self):
WorkerClient.stop_worker = False
node_path = global_vars.get_node_path()
if os.name == "nt":
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW
self.server_proc = subprocess.Popen(
[node_path, self.script_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, startupinfo=si
)
else:
self.server_proc = subprocess.Popen(
[node_path, self.script_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# start reader thread
if self.server_proc and (not self.server_proc.poll()):
log.debug("worker proc " + str(self.server_proc))
log.debug("starting worker thread")
workerThread = threading.Thread(target=WorkerClient.__reader, args=(
self.server_proc.stdout, self.msgq, self.eventq, self.asyncReq, self.server_proc, self.event_handlers))
workerThread.daemon = True
workerThread.start()
def subprocess_args(include_stdout=True):
if hasattr(subprocess, 'STARTUPINFO'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
env = os.environ
else:
si = None
env = None
if include_stdout:
ret = {'stdout:': subprocess.PIPE}
else:
ret = {}
ret.update({'stdin': subprocess.PIPE,
'stderr': subprocess.PIPE,
'startupinfo': si,
'env': env })
return ret
#Function to get the Process ID
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
try:
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
except Exception as exception:
print(exception.output)
def __init__(self, suproc_command, stdin_queue, stdout_queue, parent):
threading.Thread.__init__(self)
self.setDaemon(False) # we want it to survive parent's death so it can detect innactivity and terminate subproccess
self.setName('pjon_piper_thd')
self._subproc_command = suproc_command
self._birthtime = None
self._stopped = False
self._start_failed = False
self._pipe = None
self._stdout_queue = stdout_queue
self._stdin_queue = stdin_queue
self._parent = parent
if sys.platform == 'win32':
self._startupinfo = subprocess.STARTUPINFO()
self._startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self._startupinfo.wShowWindow = subprocess.SW_HIDE
self.log = logging.getLogger(self.name)
self.log.handlers = []
self.log.addHandler(logging.NullHandler())
#self.log.propagate = False
self.log.setLevel(logging.INFO)
def get_startup_info():
# Hide the child process window.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return startupinfo
def executeCmd(cmd):
command=['cmd.exe', '/c'] + cmd.split()
res = subprocess.check_output(command, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, universal_newlines=True)
# info=subprocess.STARTUPINFO()
# info.dwFlags=subprocess.STARTF_USESHOWWINDOW | subprocess.CREATE_NEW_PROCESS_GROUP
# info.wShowWindow=subprocess.SW_HIDE
# p=subprocess.Popen(command, startupinfo=info, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, universal_newlines=True)
# results, _=p.communicate()
return res
def start_hidden_process(path):
info = subprocess.STARTUPINFO()
info.dwFlags = subprocess.STARTF_USESHOWWINDOW|subprocess.CREATE_NEW_PROCESS_GROUP
info.wShowWindow = subprocess.SW_HIDE
p=subprocess.Popen(path, startupinfo=info)
return p
def kill_spotty():
'''make sure we don't have any (remaining) spotty processes running before we start one'''
if xbmc.getCondVisibility("System.Platform.Windows"):
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
subprocess.Popen(["taskkill", "/IM", "spotty.exe"], startupinfo=startupinfo, shell=True)
else:
os.system("killall spotty")
def test_spotty(self, binary_path):
'''self-test spotty binary'''
try:
st = os.stat(binary_path)
os.chmod(binary_path, st.st_mode | stat.S_IEXEC)
args = [
binary_path,
"-n", "selftest",
"-x", "--disable-discovery"
]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
spotty = subprocess.Popen(
args,
startupinfo=startupinfo,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0)
stdout, stderr = spotty.communicate()
log_msg(stdout)
if "ok spotty" in stdout:
return True
elif xbmc.getCondVisibility("System.Platform.Windows"):
log_msg("Unable to initialize spotty binary for playback."
"Make sure you have the VC++ 2015 runtime installed.", xbmc.LOGERROR)
except Exception as exc:
log_exception(__name__, exc)
return False
def start_hidden_process(path):
info = subprocess.STARTUPINFO()
info.dwFlags = subprocess.STARTF_USESHOWWINDOW|subprocess.CREATE_NEW_PROCESS_GROUP
info.wShowWindow = subprocess.SW_HIDE
p=subprocess.Popen(path, startupinfo=info)
return p
def pipe_through_prog(cmd, path=None, stdin=''):
"""Run the Vale binary with the given command.
"""
startupinfo = None
if sublime.platform() == 'windows':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(cmd, cwd=path, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
startupinfo=startupinfo)
out, err = p.communicate(input=stdin.encode('utf-8'))
return out.decode('utf-8'), err
def __init__(self, cmd, timeout=30, maxread=2000, searchwindowsize=None,
logfile=None, cwd=None, env=None, encoding=None,
codec_errors='strict'):
super(PopenSpawn, self).__init__(timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, logfile=logfile,
encoding=encoding, codec_errors=codec_errors)
kwargs = dict(bufsize=0, stdin=subprocess.PIPE,
stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
cwd=cwd, env=env)
if sys.platform == 'win32':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs['startupinfo'] = startupinfo
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
if not isinstance(cmd, (list, tuple)):
cmd = shlex.split(cmd)
self.proc = subprocess.Popen(cmd, **kwargs)
self.closed = False
self._buf = self.string_type()
self._read_queue = Queue()
self._read_thread = threading.Thread(target=self._read_incoming)
self._read_thread.setDaemon(True)
self._read_thread.start()
def _invoke(self, cmdline):
if sys.platform[:3] == 'win':
closefds = False
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
else:
closefds = True
startupinfo = None
if (os.environ.get('DISPLAY') or sys.platform[:3] == 'win' or
sys.platform == 'darwin'):
inout = file(os.devnull, 'r+')
else:
# for TTY programs, we need stdin/out
inout = None
# if possible, put the child precess in separate process group,
# so keyboard interrupts don't affect child precess as well as
# Python
setsid = getattr(os, 'setsid', None)
if not setsid:
setsid = getattr(os, 'setpgrp', None)
pipe = subprocess.Popen(cmdline, stdin=inout, stdout=inout,
stderr=inout, close_fds=closefds,
preexec_fn=setsid, startupinfo=startupinfo)
# It is assumed that this kind of tools (gnome-open, kfmclient,
# exo-open, xdg-open and open for OSX) immediately exit after lauching
# the specific application
returncode = pipe.wait()
if hasattr(self, 'fixreturncode'):
returncode = self.fixreturncode(returncode)
return not returncode
def start_server():
deleteDbIfExists()
working_dir = os.path.join(util.get_plugin_folder(), 'apex-jorje-lsp.jar')
java_cmd = 'java'
java_path = util.get_setting('java_path')
util.debug(java_path)
if java_path != '':
java_cmd = os.path.join(java_path, java_cmd)
util.debug('using java path: ', java_cmd)
args = [java_cmd, '-cp', working_dir, '-Ddebug.internal.errors=true','-Ddebug.semantic.errors=false',
'apex.jorje.lsp.ApexLanguageServerLauncher']
util.debug("starting " + str(args))
si = None
if os.name == "nt":
si = subprocess.STARTUPINFO() # type: ignore
si.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW # type: ignore
try:
process = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=util.dxProjectFolder(),
startupinfo=si)
return Client(process)
except Exception as err:
util.debug(err)
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:data:soql:query',
'-q', self.query]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nOpening results file')
content = str(out,'UTF-8')
#try:
# parsed = json.loads(content)
# content = json.dumps(parsed, sort_keys=True,indent=1, separators=(',', ':'))
# util.debug(content)
#except Exception as e:
# util.debug('could not format query results\n', e)
file = sublime.active_window().new_file()
file.set_scratch(True)
file.set_name('SOQL')
syntax_path = None
if "linux" in sys.platform or "darwin" in sys.platform:
syntax_path = os.path.join("Packages",plugin_name(),"sublime","lang","JSON.tmLanguage")
else:
syntax_path = os.path.join("Packages/"+plugin_name()+"/sublime/lang/JSON.tmLanguage")
#file.set_syntax_file(syntax_path)
file.run_command("insert", {"characters":content})
else:
printer.write('\nError running query:')
printer.write('\n' + str(err, 'utf-8'))
def _call_proc(*proc_args):
starti = subprocess.STARTUPINFO()
starti.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.call(proc_args, startupinfo=starti,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE)
def __init__( self, path, rar_exe_path ):
self.path = path
self.rar_exe_path = rar_exe_path
if RarArchiver.devnull is None:
RarArchiver.devnull = open(os.devnull, "w")
# windows only, keeps the cmd.exe from popping up
if platform.system() == "Windows":
self.startupinfo = subprocess.STARTUPINFO()
self.startupinfo.dwFlags |= _subprocess.STARTF_USESHOWWINDOW
else:
self.startupinfo = None
def node_bridge(data, bin, args=[]):
env = None
startupinfo = None
if os_name == 'osx':
# GUI apps in OS X doesn't contain .bashrc/.zshrc set paths
env = os.environ.copy()
env['PATH'] += ':/usr/local/bin'
elif os_name == 'windows':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
try:
p = subprocess.Popen(
['node', bin] + args,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
startupinfo=startupinfo
)
except OSError:
raise Exception('Error: Couldn\'t find "node" in "%s"' % env['PATH'])
stdout, stderr = p.communicate(input=data.encode('utf-8'))
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
if stderr:
raise Exception('Error: %s' % stderr)
else:
return stdout
def win_dythread(dyalog, cygwin=False):
startupinfo = None
preexec_fn = None
if not cygwin:
# not cygwin
# hide the window
# imported here because STARTUPINFO only exists on Windows
import subprocess
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwflags = subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 0
elif cygwin:
# cygwin: we need to setpgrp like on Linux or Dyalog will crash
preexec_fn = os.setpgrp
path=to_bytes(os.path.dirname(SCRIPTFILE))+b'/WinPySlave.dyapp'
if cygwin: path=cyg_convert_path(path, b"--windows")
dyalog = pystr(dyalog)
arg = pystr(b'DYAPP=' + path)
x=Popen([dyalog, arg], startupinfo=startupinfo,
preexec_fn=preexec_fn)
x.communicate()
def start_hidden_process(path):
info = subprocess.STARTUPINFO()
info.dwFlags = subprocess.STARTF_USESHOWWINDOW|subprocess.CREATE_NEW_PROCESS_GROUP
info.wShowWindow = subprocess.SW_HIDE
p=subprocess.Popen(path, startupinfo=info)
return p
def __init__(self, cmd, timeout=30, maxread=2000, searchwindowsize=None,
logfile=None, cwd=None, env=None, encoding=None,
codec_errors='strict'):
super(PopenSpawn, self).__init__(timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, logfile=logfile,
encoding=encoding, codec_errors=codec_errors)
kwargs = dict(bufsize=0, stdin=subprocess.PIPE,
stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
cwd=cwd, env=env)
if sys.platform == 'win32':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs['startupinfo'] = startupinfo
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
if not isinstance(cmd, (list, tuple)):
cmd = shlex.split(cmd)
self.proc = subprocess.Popen(cmd, **kwargs)
self.closed = False
self._buf = self.string_type()
self._read_queue = Queue()
self._read_thread = threading.Thread(target=self._read_incoming)
self._read_thread.setDaemon(True)
self._read_thread.start()
def process_startup_info():
if not is_windows():
return None
startupinfo = sub.STARTUPINFO()
startupinfo.dwFlags |= sub.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = sub.SW_HIDE
return startupinfo
def test_startupinfo(self):
# startupinfo argument
# We uses hardcoded constants, because we do not want to
# depend on win32all.
STARTF_USESHOWWINDOW = 1
SW_MAXIMIZE = 3
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags = STARTF_USESHOWWINDOW
startupinfo.wShowWindow = SW_MAXIMIZE
# Since Python is a console process, it won't be affected
# by wShowWindow, but the argument should be silently
# ignored
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
startupinfo=startupinfo)
def start_server(self):
if self.android is None:
if self.enabled and self.lang.voice is not None:
# voices = ["-s 190 -a 100 -p 75 -ven+m1 ", "-s 170 -a 100 -p 80 -ven+m2 ","-s 175 -a 100 -p 80 -ven+m3 ","-s 190 -a 100 -p 60 -ven+f1 ","-s 170 -a 100 -p 75 -ven+f2 ","-s 170 -a 100 -p 80 -ven+m2 "]
cmd = ['espeak']
cmd.extend(self.lang.voice)
try:
# IS_WIN32 = 'win32' in str(sys.platform).lower() #maybe sys.platform is more secure
is_win = platform.system() == "Windows"
if is_win:
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags = subprocess.CREATE_NEW_CONSOLE | subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
kwargs = {}
kwargs['startupinfo'] = startupinfo
# self.process = subprocess.Popen(cmd, shell=True, bufsize=0, close_fds=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
self.process = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo)
else:
self.process = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.started = True
except:
self.enabled = False
self.started = False
print(
"eduActiv8: You may like to install espeak to get some extra functionality, however this is not required to successfully use the game.")
# stdout and stderr only used to hide the messages from terminal
else:
self.process = None
def play_mediafile(typeid, filename):
global _mediaplayer
if sys.platform == 'win32' and typeid == 'wav':
winsound.PlaySound(filename, winsound.SND_FILENAME | winsound.SND_ASYNC)
else:
args = [cfg.get_string("sound/%s_player" % typeid)]
# We only add %s_player_options if is is a non empty string,
# since args should not contain any empty strings.
if cfg.get_string("sound/%s_player_options"% typeid):
args.extend(
cfg.get_string("sound/%s_player_options"% typeid).split(" "))
found = False
for i, s in enumerate(args):
if '%s' in s:
args[i] = args[i] % os.path.abspath(filename)
found = True
break
# Remove left over %s, just in case the user entered more than
# one in the preferences window.
args = [x for x in args if x != '%s']
if not found:
args.append(os.path.abspath(filename))
if _mediaplayer and _mediaplayer.poll() == None:
_mediaplayer.kill()
_mediaplaer = None
try:
if sys.platform == 'win32':
info = subprocess.STARTUPINFO()
info.dwFlags = 1
info.wShowWindow = 0
_mediaplayer = osutils.Popen(args=args, startupinfo=info)
else:
_mediaplayer = osutils.Popen(args=args)
except OSError, e:
raise osutils.BinaryForMediaPlayerException(typeid,
cfg.get_string("sound/%s_player" % typeid), e)