Python win32api 模块,CloseHandle() 实例源码
我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用win32api.CloseHandle()。
def GetDomainName():
try:
tok = win32security.OpenThreadToken(win32api.GetCurrentThread(),
TOKEN_QUERY, 1)
except win32api.error, details:
if details[0] != winerror.ERROR_NO_TOKEN:
raise
# attempt to open the process token, since no thread token
# exists
tok = win32security.OpenProcessToken(win32api.GetCurrentProcess(),
TOKEN_QUERY)
sid, attr = win32security.GetTokenInformation(tok, TokenUser)
win32api.CloseHandle(tok)
name, dom, typ = win32security.LookupAccountSid(None, sid)
return dom
def run_elevated(command, args, wait=True):
"""Run the given command as an elevated user and wait for it to return"""
ret = 1
if command.find(' ') > -1:
command = '"' + command + '"'
if platform.system() == 'Windows':
import win32api
import win32con
import win32event
import win32process
from win32com.shell.shell import ShellExecuteEx
from win32com.shell import shellcon
logging.debug(command + ' ' + args)
process_info = ShellExecuteEx(nShow=win32con.SW_HIDE,
fMask=shellcon.SEE_MASK_NOCLOSEPROCESS,
lpVerb='runas',
lpFile=command,
lpParameters=args)
if wait:
win32event.WaitForSingleObject(process_info['hProcess'], 600000)
ret = win32process.GetExitCodeProcess(process_info['hProcess'])
win32api.CloseHandle(process_info['hProcess'])
else:
ret = process_info
else:
logging.debug('sudo ' + command + ' ' + args)
ret = subprocess.call('sudo ' + command + ' ' + args, shell=True)
return ret
def subprocess_terminate( proc ) :
try:
proc.terminate()
except AttributeError:
print " no terminate method to Popen.."
try:
import signal
os.kill( proc.pid , signal.SIGTERM)
except AttributeError:
print " no os.kill, using win32api.."
try:
import win32api
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess( PROCESS_TERMINATE, False, proc.pid)
win32api.TerminateProcess(handle,-1)
win32api.CloseHandle(handle)
except ImportError:
print " ERROR: could not terminate process."
def testCleanup1(self):
# We used to clobber all outstanding exceptions.
def f1(invalidate):
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
if invalidate:
win32api.CloseHandle(int(h))
1/0
# If we invalidated, then the object destruction code will attempt
# to close an invalid handle. We don't wan't an exception in
# this case
def f2(invalidate):
""" This function should throw an IOError. """
try:
f1(invalidate)
except ZeroDivisionError, exc:
raise IOError("raise 2")
self.assertRaises(IOError, f2, False)
# Now do it again, but so the auto object destruction
# actually fails.
self.assertRaises(IOError, f2, True)
def GetDomainName():
try:
tok = win32security.OpenThreadToken(win32api.GetCurrentThread(),
TOKEN_QUERY, 1)
except win32api.error as details:
if details[0] != winerror.ERROR_NO_TOKEN:
raise
# attempt to open the process token, since no thread token
# exists
tok = win32security.OpenProcessToken(win32api.GetCurrentProcess(),
TOKEN_QUERY)
sid, attr = win32security.GetTokenInformation(tok, TokenUser)
win32api.CloseHandle(tok)
name, dom, typ = win32security.LookupAccountSid(None, sid)
return dom
def testCleanup1(self):
# We used to clobber all outstanding exceptions.
def f1(invalidate):
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
if invalidate:
win32api.CloseHandle(int(h))
1/0
# If we invalidated, then the object destruction code will attempt
# to close an invalid handle. We don't wan't an exception in
# this case
def f2(invalidate):
""" This function should throw an IOError. """
try:
f1(invalidate)
except ZeroDivisionError as exc:
raise IOError("raise 2")
self.assertRaises(IOError, f2, False)
# Now do it again, but so the auto object destruction
# actually fails.
self.assertRaises(IOError, f2, True)
def _closeStdin(self):
if hasattr(self, "hChildStdinWr"):
win32file.CloseHandle(self.hChildStdinWr)
del self.hChildStdinWr
self.closingStdin = False
self.closedStdin = True
def closeStderr(self):
if hasattr(self, "hChildStderrRd"):
win32file.CloseHandle(self.hChildStderrRd)
del self.hChildStderrRd
self.closedStderr = True
self.connectionLostNotify()
def closeStdout(self):
if hasattr(self, "hChildStdoutRd"):
win32file.CloseHandle(self.hChildStdoutRd)
del self.hChildStdoutRd
self.closedStdout = True
self.connectionLostNotify()
def close(self):
try:
win32api.CloseHandle(self.pipe)
except pywintypes.error:
# You can't close std handles...?
pass
def writeConnectionLost(self):
self.deactivate()
try:
win32api.CloseHandle(self.writePipe)
except pywintypes.error:
# OMG what
pass
self.lostCallback()
def _closeStdin(self):
if hasattr(self, "hChildStdinWr"):
win32file.CloseHandle(self.hChildStdinWr)
del self.hChildStdinWr
self.closingStdin = False
self.closedStdin = True
def closeStderr(self):
if hasattr(self, "hChildStderrRd"):
win32file.CloseHandle(self.hChildStderrRd)
del self.hChildStderrRd
self.closedStderr = True
self.connectionLostNotify()
def closeStdout(self):
if hasattr(self, "hChildStdoutRd"):
win32file.CloseHandle(self.hChildStdoutRd)
del self.hChildStdoutRd
self.closedStdout = True
self.connectionLostNotify()
def close(self):
try:
win32api.CloseHandle(self.pipe)
except pywintypes.error:
# You can't close std handles...?
pass
def writeConnectionLost(self):
self.deactivate()
try:
win32api.CloseHandle(self.writePipe)
except pywintypes.error:
# OMG what
pass
self.lostCallback()
def wait_for_elevated_process(process_info):
if platform.system() == 'Windows' and 'hProcess' in process_info:
import win32api
import win32con
import win32event
import win32process
win32event.WaitForSingleObject(process_info['hProcess'], 600000)
ret = win32process.GetExitCodeProcess(process_info['hProcess'])
win32api.CloseHandle(process_info['hProcess'])
return ret
# pylint: enable=E0611,E0401
# pylint: disable=E1101
def __del__(self):
if self.locked:
self.release()
win32api.CloseHandle(self.handle)
def killProcName(procname):
# Change suggested by Dan Knierim, who found that this performed a
# "refresh", allowing us to kill processes created since this was run
# for the first time.
try:
win32pdhutil.GetPerformanceAttributes('Process','ID Process',procname)
except:
pass
pids = win32pdhutil.FindPerformanceAttributesByName(procname)
# If _my_ pid in there, remove it!
try:
pids.remove(win32api.GetCurrentProcessId())
except ValueError:
pass
if len(pids)==0:
result = "Can't find %s" % procname
elif len(pids)>1:
result = "Found too many %s's - pids=`%s`" % (procname,pids)
else:
handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0,pids[0])
win32api.TerminateProcess(handle,0)
win32api.CloseHandle(handle)
result = ""
return result
def testCleanup2(self):
# Cause an exception during object destruction.
# The worst this does is cause an ".XXX undetected error (why=3)"
# So avoiding that is the goal
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
# Close the handle underneath the object.
win32api.CloseHandle(int(h))
# Object destructor runs with the implicit close failing
h = None
def testCleanup3(self):
# And again with a class - no __del__
import win32event
class Test:
def __init__(self):
self.h = win32event.CreateEvent(None, 0, 0, None)
win32api.CloseHandle(int(self.h))
t=Test()
t = None
def _getInvalidHandleException(self):
try:
win32api.CloseHandle(1)
except win32api.error, exc:
return exc
self.fail("Didn't get invalid-handle exception.")
def testSimple(self):
self.assertRaises(pywintypes.error, win32api.CloseHandle, 1)
def testFuncIndex(self):
exc = self._getInvalidHandleException()
self._testExceptionIndex(exc, 1, "CloseHandle")
def testUnpack(self):
try:
win32api.CloseHandle(1)
self.fail("expected exception!")
except win32api.error, exc:
self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE)
self.failUnlessEqual(exc.funcname, "CloseHandle")
expected_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
self.failUnlessEqual(exc.strerror, expected_msg)
def testAsStr(self):
exc = self._getInvalidHandleException()
err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
# early on the result actually *was* a tuple - it must always look like one
err_tuple = (winerror.ERROR_INVALID_HANDLE, 'CloseHandle', err_msg)
self.failUnlessEqual(str(exc), str(err_tuple))
def testAttributes(self):
exc = self._getInvalidHandleException()
err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE)
self.failUnlessEqual(exc.strerror, err_msg)
self.failUnlessEqual(exc.funcname, 'CloseHandle')
# some tests for 'insane' args.
def killProcName(procname):
# Change suggested by Dan Knierim, who found that this performed a
# "refresh", allowing us to kill processes created since this was run
# for the first time.
try:
win32pdhutil.GetPerformanceAttributes('Process','ID Process',procname)
except:
pass
pids = win32pdhutil.FindPerformanceAttributesByName(procname)
# If _my_ pid in there, remove it!
try:
pids.remove(win32api.GetCurrentProcessId())
except ValueError:
pass
if len(pids)==0:
result = "Can't find %s" % procname
elif len(pids)>1:
result = "Found too many %s's - pids=`%s`" % (procname,pids)
else:
handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0,pids[0])
win32api.TerminateProcess(handle,0)
win32api.CloseHandle(handle)
result = ""
return result
def testCleanup2(self):
# Cause an exception during object destruction.
# The worst this does is cause an ".XXX undetected error (why=3)"
# So avoiding that is the goal
import win32event
h = win32event.CreateEvent(None, 0, 0, None)
# Close the handle underneath the object.
win32api.CloseHandle(int(h))
# Object destructor runs with the implicit close failing
h = None
def testCleanup3(self):
# And again with a class - no __del__
import win32event
class Test:
def __init__(self):
self.h = win32event.CreateEvent(None, 0, 0, None)
win32api.CloseHandle(int(self.h))
t=Test()
t = None
def _getInvalidHandleException(self):
try:
win32api.CloseHandle(1)
except win32api.error as exc:
return exc
self.fail("Didn't get invalid-handle exception.")
def testSimple(self):
self.assertRaises(pywintypes.error, win32api.CloseHandle, 1)
def testFuncIndex(self):
exc = self._getInvalidHandleException()
self._testExceptionIndex(exc, 1, "CloseHandle")
def testUnpack(self):
try:
win32api.CloseHandle(1)
self.fail("expected exception!")
except win32api.error as exc:
self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE)
self.failUnlessEqual(exc.funcname, "CloseHandle")
expected_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
self.failUnlessEqual(exc.strerror, expected_msg)
def testAsStr(self):
exc = self._getInvalidHandleException()
err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
# early on the result actually *was* a tuple - it must always look like one
err_tuple = (winerror.ERROR_INVALID_HANDLE, 'CloseHandle', err_msg)
self.failUnlessEqual(str(exc), str(err_tuple))
def testAttributes(self):
exc = self._getInvalidHandleException()
err_msg = win32api.FormatMessage(winerror.ERROR_INVALID_HANDLE).rstrip()
self.failUnlessEqual(exc.winerror, winerror.ERROR_INVALID_HANDLE)
self.failUnlessEqual(exc.strerror, err_msg)
self.failUnlessEqual(exc.funcname, 'CloseHandle')
# some tests for 'insane' args.
def logout(self, session):
"""Delete session of it exists."""
if self.is_session_valid(session):
sessionInfo = self.sessions[session]
win32api.CloseHandle(sessionInfo[3])
del self.sessions[session]
def init_acls():
# A process that tries to read or write a SACL needs
# to have and enable the SE_SECURITY_NAME privilege.
# And inorder to backup/restore, the SE_BACKUP_NAME and
# SE_RESTORE_NAME privileges are needed.
import win32api
try:
hnd = OpenProcessToken(win32api.GetCurrentProcess(),
TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY)
except win32api.error, exc:
log.Log("Warning: unable to open Windows process token: %s"
% exc, 5)
return
try:
try:
lpv = lambda priv: LookupPrivilegeValue(None, priv)
# enable the SE_*_NAME privileges
SecurityName = lpv(SE_SECURITY_NAME)
AdjustTokenPrivileges(hnd, False, [
(SecurityName, SE_PRIVILEGE_ENABLED),
(lpv(SE_BACKUP_NAME), SE_PRIVILEGE_ENABLED),
(lpv(SE_RESTORE_NAME), SE_PRIVILEGE_ENABLED)
])
except win32api.error, exc:
log.Log("Warning: unable to enable SE_*_NAME privileges: %s"
% exc, 5)
return
for name, enabled in GetTokenInformation(hnd, TokenPrivileges):
if name == SecurityName and enabled:
# now we *may* access the SACL (sigh)
ACL.flags |= SACL_SECURITY_INFORMATION
break
finally:
win32api.CloseHandle(hnd)
def _readListViewItems(hwnd, column_index=0):
# Allocate virtual memory inside target process
pid = ctypes.create_string_buffer(4)
p_pid = ctypes.addressof(pid)
GetWindowThreadProcessId(hwnd, p_pid) # process owning the given hwnd
hProcHnd = OpenProcess(win32con.PROCESS_ALL_ACCESS, False, struct.unpack("i", pid)[0])
pLVI = VirtualAllocEx(hProcHnd, 0, 4096, win32con.MEM_RESERVE | win32con.MEM_COMMIT, win32con.PAGE_READWRITE)
pBuffer = VirtualAllocEx(hProcHnd, 0, 4096, win32con.MEM_RESERVE | win32con.MEM_COMMIT, win32con.PAGE_READWRITE)
# Prepare an LVITEM record and write it to target process memory
lvitem_str = struct.pack('iiiiiiiii', *[0, 0, column_index, 0, 0, pBuffer, 4096, 0, 0])
lvitem_buffer = ctypes.create_string_buffer(lvitem_str)
copied = ctypes.create_string_buffer(4)
p_copied = ctypes.addressof(copied)
WriteProcessMemory(hProcHnd, pLVI, ctypes.addressof(lvitem_buffer), ctypes.sizeof(lvitem_buffer), p_copied)
# iterate items in the SysListView32 control
num_items = win32gui.SendMessage(hwnd, commctrl.LVM_GETITEMCOUNT)
item_texts = []
for item_index in range(num_items):
win32gui.SendMessage(hwnd, commctrl.LVM_GETITEMTEXT, item_index, pLVI)
target_buff = ctypes.create_string_buffer(4096)
ReadProcessMemory(hProcHnd, pBuffer, ctypes.addressof(target_buff), 4096, p_copied)
item_texts.append(target_buff.value)
VirtualFreeEx(hProcHnd, pBuffer, 0, win32con.MEM_RELEASE)
VirtualFreeEx(hProcHnd, pLVI, 0, win32con.MEM_RELEASE)
win32api.CloseHandle(hProcHnd)
return item_texts
def close(self):
try:
win32api.CloseHandle(self.pipe)
except pywintypes.error:
# You can't close std handles...?
pass
def writeConnectionLost(self):
self.deactivate()
try:
win32api.CloseHandle(self.writePipe)
except pywintypes.error:
# OMG what
pass
self.lostCallback()
def killPID(pid, sig=None):
"""Kill the process with the given pid."""
try:
if sig is None:
from signal import SIGTERM
sig = SIGTERM
os.kill(pid, sig)
except (AttributeError, ImportError):
if win32api:
handle = win32api.OpenProcess(1, False, pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
def clear_instance_check_event(self):
'''Close handle created by CreateEvent'''
if self.SICHECK_EVENT is not None:
win32api.CloseHandle(self.SICHECK_EVENT)
def check_pids(curmir_incs):
"""Check PIDs in curmir markers to make sure rdiff-backup not running"""
pid_re = re.compile("^PID\s*([0-9]+)", re.I | re.M)
def extract_pid(curmir_rp):
"""Return process ID from a current mirror marker, if any"""
match = pid_re.search(curmir_rp.get_data())
if not match: return None
else: return int(match.group(1))
def pid_running(pid):
"""True if we know if process with pid is currently running"""
try: os.kill(pid, 0)
except OSError, exc:
if exc[0] == errno.ESRCH: return 0
else: log.Log("Warning: unable to check if PID %d still running" % (pid,), 2)
except AttributeError:
assert os.name == 'nt'
import win32api, win32con, pywintypes
process = None
try:
process = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS,
0, pid)
except pywintypes.error, error:
if error[0] == 87: return 0
else:
msg = "Warning: unable to check if PID %d still running"
log.Log(msg % pid, 2)
if process:
win32api.CloseHandle(process)
return 1
return 0
return 1
for curmir_rp in curmir_incs:
assert Globals.local_connection is curmir_rp.conn
pid = extract_pid(curmir_rp)
if pid is not None and pid_running(pid):
log.Log.FatalError(
"""It appears that a previous rdiff-backup session with process
id %d is still running. If two different rdiff-backup processes write
the same repository simultaneously, data corruption will probably
result. To proceed with regress anyway, rerun rdiff-backup with the
--force option.""" % (pid,))