我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用win32file.GENERIC_READ。
def SimpleFileDemo(): testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file") if os.path.exists(testName): os.unlink(testName) # Open the file for writing. handle = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = "Hello\0there".encode("ascii") win32file.WriteFile(handle, test_data) handle.Close() # Open it for reading. handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) handle.Close() if data == test_data: print "Successfully wrote and read a file" else: raise Exception("Got different data back???") os.unlink(testName)
def testSimpleFiles(self): fd, filename = tempfile.mkstemp() os.close(fd) os.unlink(filename) handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = str2bytes("Hello\0there") try: win32file.WriteFile(handle, test_data) handle.Close() # Try and open for read handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) self.assertEquals(data, test_data) finally: handle.Close() try: os.unlink(filename) except os.error: pass # A simple test using normal read/write operations.
def SimpleFileDemo(): testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file") if os.path.exists(testName): os.unlink(testName) # Open the file for writing. handle = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = "Hello\0there".encode("ascii") win32file.WriteFile(handle, test_data) handle.Close() # Open it for reading. handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) handle.Close() if data == test_data: print("Successfully wrote and read a file") else: raise Exception("Got different data back???") os.unlink(testName)
def _OpenFileForRead(self, path): try: fhandle = self.fhandle = win32file.CreateFile( path, win32file.GENERIC_READ, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) self._closer = weakref.ref( self, lambda x: win32file.CloseHandle(fhandle)) self.write_enabled = False return fhandle except pywintypes.error as e: raise IOError("Unable to open %s: %s" % (path, e))
def _OpenFileForWrite(self, path): try: fhandle = self.fhandle = win32file.CreateFile( path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) self.write_enabled = True self._closer = weakref.ref( self, lambda x: win32file.CloseHandle(fhandle)) return fhandle except pywintypes.error as e: raise IOError("Unable to open %s: %s" % (path, e))
def opencom1(): # returns a handle to the Windows file return win32file.CreateFile(u'COM1:', win32file.GENERIC_READ, \ 0, None, win32file.OPEN_EXISTING, 0, None)
def connect(self, address): win32pipe.WaitNamedPipe(address, self._timeout) try: handle = win32file.CreateFile( address, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT, 0 ) except win32pipe.error as e: # See Remarks: # https://msdn.microsoft.com/en-us/library/aa365800.aspx if e.winerror == cERROR_PIPE_BUSY: # Another program or thread has grabbed our pipe instance # before we got to it. Wait for availability and attempt to # connect again. win32pipe.WaitNamedPipe(address, RETRY_WAIT_TIMEOUT) return self.connect(address) raise e self.flags = win32pipe.GetNamedPipeInfo(handle)[0] self._handle = handle self._address = address
def testMoreFiles(self): # Create a file in the %TEMP% directory. testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE # Set a flag to delete the file automatically when it is closed. fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE h = win32file.CreateFile( testName, desiredAccess, win32file.FILE_SHARE_READ, None, win32file.CREATE_ALWAYS, fileFlags, 0) # Write a known number of bytes to the file. data = str2bytes("z") * 1025 win32file.WriteFile(h, data) self.failUnless(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!") # Ensure we can read the data back. win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN) hr, read_data = win32file.ReadFile(h, len(data)+10) # + 10 to get anything extra self.failUnless(hr==0, "Readfile returned %d" % hr) self.failUnless(read_data == data, "Read data is not what we wrote!") # Now truncate the file at 1/2 its existing size. newSize = len(data)//2 win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN) win32file.SetEndOfFile(h) self.failUnlessEqual(win32file.GetFileSize(h), newSize) # GetFileAttributesEx/GetFileAttributesExW tests. self.failUnlessEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName)) attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName) self.failUnless(size==newSize, "Expected GetFileAttributesEx to return the same size as GetFileSize()") self.failUnless(attr==win32file.GetFileAttributes(testName), "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes") h = None # Close the file by removing the last reference to the handle! self.failUnless(not os.path.isfile(testName), "After closing the file, it still exists!")
def testFilePointer(self): # via [ 979270 ] SetFilePointer fails with negative offset # Create a file in the %TEMP% directory. filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE, 0, None, win32file.CREATE_ALWAYS, win32file.FILE_ATTRIBUTE_NORMAL, 0) try: #Write some data data = str2bytes('Some data') (res, written) = win32file.WriteFile(f, data) self.failIf(res) self.assertEqual(written, len(data)) #Move at the beginning and read the data win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN) (res, s) = win32file.ReadFile(f, len(data)) self.failIf(res) self.assertEqual(s, data) #Move at the end and read the data win32file.SetFilePointer(f, -len(data), win32file.FILE_END) (res, s) = win32file.ReadFile(f, len(data)) self.failIf(res) self.failUnlessEqual(s, data) finally: f.Close() os.unlink(filename)
def testFileTimesTimezones(self): if not issubclass(pywintypes.TimeType, datetime.datetime): # maybe should report 'skipped', but that's not quite right as # there is nothing you can do to avoid it being skipped! return filename = tempfile.mktemp("-testFileTimes") now_utc = win32timezone.utcnow() now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local()) h = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE, 0, None, win32file.CREATE_ALWAYS, 0, 0) try: win32file.SetFileTime(h, now_utc, now_utc, now_utc) ct, at, wt = win32file.GetFileTime(h) self.failUnlessEqual(now_local, ct) self.failUnlessEqual(now_local, at) self.failUnlessEqual(now_local, wt) # and the reverse - set local, check against utc win32file.SetFileTime(h, now_local, now_local, now_local) ct, at, wt = win32file.GetFileTime(h) self.failUnlessEqual(now_utc, ct) self.failUnlessEqual(now_utc, at) self.failUnlessEqual(now_utc, wt) finally: h.close() os.unlink(filename)
def testSimpleOverlapped(self): # Create a file in the %TEMP% directory. import win32event testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) desiredAccess = win32file.GENERIC_WRITE overlapped = pywintypes.OVERLAPPED() evt = win32event.CreateEvent(None, 0, 0, None) overlapped.hEvent = evt # Create the file and write shit-loads of data to it. h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0) chunk_data = str2bytes("z") * 0x8000 num_loops = 512 expected_size = num_loops * len(chunk_data) for i in range(num_loops): win32file.WriteFile(h, chunk_data, overlapped) win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE) overlapped.Offset = overlapped.Offset + len(chunk_data) h.Close() # Now read the data back overlapped overlapped = pywintypes.OVERLAPPED() evt = win32event.CreateEvent(None, 0, 0, None) overlapped.hEvent = evt desiredAccess = win32file.GENERIC_READ h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0) buffer = win32file.AllocateReadBuffer(0xFFFF) while 1: try: hr, data = win32file.ReadFile(h, buffer, overlapped) win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE) overlapped.Offset = overlapped.Offset + len(data) if not data is buffer: self.fail("Unexpected result from ReadFile - should be the same buffer we passed it") except win32api.error: break h.Close()
def NullFile(inheritable): """Create a null file handle.""" if inheritable: sa = pywintypes.SECURITY_ATTRIBUTES() sa.bInheritHandle = 1 else: sa = None return win32file.CreateFile("nul", win32file.GENERIC_READ | win32file.GENERIC_WRITE, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, sa, win32file.OPEN_EXISTING, 0, None)
def get_read_handle(filename): if os.path.isdir(filename): dwFlagsAndAttributes = win32file.FILE_FLAG_BACKUP_SEMANTICS else: dwFlagsAndAttributes = 0 # CreateFile(fileName, desiredAccess, shareMode, attributes, # CreationDisposition, flagsAndAttributes, hTemplateFile) try: handle = win32file.CreateFileW(filename, # with 'W' accepts unicode win32file.GENERIC_READ, win32file.FILE_SHARE_READ, None, win32file.OPEN_EXISTING, dwFlagsAndAttributes, None) return handle except Exception as e: raise PygcamException("get_read_handle(%s) failed: %s" % (filename, e))
def run(self): import win32file self.file_handle = win32file.CreateFile(self.named_pipe_path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, 0, None) log.info('Windows named pipe connected') self.fire_connected() while True: # The following code is cleaner, than waiting for an exception while writing to detect pipe closing, # but causes mpv to hang and crash while closing when closed at the wrong time. # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL: # # pipe was closed # break try: while not self.write_queue.empty(): win32file.WriteFile(self.file_handle, self.write_queue.get_nowait()) except win32file.error: log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.') break size = win32file.GetFileSize(self.file_handle) if size > 0: while size > 0: # pipe has data to read data = win32file.ReadFile(self.file_handle, 512) self.on_data(data[1]) size = win32file.GetFileSize(self.file_handle) else: time.sleep(1) log.info('Windows named pipe closed') win32file.CloseHandle(self.file_handle) self.file_handle = None self.fire_disconnected()