我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用fcntl.ioctl()。
def read_word_data(self, addr, cmd): """Read a word (2 bytes) from the specified cmd register of the device. Note that this will interpret data using the endianness of the processor running Python (typically little endian)! """ assert self._device is not None, 'Bus must be opened before operations are made against it!' # Build ctypes values to marshall between ioctl and Python. reg = c_uint8(cmd) result = c_uint16() # Build ioctl request. request = make_i2c_rdwr_data([ (addr, 0, 1, pointer(reg)), # Write cmd register. (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8))) # Read word (2 bytes). ]) # Make ioctl call and return result data. ioctl(self._device.fileno(), I2C_RDWR, request) return result.value
def process_call(self, addr, cmd, val): """Perform a smbus process call by writing a word (2 byte) value to the specified register of the device, and then reading a word of response data (which is returned). """ assert self._device is not None, 'Bus must be opened before operations are made against it!' # Build ctypes values to marshall between ioctl and Python. data = create_string_buffer(struct.pack('=BH', cmd, val)) result = c_uint16() # Build ioctl request. request = make_i2c_rdwr_data([ (addr, 0, 3, cast(pointer(data), POINTER(c_uint8))), # Write data. (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8))) # Read word (2 bytes). ]) # Make ioctl call and return result data. ioctl(self._device.fileno(), I2C_RDWR, request) # Note the python-smbus code appears to have a rather serious bug and # does not return the result value! This is fixed below by returning it. return result.value
def get_iphostname(): '''??linux?????????IP??''' def get_ip(ifname): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ipaddr = socket.inet_ntoa(fcntl.ioctl( sock.fileno(), 0x8915, # SIOCGIFADDR struct.pack('256s', ifname[:15]) )[20:24] ) sock.close() return ipaddr try: ip = get_ip('eth0') except IOError: ip = get_ip('eno1') hostname = socket.gethostname() return {'hostname': hostname, 'ip':ip}
def read_i2c_block_data(self, addr, cmd, length=32): """Perform a read from the specified cmd register of device. Length number of bytes (default of 32) will be read and returned as a bytearray. """ assert self._device is not None, 'Bus must be opened before operations are made against it!' # Build ctypes values to marshall between ioctl and Python. reg = c_uint8(cmd) result = create_string_buffer(length) # Build ioctl request. request = make_i2c_rdwr_data([ (addr, 0, 1, pointer(reg)), # Write cmd register. (addr, I2C_M_RD, length, cast(result, POINTER(c_uint8))) # Read data. ]) # Make ioctl call and return result data. ioctl(self._device.fileno(), I2C_RDWR, request) return bytearray(result.raw) # Use .raw instead of .value which will stop at a null byte!
def get_terminal_size(): def ioctl_GWINSZ(fd): try: import fcntl import termios import struct cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) except: return None return cr cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) if not cr: try: fd = os.open(os.ctermid(), os.O_RDONLY) cr = ioctl_GWINSZ(fd) os.close(fd) except: pass if not cr: try: cr = (os.env['LINES'], os.env['COLUMNS']) except: cr = (25, 80) return int(cr[1]), int(cr[0])
def check_vmx_pt(): from fcntl import ioctl KVMIO = 0xAE KVM_VMX_PT_SUPPORTED = KVMIO << (8) | 0xe4 try: fd = open("/dev/kvm", "wb") except: print('\033[91m' + error_prefix + "KVM is not loaded!" + '\033[0m') return False try: ret = ioctl(fd, KVM_VMX_PT_SUPPORTED, 0) except IOError: print('\033[91m' + error_prefix + "VMX_PT is not loaded!" + '\033[0m') return False fd.close() if ret == 0: print('\033[91m' + error_prefix + "Intel PT is not supported on this CPU!" + '\033[0m') return False return True
def getipaddr(self, ifname='eth0'): import socket import struct ret = '127.0.0.1' try: ret = socket.gethostbyname(socket.getfqdn(socket.gethostname())) except: pass if ret == '127.0.0.1': try: import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ret = socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', ifname[:15]))[20:24]) except: pass return ret
def terminal_size(): """Get the width and height of the terminal. http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/ http://stackoverflow.com/questions/17993814/why-the-irrelevant-code-made-a-difference :return: Width (number of characters) and height (number of lines) of the terminal. :rtype: tuple """ if hasattr(ctypes, 'windll'): # Only works on Microsoft Windows platforms. string_buffer = ctypes.create_string_buffer(22) # To be written to by GetConsoleScreenBufferInfo. ctypes.windll.kernel32.GetConsoleScreenBufferInfo(ctypes.windll.kernel32.GetStdHandle(-11), string_buffer) left, top, right, bottom = struct.unpack('hhhhHhhhhhh', string_buffer.raw)[5:-2] width, height = right - left, bottom - top if width < 1 or height < 1: return DEFAULT_WIDTH, DEFAULT_HEIGHT return width, height try: device = fcntl.ioctl(0, termios.TIOCGWINSZ, '\0' * 8) except IOError: return DEFAULT_WIDTH, DEFAULT_HEIGHT height, width = struct.unpack('hhhh', device)[:2] return width, height
def _printProgessBar(self, f, startTime): diff = time.time() - startTime total = f.total try: winSize = struct.unpack('4H', fcntl.ioctl(0, tty.TIOCGWINSZ, '12345679')) except IOError: winSize = [None, 80] speed = total/diff if speed: timeLeft = (f.size - total) / speed else: timeLeft = 0 front = f.name back = '%3i%% %s %sps %s ' % ((total/f.size)*100, self._abbrevSize(total), self._abbrevSize(total/diff), self._abbrevTime(timeLeft)) spaces = (winSize[1] - (len(front) + len(back) + 1)) * ' ' self.transport.write('\r%s%s%s' % (front, spaces, back))
def getFPVersion(): ret = None try: if getBrandOEM() == "blackbox": file = open("/proc/stb/info/micomver", "r") ret = file.readline().strip() file.close() elif getBoxType() in ('dm7080','dm820','dm520','dm525','dm900'): ret = open("/proc/stb/fp/version", "r").read() else: ret = long(open("/proc/stb/fp/version", "r").read()) except IOError: try: fp = open("/dev/dbox/fp0") ret = ioctl(fp.fileno(),0) except IOError: try: ret = open("/sys/firmware/devicetree/base/bolt/tag", "r").read().rstrip("\0") except: print "getFPVersion failed!" return ret
def get(self): arg = DrmModeObjGetPropertiesC() arg.obj_id = self.obj_id arg.obj_type = self.obj_type fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, arg) prop_ids = (ctypes.c_uint32*arg.count_props)() arg.props_ptr = ctypes.cast(ctypes.pointer(prop_ids), ctypes.c_void_p).value prop_values = (ctypes.c_uint64*arg.count_props)() arg.prop_values_ptr = ctypes.cast(ctypes.pointer(prop_values), ctypes.c_void_p).value fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, arg) self._arg = arg vals = [v for i, v in zip(prop_ids, prop_values) if i == self.id] return vals[0]
def fetch(self): arg = DrmModeCrtcC() arg.crtc_id = self.id fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_GETCRTC, arg) self._arg = arg if arg.fb_id: self.fb = self._drm.get_framebuffer(arg.fb_id) else: self.fb = None self.x = int(arg.x) self.y = int(arg.y) self.gamma_size = int(arg.gamma_size) self.mode_valid = bool(arg.mode_valid) if self.mode_valid: self.mode = DrmMode(arg.mode) self.width = self.mode.hdisplay self.height = self.mode.vdisplay else: self.mode = None self.width = 0 self.height = 0
def set(self, fb, x, y, mode, *conns): arg = DrmModeCrtcC() arg.crtc_id = self.id arg.fb_id = fb.id arg.x = x arg.y = y arg.mode_valid = 1 arg.mode = mode._arg connector_ids = (ctypes.c_uint32 * len(conns))(*[conn.id for conn in conns]) arg.set_connectors_ptr = ctypes.cast(ctypes.pointer(connector_ids), ctypes.c_void_p).value arg.count_connectors = len(conns) fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_SETCRTC, arg) self.fetch()
def __init__(self, drm): self._drm = drm arg = DrmVersionC() arg.name = create_string_buffer(256) arg.name_len = 255 arg.date = create_string_buffer(256) arg.date_len = 255 arg.desc = create_string_buffer(256) arg.desc_len = 255 fcntl.ioctl(self._drm.fd, DRM_IOCTL_VERSION, arg) self._arg = arg self.major = int(arg.major) self.minor = int(arg.minor) self.patchlevel = int(arg.patchlevel) self.name = str(arg.name[:arg.name_len]) self.date = str(arg.date[:arg.date_len]) self.desc = str(arg.desc[:arg.desc_len])
def prime_import(cls, drm, fd, format_, width, height): arg = DrmPrimeHandleC() arg.fd = fd fcntl.ioctl(drm.fd, DRM_IOCTL_PRIME_FD_TO_HANDLE, arg) self = cls(drm, format_, width, height) self._arg = arg self.fd = fd self.id = int(arg.handle) self.pitch = self.format.cpp[0] * width self.len = height * self.pitch self.handles[0] = self.id self.pitches[0] = self.pitch self.offsets[0] = 0 if self.format.planes > 1: raise NotImplementedError("support for format %s is not implemented yet\n" % self.format.name) return self
def __init__(self, drm, format_, width, height): super(DrmDumbBuffer, self).__init__(drm, format_, width, height) arg = DrmModeCreateDumbC() arg.bpp = self.format.cpp[0] * 8; arg.width = self.width; arg.height = self.height; fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_CREATE_DUMB, arg) #self.handle = arg.handle self.id = int(arg.handle) self.len = int(arg.size) self.pitch = int(arg.pitch) self._arg = arg self.handles[0] = self.id self.pitches[0] = self.pitch self.offsets[0] = 0 if self.format.planes > 1: raise NotImplementedError("support for format %s is not implemented yet\n" % self.format.name)
def read_i2c_block_data(self, i2c_addr, register, length): # type: (int, int, int) -> list """ Read a block of byte data from a given register :rtype: list :param i2c_addr: i2c address :param register: Start register :param length: Desired block length :return: List of bytes """ if length > I2C_SMBUS_BLOCK_MAX: raise ValueError("Desired block length over %d bytes" % I2C_SMBUS_BLOCK_MAX) self._set_address(i2c_addr) msg = i2c_smbus_ioctl_data.create( read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA ) msg.data.contents.byte = length ioctl(self.fd, I2C_SMBUS, msg) return msg.data.contents.block[1:length+1]
def write_i2c_block_data(self, i2c_addr, register, data): # type: (int, int, list) -> None """ Write a block of byte data to a given register :param i2c_addr: i2c address :param register: Start register :param data: List of bytes """ length = len(data) if length > I2C_SMBUS_BLOCK_MAX: raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX) self._set_address(i2c_addr) msg = i2c_smbus_ioctl_data.create( read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA ) msg.data.contents.block[0] = length msg.data.contents.block[1:length + 1] = data ioctl(self.fd, I2C_SMBUS, msg)
def _set_rs485_mode(self, rs485_settings): buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding try: fcntl.ioctl(self.fd, TIOCGRS485, buf) if rs485_settings is not None: if rs485_settings.loopback: buf[0] |= SER_RS485_RX_DURING_TX else: buf[0] &= ~SER_RS485_RX_DURING_TX if rs485_settings.rts_level_for_tx: buf[0] |= SER_RS485_RTS_ON_SEND else: buf[0] &= ~SER_RS485_RTS_ON_SEND if rs485_settings.rts_level_for_rx: buf[0] |= SER_RS485_RTS_AFTER_SEND else: buf[0] &= ~SER_RS485_RTS_AFTER_SEND buf[1] = int(rs485_settings.delay_rts_before_send * 1000) buf[2] = int(rs485_settings.delay_rts_after_send * 1000) else: buf[0] = 0 # clear SER_RS485_ENABLED fcntl.ioctl(self.fd, TIOCSRS485, buf) except IOError as e: raise ValueError('Failed to set RS485 mode: %s' % (e,))
def _get_terminal_size(): def ioctl_GWINSZ(fd): try: import fcntl, termi_os, struct cr = struct.unpack('hh', fcntl.ioctl(fd, termi_os.TIOCGWINSZ,'1234')) except: return return cr cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) if not cr: try: fd = _os.open(_os.ctermid(), _os.O_RDONLY) cr = ioctl_GWINSZ(fd) _os.cl_ose(fd) except: pass if not cr: cr = (_os.environ.get('LINES', 25), _os.environ.get('COLUMNS', 80)) return list(map(int, cr))
def make_uinput(): import fcntl, struct # Requires uinput driver, but it's usually available. uinput = open("/dev/uinput", 'wb') UI_SET_EVBIT = 0x40045564 fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY) UI_SET_KEYBIT = 0x40045565 for i in range(256): fcntl.ioctl(uinput, UI_SET_KEYBIT, i) BUS_USB = 0x03 uinput_user_dev = "80sHHHHi64i64i64i64i" axis = [0] * 64 * 4 uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis)) uinput.flush() # Without this you may get Errno 22: Invalid argument. UI_DEV_CREATE = 0x5501 fcntl.ioctl(uinput, UI_DEV_CREATE) UI_DEV_DESTROY = 0x5502 #fcntl.ioctl(uinput, UI_DEV_DESTROY) return uinput
def _get_terminal_size_linux(): def ioctl_GWINSZ(fd): try: import fcntl import termios cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) return cr except: pass cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) if not cr: try: fd = os.open(os.ctermid(), os.O_RDONLY) cr = ioctl_GWINSZ(fd) os.close(fd) except: pass if not cr: try: cr = (os.environ['LINES'], os.environ['COLUMNS']) except: return None return int(cr[1]), int(cr[0])
def getpending(self): e = Event('key', '', '') while not self.event_queue.empty(): e2 = self.event_queue.get() e.data += e2.data e.raw += e.raw amount = struct.unpack( "i", ioctl(self.input_fd, FIONREAD, "\0\0\0\0"))[0] data = os.read(self.input_fd, amount) raw = unicode(data, self.encoding, 'replace') #XXX: something is wrong here e.data += raw e.raw += raw return e
def GetHelpWidth(): """Returns: an integer, the width of help lines that is used in TextWrap.""" if (not sys.stdout.isatty()) or (termios is None) or (fcntl is None): return _help_width try: data = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, '1234') columns = struct.unpack('hh', data)[1] # Emacs mode returns 0. # Here we assume that any value below 40 is unreasonable if columns >= 40: return columns # Returning an int as default is fine, int(int) just return the int. return int(os.getenv('COLUMNS', _help_width)) except (TypeError, IOError, struct.error): return _help_width
def get_terminal_size(fallback=(80, 24)): """ Return tuple containing columns and rows of controlling terminal, trying harder than shutil.get_terminal_size to find a tty before returning fallback. Theoretically, stdout, stderr, and stdin could all be different ttys that could cause us to get the wrong measurements (instead of using the fallback) but the much more common case is that IO is piped. """ for stream in [sys.__stdout__, sys.__stderr__, sys.__stdin__]: try: # Make WINSIZE call to terminal data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\00\x00") except (IOError, OSError): pass else: # Unpack two shorts from ioctl call lines, columns = struct.unpack("hh", data) break else: columns, lines = fallback return columns, lines
def readMetadata(self): """Read device identity and capabilities via ioctl()""" buffer = "\0"*512 # Read the name self.name = ioctl(self.fd, EVIOCGNAME_512, buffer) self.name = self.name[:self.name.find("\0")] # Read info on each absolute axis absmap = Event.codeMaps['EV_ABS'] buffer = "\0" * struct.calcsize("iiiii") self.absAxisInfo = {} for name, number in absmap.nameMap.iteritems(): try: values = struct.unpack("iiiii", ioctl(self.fd, EVIOCGABS_512 + number, buffer)) values = dict(zip(( 'value', 'min', 'max', 'fuzz', 'flat' ),values)) self.absAxisInfo[name] = values except IOError: continue
def set_special_baudrate(port, baudrate): # right size is 44 on x86_64, allow for some growth import array buf = array.array('i', [0] * 64) try: # get serial_struct FCNTL.ioctl(port.fd, TCGETS2, buf) # set custom speed buf[2] &= ~TERMIOS.CBAUD buf[2] |= BOTHER buf[9] = buf[10] = baudrate # set serial_struct res = FCNTL.ioctl(port.fd, TCSETS2, buf) except IOError, e: raise ValueError('Failed to set custom baud rate (%s): %s' % (baudrate, e))
def xfer(self, txbuff=None): length = len(txbuff) if PYTHON_MAJOR >= 3: _txbuff = bytes(txbuff) _txptr = ctypes.create_string_buffer(_txbuff) else: _txbuff = str(bytearray(txbuff)) _txptr = ctypes.create_string_buffer(_txbuff) _rxptr = ctypes.create_string_buffer(length) data = struct.pack("QQLLHBBL", #64 64 32 32 16 8 8 32 b = 32B ctypes.addressof(_txptr), ctypes.addressof(_rxptr), length, self.speed, 0, #delay self.bits, 0, # cs_change, 0 # pad ) fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data) _rxbuff = ctypes.string_at(_rxptr, length) return bytearray(_rxbuff)
def _select_device(self, addr): """Set the address of the device to communicate with on the I2C bus.""" ioctl(self._device.fileno(), I2C_SLAVE, addr & 0x7F)
def read_byte_data(self, addr, cmd): """Read a single byte from the specified cmd register of the device.""" assert self._device is not None, 'Bus must be opened before operations are made against it!' # Build ctypes values to marshall between ioctl and Python. reg = c_uint8(cmd) result = c_uint8() # Build ioctl request. request = make_i2c_rdwr_data([ (addr, 0, 1, pointer(reg)), # Write cmd register. (addr, I2C_M_RD, 1, pointer(result)) # Read 1 byte as result. ]) # Make ioctl call and return result data. ioctl(self._device.fileno(), I2C_RDWR, request) return result.value
def read_block_data(self, addr, cmd): """Perform a block read from the specified cmd register of the device. The amount of data read is determined by the first byte send back by the device. Data is returned as a bytearray. """ # TODO: Unfortunately this will require calling the low level I2C # access ioctl to trigger a proper read_block_data. The amount of data # returned isn't known until the device starts responding so an I2C_RDWR # ioctl won't work. raise NotImplementedError()
def get_terminal_size(): """Returns a tuple (x, y) representing the width(x) and the height(x) in characters of the terminal window.""" def ioctl_GWINSZ(fd): try: import fcntl import termios import struct cr = struct.unpack( 'hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234') ) except: return None if cr == (0, 0): return None return cr cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) if not cr: try: fd = os.open(os.ctermid(), os.O_RDONLY) cr = ioctl_GWINSZ(fd) os.close(fd) except: pass if not cr: cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80)) return int(cr[1]), int(cr[0])
def _get_terminal_size(fd): try: res = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 4) except IOError as e: raise OSError(e) lines, columns = struct.unpack("hh", res) return terminal_size(columns, lines)
def get_terminal_size(): """Returns a tuple (x, y) representing the width(x) and the height(x) in characters of the terminal window.""" def ioctl_GWINSZ(fd): try: import fcntl import termios import struct cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) except: return None if cr == (0, 0): return None if cr == (0, 0): return None return cr cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) if not cr: try: fd = os.open(os.ctermid(), os.O_RDONLY) cr = ioctl_GWINSZ(fd) os.close(fd) except: pass if not cr: cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80)) return int(cr[1]), int(cr[0])
def ioctl(self, fd, ioctlno): self.pack() rv = fcntl.ioctl(fd, ioctlno, self._buffer, True) self.unpack() return rv
def get_version(self, fd): return self.ioctl(fd, HIDIOCGVERSION)
def get_flags(self, fd): return self.ioctl(fd, HIDIOCGFLAG)
def set_flags(self, fd): return self.ioctl(fd, HIDIOCSFLAG)
def get(self, fd): return self.ioctl(fd, HIDIOCGDEVINFO)
def get_info(self, fd): return self.ioctl(fd, HIDIOCGREPORTINFO)
def get_info(self, fd): return self.ioctl(fd, HIDIOCGFIELDINFO)
def get_info(self, fd, index): self.index = index return self.ioctl(fd, HIDIOCGCOLLECTIONINFO)
def get_device_name(f): a = array.array('B', [0]*1024) fcntl.ioctl(f, HIDIOCGNAME(1024), a, True) print a