我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.read()。
def detect_chip(port=DEFAULT_PORT, baud=ESP_ROM_BAUD, connect_mode='default_reset'): """ Use serial access to detect the chip type. We use the UART's datecode register for this, it's mapped at the same address on ESP8266 & ESP32 so we can use one memory read and compare to the datecode register for each chip type. This routine automatically performs ESPLoader.connect() (passing connect_mode parameter) as part of querying the chip. """ detect_port = ESPLoader(port, baud) detect_port.connect(connect_mode) print('Detecting chip type...', end='') sys.stdout.flush() date_reg = detect_port.read_reg(ESPLoader.UART_DATA_REG_ADDR) for cls in [ESP8266ROM, ESP32ROM]: if date_reg == cls.DATE_REG_VALUE: # don't connect a second time inst = cls(detect_port._port, baud) print(' %s' % inst.CHIP_NAME) return inst print('') raise FatalError("Unexpected UART datecode value 0x%08x. Failed to autodetect chip type." % date_reg)
def command(self, op=None, data=b"", chk=0, wait_response=True): if op is not None: pkt = struct.pack(b'<BBHI', 0x00, op, len(data), chk) + data self.write(pkt) if not wait_response: return # tries to get a response until that response has the # same operation as the request or a retries limit has # exceeded. This is needed for some esp8266s that # reply with more sync responses than expected. for retry in range(100): p = self.read() if len(p) < 8: continue (resp, op_ret, len_ret, val) = struct.unpack('<BBHI', p[:8]) if resp != 1: continue data = p[8:] if op is None or op_ret == op: return val, data raise FatalError("Response doesn't match request")
def LoadFirmwareImage(chip, filename): """ Load a firmware image. Can be for ESP8266 or ESP32. ESP8266 images will be examined to determine if they are original ROM firmware images (ESPFirmwareImage) or "v2" OTA bootloader images. Returns a BaseFirmwareImage subclass, either ESPFirmwareImage (v1) or OTAFirmwareImage (v2). """ with open(filename, 'rb') as f: if chip == 'esp32': return ESP32FirmwareImage(f) else: # Otherwise, ESP8266 so look at magic to determine the image type magic = ord(f.read(1)) f.seek(0) if magic == ESPLoader.ESP_IMAGE_MAGIC: return ESPFirmwareImage(f) elif magic == ESPBOOTLOADER.IMAGE_V2_MAGIC: return OTAFirmwareImage(f) else: raise FatalError("Invalid image magic number: %d" % magic)
def __init__(self, load_file=None): super(ESP32FirmwareImage, self).__init__() self.flash_mode = 0 self.flash_size_freq = 0 self.version = 1 if load_file is not None: segments = self.load_common_header(load_file, ESPLoader.ESP_IMAGE_MAGIC) additional_header = list(struct.unpack("B" * 16, load_file.read(16))) # check these bytes are unused if additional_header != [0] * 16: print("WARNING: ESP32 image header contains unknown flags. Possibly this image is from a newer version of esptool.py") for _ in range(segments): self.load_segment(load_file) self.checksum = self.read_checksum(load_file)
def _read_elf_file(self, f): # read the ELF file header LEN_FILE_HEADER = 0x34 try: (ident,_type,machine,_version, self.entrypoint,_phoff,shoff,_flags, _ehsize, _phentsize,_phnum,_shentsize, _shnum,shstrndx) = struct.unpack("<16sHHLLLLLHHHHHH", f.read(LEN_FILE_HEADER)) except struct.error as e: raise FatalError("Failed to read a valid ELF header from %s: %s" % (self.name, e)) if byte(ident, 0) != 0x7f or ident[1:4] != b'ELF': raise FatalError("%s has invalid ELF magic header" % self.name) if machine != 0x5e: raise FatalError("%s does not appear to be an Xtensa ELF file. e_machine=%04x" % (self.name, machine)) self._read_sections(f, shoff, shstrndx)
def expand_file_arguments(): """ Any argument starting with "@" gets replaced with all values read from a text file. Text file arguments can be split by newline or by space. Values are added "as-is", as if they were specified in this order on the command line. """ new_args = [] expanded = False for arg in sys.argv: if arg.startswith("@"): expanded = True with open(arg[1:],"r") as f: for line in f.readlines(): new_args += shlex.split(line) else: new_args.append(arg) if expanded: print("esptool.py %s" % (" ".join(new_args[1:]))) sys.argv = new_args
def print_incoming_text(self): """ When starting the connection, print all the debug data until we get to a line with the end sequence '$$$'. """ line = '' #Wait for device to send data time.sleep(1) if self.ser.inWaiting(): line = '' c = '' #Look for end sequence $$$ while '$$$' not in line: c = self.ser.read().decode('utf-8') line += c print(line); else: self.warn("No Message")
def openbci_id(self, serial): """ When automatically detecting port, parse the serial return for the "OpenBCI" ID. """ line = '' #Wait for device to send data time.sleep(2) if serial.inWaiting(): line = '' c = '' #Look for end sequence $$$ while '$$$' not in line: c = serial.read().decode('utf-8') line += c if "OpenBCI" in line: return True return False
def command(self, op=None, data=None, chk=0): if op is not None: pkt = struct.pack(b'<BBHI', 0x00, op, len(data), chk) + data self.write(pkt) # tries to get a response until that response has the # same operation as the request or a retries limit has # exceeded. This is needed for some esp8266s that # reply with more sync responses than expected. for retry in range(100): p = self.read() if len(p) < 8: continue (resp, op_ret, len_ret, val) = struct.unpack('<BBHI', p[:8]) if resp != 1: continue body = p[8:] if op is None or op_ret == op: return val, body # valid response received raise FatalError("Response doesn't match request")
def __init__(self, load_file=None): super(ESPFirmwareImage, self).__init__() self.flash_mode = 0 self.flash_size_freq = 0 self.version = 1 if load_file is not None: (magic, segments, self.flash_mode, self.flash_size_freq, self.entrypoint) = struct.unpack('<BBBBI', load_file.read(8)) # some sanity check if magic != ESPROM.ESP_IMAGE_MAGIC or segments > 16: raise FatalError('Invalid firmware image magic=%d segments=%d' % (magic, segments)) for i in range(segments): self.load_segment(load_file) self.checksum = self.read_checksum(load_file)
def flash_digest(self, addr, length, digest_block_size=0): self._esp.write(struct.pack(b'<B', self.CMD_FLASH_DIGEST)) self._esp.write(struct.pack(b'<III', addr, length, digest_block_size)) digests = [] while True: p = self._esp.read() if len(p) == 16: digests.append(p) elif len(p) == 1: status_code = struct.unpack('<B', p)[0] if status_code != 0: raise FatalError('Write failure, status: %x' % status_code) break else: raise FatalError('Unexpected packet: %s' % hexify(p)) return digests[-1], digests[:-1]
def write_flash(esp, args): flash_params = _get_flash_params(esp, args) flasher = CesantaFlasher(esp, args.baud) for address, argfile in args.addr_filename: image = argfile.read() argfile.seek(0) # rewind in case we need it again if address + len(image) > int(args.flash_size.split('m')[0]) * (1 << 17): print('WARNING: Unlikely to work as data goes beyond end of flash. Hint: Use --flash_size') image = _update_image_flash_params(address, flash_params, image) # Pad to sector size, which is the minimum unit of writing (erasing really). if len(image) % esp.ESP_FLASH_SECTOR != 0: image += b'\xff' * (esp.ESP_FLASH_SECTOR - (len(image) % esp.ESP_FLASH_SECTOR)) t = time.time() flasher.flash_write(address, image, not args.no_progress) t = time.time() - t print('\rWrote %d bytes at 0x%x in %.1f seconds (%.1f kbit/s)...' % (len(image), address, t, len(image) / t * 8 / 1000)) print('Leaving...') if args.verify: print('Verifying just-written flash...') _verify_flash(esp, args, flasher) flasher.boot_fw()
def detect_chip(port=DEFAULT_PORT, baud=ESP_ROM_BAUD, connect_mode='default_reset', trace_enabled=False): """ Use serial access to detect the chip type. We use the UART's datecode register for this, it's mapped at the same address on ESP8266 & ESP32 so we can use one memory read and compare to the datecode register for each chip type. This routine automatically performs ESPLoader.connect() (passing connect_mode parameter) as part of querying the chip. """ detect_port = ESPLoader(port, baud, trace_enabled=trace_enabled) detect_port.connect(connect_mode) print('Detecting chip type...', end='') sys.stdout.flush() date_reg = detect_port.read_reg(ESPLoader.UART_DATA_REG_ADDR) for cls in [ESP8266ROM, ESP32ROM]: if date_reg == cls.DATE_REG_VALUE: # don't connect a second time inst = cls(detect_port._port, baud, trace_enabled=trace_enabled) print(' %s' % inst.CHIP_NAME) return inst print('') raise FatalError("Unexpected UART datecode value 0x%08x. Failed to autodetect chip type." % date_reg)
def _read_elf_file(self, f): # read the ELF file header LEN_FILE_HEADER = 0x34 try: (ident,_type,machine,_version, self.entrypoint,_phoff,shoff,_flags, _ehsize, _phentsize,_phnum, shentsize, shnum,shstrndx) = struct.unpack("<16sHHLLLLLHHHHHH", f.read(LEN_FILE_HEADER)) except struct.error as e: raise FatalError("Failed to read a valid ELF header from %s: %s" % (self.name, e)) if byte(ident, 0) != 0x7f or ident[1:4] != b'ELF': raise FatalError("%s has invalid ELF magic header" % self.name) if machine != 0x5e: raise FatalError("%s does not appear to be an Xtensa ELF file. e_machine=%04x" % (self.name, machine)) if shentsize != self.LEN_SEC_HEADER: raise FatalError("%s has unexpected section header entry size 0x%x (not 0x28)" % (self.name, shentsize, self.LEN_SEC_HEADER)) if shnum == 0: raise FatalError("%s has 0 section headers" % (self.name)) self._read_sections(f, shoff, shnum, shstrndx)
def command(self, op=None, data=None, chk=0): if op is not None: pkt = struct.pack('<BBHI', 0x00, op, len(data), chk) + data self.write(pkt) # tries to get a response until that response has the # same operation as the request or a retries limit has # exceeded. This is needed for some esp8266s that # reply with more sync responses than expected. for retry in xrange(100): p = self.read() if len(p) < 8: continue (resp, op_ret, len_ret, val) = struct.unpack('<BBHI', p[:8]) if resp != 1: continue body = p[8:] if op is None or op_ret == op: return val, body # valid response received raise FatalError("Response doesn't match request")
def __init__(self, load_file=None): super(ESPFirmwareImage, self).__init__() self.flash_mode = 0 self.flash_size_freq = 0 self.version = 1 if load_file is not None: (magic, segments, self.flash_mode, self.flash_size_freq, self.entrypoint) = struct.unpack('<BBBBI', load_file.read(8)) # some sanity check if magic != ESPROM.ESP_IMAGE_MAGIC or segments > 16: raise FatalError('Invalid firmware image magic=%d segments=%d' % (magic, segments)) for i in xrange(segments): self.load_segment(load_file) self.checksum = self.read_checksum(load_file)
def flash_digest(self, addr, length, digest_block_size=0): self._esp.write(struct.pack('<B', self.CMD_FLASH_DIGEST)) self._esp.write(struct.pack('<III', addr, length, digest_block_size)) digests = [] while True: p = self._esp.read() if len(p) == 16: digests.append(p) elif len(p) == 1: status_code = struct.unpack('<B', p)[0] if status_code != 0: raise FatalError('Write failure, status: %x' % status_code) break else: raise FatalError('Unexpected packet: %s' % hexify(p)) return digests[-1], digests[:-1]
def print_incoming_text(self): """ When starting the connection, print all the debug data until we get to a line with the end sequence '$$$'. """ line = '' #Wait for device to send data time.sleep(1) if self.ser.inWaiting(): line = '' c = '' #Look for end sequence $$$ while '$$$' not in line: c = self.ser.read().decode('utf-8') line += c if "On Daisy" in line: self.daisy = True print(line); else: self.warn("No Message")
def openbci_id(self, serial): """ When automatically detecting port, parse the serial return for the "OpenBCI" ID. Also auto-detects the daisy. """ board = False line = '' #Wait for device to send data time.sleep(2) if serial.inWaiting(): line = '' c = '' #Look for end sequence $$$ while '$$$' not in line: c = serial.read().decode('utf-8') line += c if "OpenBCI" in line: return True # if "On Daisy" in line: # self.daisy = True return False
def daisy_id(self): """ When automatically detecting port, parse the serial return for the "OpenBCI" ID. """ line = '' #Wait for device to send data # time.sleep(2) # if self.ser.inWaiting(): # line = '' # c = '' # #Look for end sequence $$$ # while '$$$' not in line: # c = self.ser.read().decode('utf-8') # line += c # if "On Daisy" in line: # return True return False
def print_incoming_text(self,print_enable): """ When starting the connection, print all the debug data until we get to a line with the end sequence '$$$'. """ line = '' #Wait for device to send data time.sleep(1) if self.ser.inWaiting(): line = '' c = '' #Look for end sequence $$$ while '$$$' not in line: c = self.ser.read().decode('utf-8', errors='replace') line += c if "On Daisy" in line: self.daisy = True if print_enable: print(line); # else: # self.warn("No Message")
def openbci_id(self, serial): """ When automatically detecting port, parse the serial return for the "OpenBCI" ID. Also auto-detects the daisy. """ board = False line = '' #Wait for device to send data time.sleep(2) if serial.inWaiting(): line = '' c = '' #Look for end sequence $$$ while '$$$' not in line: c = serial.read().decode('utf-8', errors='replace') line += c if "OpenBCI" in line: return True return False
def __checkType__(self, serial): byte = serial.read().decode("utf-8") print(byte) if byte == "1": print("??? ???? ?? : " + serial.port) serial.write(b'0') self.threads["Sensor"] = SensorThread(serial) return self.threads["Sensor"] elif byte == "2" : print("??? ???? ?? : " + serial.port) serial.write(b'0') self.threads["Control"] = ControlThread(serial) return self.threads["Control"] else : print("? ? ?? ???? : " + serial.port) serial.write(b'1') return None #?? ???
def __init__(self, load_file=None): super(ESP32FirmwareImage, self).__init__() self.flash_mode = 0 self.flash_size_freq = 0 self.version = 1 if load_file is not None: segments = self.load_common_header(load_file, ESPLoader.ESP_IMAGE_MAGIC) additional_header = list(struct.unpack(self.EXTENDED_HEADER_STRUCT_FMT, load_file.read(16))) # check these bytes are unused if additional_header != self.EXTENDED_HEADER: print("WARNING: ESP32 image header contains unknown flags. Possibly this image is from a different version of esptool.py") for _ in range(segments): self.load_segment(load_file) self.checksum = self.read_checksum(load_file)
def read(self): return next(self._slip_reader)
def read_reg(self, addr): # we don't call check_command here because read_reg() function is called # when detecting chip type, and the way we check for success (STATUS_BYTES_LENGTH) is different # for different chip types (!) val, data = self.command(self.ESP_READ_REG, struct.pack('<I', addr)) if byte(data, 0) != 0: raise FatalError.WithResult("Failed to read register address %08x" % addr, data) return val
def run_stub(self, stub=None): if stub is None: if self.IS_STUB: raise FatalError("Not possible for a stub to load another stub (memory likely to overlap.)") stub = self.STUB_CODE # Upload print("Uploading stub...") for field in ['text', 'data']: if field in stub: offs = stub[field + "_start"] length = len(stub[field]) blocks = (length + self.ESP_RAM_BLOCK - 1) // self.ESP_RAM_BLOCK self.mem_begin(length, blocks, self.ESP_RAM_BLOCK, offs) for seq in range(blocks): from_offs = seq * self.ESP_RAM_BLOCK to_offs = from_offs + self.ESP_RAM_BLOCK self.mem_block(stub[field][from_offs:to_offs], seq) print("Running stub...") self.mem_finish(stub['entry']) p = self.read() if p != b'OHAI': raise FatalError("Failed to start stub. Unexpected response: %s" % p) print("Stub running...") return self.STUB_CLASS(self)
def load_common_header(self, load_file, expected_magic): (magic, segments, self.flash_mode, self.flash_size_freq, self.entrypoint) = struct.unpack('<BBBBI', load_file.read(8)) if magic != expected_magic or segments > 16: raise FatalError('Invalid firmware image magic=%d segments=%d' % (magic, segments)) return segments
def load_segment(self, f, is_irom_segment=False): """ Load the next segment from the image file """ file_offs = f.tell() (offset, size) = struct.unpack('<II', f.read(8)) self.warn_if_unusual_segment(offset, size, is_irom_segment) segment_data = f.read(size) if len(segment_data) < size: raise FatalError('End of file reading segment 0x%x, length %d (actual length %d)' % (offset, size, len(segment_data))) segment = ImageSegment(offset, segment_data, file_offs) self.segments.append(segment) return segment
def read_checksum(self, f): """ Return ESPLoader checksum from end of just-read image """ # Skip the padding. The checksum is stored in the last byte so that the # file is a multiple of 16 bytes. align_file_position(f, 16) return ord(f.read(1))
def _read_sections(self, f, section_header_offs, shstrndx): f.seek(section_header_offs) section_header = f.read() LEN_SEC_HEADER = 0x28 if len(section_header) == 0: raise FatalError("No section header found at offset %04x in ELF file." % section_header_offs) if len(section_header) % LEN_SEC_HEADER != 0: print('WARNING: Unexpected ELF section header length %04x is not mod-%02x' % (len(section_header),LEN_SEC_HEADER)) # walk through the section header and extract all sections section_header_offsets = range(0, len(section_header), LEN_SEC_HEADER) def read_section_header(offs): name_offs,sec_type,_flags,lma,sec_offs,size = struct.unpack_from("<LLLLLL", section_header[offs:]) return (name_offs, sec_type, lma, size, sec_offs) all_sections = [read_section_header(offs) for offs in section_header_offsets] prog_sections = [s for s in all_sections if s[1] == ELFFile.SEC_TYPE_PROGBITS] # search for the string table section if not shstrndx * LEN_SEC_HEADER in section_header_offsets: raise FatalError("ELF file has no STRTAB section at shstrndx %d" % shstrndx) _,sec_type,_,sec_size,sec_offs = read_section_header(shstrndx * LEN_SEC_HEADER) if sec_type != ELFFile.SEC_TYPE_STRTAB: print('WARNING: ELF file has incorrect STRTAB section type 0x%02x' % sec_type) f.seek(sec_offs) string_table = f.read(sec_size) # build the real list of ELFSections by reading the actual section names from the # string table section, and actual data for each section from the ELF file itself def lookup_string(offs): raw = string_table[offs:] return raw[:raw.index(b'\x00')] def read_data(offs,size): f.seek(offs) return f.read(size) prog_sections = [ELFSection(lookup_string(n_offs), lma, read_data(offs, size)) for (n_offs, _type, lma, size, offs) in prog_sections if lma != 0] self.sections = prog_sections
def slip_reader(port): """Generator to read SLIP packets from a serial port. Yields one full SLIP packet at a time, raises exception on timeout or invalid data. Designed to avoid too many calls to serial.read(1), which can bog down on slow systems. """ partial_packet = None in_escape = False while True: waiting = port.inWaiting() read_bytes = port.read(1 if waiting == 0 else waiting) if read_bytes == b'': raise FatalError("Timed out waiting for packet %s" % ("header" if partial_packet is None else "content")) for b in read_bytes: if type(b) is int: b = bytes([b]) # python 2/3 compat if partial_packet is None: # waiting for packet header if b == b'\xc0': partial_packet = b"" else: raise FatalError('Invalid head of packet (%r)' % b) elif in_escape: # part-way through escape sequence in_escape = False if b == b'\xdc': partial_packet += b'\xc0' elif b == b'\xdd': partial_packet += b'\xdb' else: raise FatalError('Invalid SLIP escape (%r%r)' % (b'\xdb', b)) elif b == b'\xdb': # start of escape sequence in_escape = True elif b == b'\xc0': # end of packet yield partial_packet partial_packet = None else: # normal byte in packet partial_packet += b
def dump_mem(esp, args): f = open(args.filename, 'wb') for i in range(args.size // 4): d = esp.read_reg(args.address + (i * 4)) f.write(struct.pack(b'<I', d)) if f.tell() % 1024 == 0: print('\r%d bytes read... (%d %%)' % (f.tell(), f.tell() * 100 // args.size), end=' ') sys.stdout.flush() print('Done!')
def make_image(args): image = ESPFirmwareImage() if len(args.segfile) == 0: raise FatalError('No segments specified') if len(args.segfile) != len(args.segaddr): raise FatalError('Number of specified files does not match number of specified addresses') for (seg, addr) in zip(args.segfile, args.segaddr): data = open(seg, 'rb').read() image.segments.append(ImageSegment(addr, data)) image.entrypoint = args.entrypoint image.save(args.output)