The following 50 code examples, extracted from open source Python projects, illustrate how to use binascii.crc32().
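Before the project snippets, here is a minimal, self-contained sketch (not taken from any of the projects below) of the two binascii.crc32() usage patterns they rely on most: masking the result to an unsigned 32-bit value, and passing the previous checksum back in to update it incrementally.

import binascii

payload = b"hello, crc32"

# One-shot checksum; the & 0xffffffff mask guarantees an unsigned 32-bit
# value on every Python version (Python 2 could return a negative int).
full = binascii.crc32(payload) & 0xffffffff

# Incremental checksum: pass the running value as the second argument.
running = 0
for chunk in (payload[:5], payload[5:]):
    running = binascii.crc32(chunk, running)
running &= 0xffffffff

assert running == full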
def _GenerateCRCTable():
    """Generate a CRC-32 table.

    ZIP encryption uses the CRC32 one-byte primitive for scrambling some
    internal keys. We noticed that a direct implementation is faster than
    relying on binascii.crc32().
    """
    poly = 0xedb88320
    table = [0] * 256
    for i in range(256):
        crc = i
        for j in range(8):
            if crc & 1:
                crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
            else:
                crc = ((crc >> 1) & 0x7FFFFFFF)
        table[i] = crc
    return table
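A minimal sketch, assuming `table` is the 256-entry list returned by _GenerateCRCTable() above (this helper is not part of the original project): the one-byte, table-driven CRC-32 step that the classic ZIP ("ZipCrypto") key update performs per byte.

def _crc32_byte(ch, crc, table):
    """Advance a running reflected CRC-32 by a single byte value (0-255)."""
    return (crc >> 8) ^ table[(crc ^ ch) & 0xff]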
def build_header(self):
    timestamp = utctimestamp()
    padding = str.encode('\0\0' * 14)
    data = pack(
        '<i2IQH14s',
        timestamp,
        self.metadata['incremental'],
        self.metadata['segment_size'],
        self.metadata['sectors'],
        len(self.metadata['bases']),
        padding
    )
    checksum = crc32(data)
    for i in self.metadata['bases']:
        data += i
        checksum = crc32(i, checksum)
    return data, checksum
def write_body(self, f):
    checksum = 0
    for segment, meta in self.segments.items():
        data = pack(
            '<2IH2B20s',
            segment,
            meta['incremental'],
            meta['base'],
            meta['encryption'],
            meta['compression'],
            meta['sha1_hash']
        )
        f.write(data)
        checksum = crc32(data, checksum)
    """ Backfill the body_checksum """
    f.seek(24, 0)
    f.write(pack('<I', checksum))
def __init__(self, dump):
    assert len(dump) == NOR_SIZE
    (img2_magic, self.block_size, unused, firmware_block,
     firmware_block_count) = struct.unpack('<4s4I', dump[:20])
    (img2_crc,) = struct.unpack('<I', dump[48:52])
    assert img2_crc == binascii.crc32(dump[:48]) & 0xffffffff
    self.firmware_offset = self.block_size * firmware_block
    self.firmware_length = self.block_size * firmware_block_count
    self.parts = [
        dump[0:52],
        dump[52:512],
        dump[512:self.firmware_offset],
        dump[self.firmware_offset:self.firmware_offset + self.firmware_length],
        dump[self.firmware_offset + self.firmware_length:]
    ]
    self.images = []
    offset = 0
    while 1:
        (magic, size) = struct.unpack('<4sI', self.parts[3][offset:offset+8])
        if magic != 'Img3'[::-1] or size == 0:
            break
        self.images.append(self.parts[3][offset:offset + size])
        offset += size
def find_xorpad(titleid, crc32):
    expectedname = "%s.%08lx.Main.exheader.xorpad" % (titleid, crc32)
    legacyname = titleid + ".Main.exheader.xorpad"
    xorpads = glob.glob(os.path.join("xorpads", "*.[xX][oO][rR][pP][aA][dD]"))
    xorpads += glob.glob(os.path.join("xorpads", "*.[zZ][iI][pP]"))
    for xorpad in xorpads:
        if zipfile.is_zipfile(xorpad):
            with zipfile.ZipFile(xorpad, "r") as e:
                for entry in e.infolist():
                    filename = os.path.join(tmpdir, expectedname)
                    basename = os.path.basename(entry.filename)
                    if basename.lower() == expectedname.lower():
                        source = e.open(entry, "r")
                        target = file(filename, "wb")
                        with source, target:
                            shutil.copyfileobj(source, target)
                        return filename
        else:
            basename = os.path.basename(xorpad)
            if basename.lower() == expectedname.lower() or \
               basename.lower() == legacyname.lower():
                return xorpad
def _update_new_data(self, off, data_to_insert):
    """
    Update the _new_data string in the specified offset. The data to be inserted
    overwrites previous data and should be given as a list of values.
    :param off: start offset in _new_data to insert data into
    :param data_to_insert: data to insert to _new_data
    :return: void
    """
    BITSStateFile._log_instance_message('updating new_data in offset %s' % hex(off))
    self._new_data = override_data(self._new_data, off, data_to_insert)
    if _os_ver == 10:
        decoded_queue_footer = BITSStateFile.QUEUE_FOOTER_HEX[_os_ver].decode('hex')
        crc32_off = self._new_data.find(decoded_queue_footer) + len(decoded_queue_footer)
        crc32_value = struct.pack("i", crc32(self._new_data[:crc32_off]))
        self._new_data = override_data(self._new_data, crc32_off, crc32_value)
def crc32(self):
    if not self.isfile():
        raise TypeError('cannot compute crc32, not a file: {}'.format(self.abspath()))
    else:
        try:
            with open(self.abspath(), 'rb') as buf:
                buf = "%08X" % (binascii.crc32(buf.read()) & 0xFFFFFFFF)
                return buf
        except FileNotFoundError:
            raise FileNotFoundError('failed to compute crc32 for: {}'.format(self.abspath()))
        except PermissionError:
            raise PermissionError('failed to compute crc32 for: {}'.format(self.abspath()))
        except:
            raise RuntimeError('failed to compute crc32 for: {}'.format(self.abspath()))
def compensate(buf, wanted):
    wanted ^= FINALXOR
    newBits = 0
    for i in range(32):
        if newBits & 1:
            newBits >>= 1
            newBits ^= CRCPOLY
        else:
            newBits >>= 1
        if wanted & 1:
            newBits ^= CRCINV
        wanted >>= 1
    newBits ^= crc32(buf) ^ FINALXOR
    return pack('<L', newBits)
def fix_header(self):
    """
    Repairs the header values.
    """
    # Update magic
    self.header.magic = self.header._MAGIC

    # Update system_data_size
    system_raw = self.system.dumps()
    self.header.system_data_size = len(system_raw)

    # Update user_data_size
    user_raw = self._user_raw
    self.header.user_data_size = len(user_raw)

    # Update crc32
    header_raw = self.header.dumps(False)
    checksum = 0
    checksum = crc32(header_raw, checksum)
    checksum = crc32(system_raw, checksum)
    checksum = crc32(user_raw, checksum)

    # Convert the checksum into 32-bit unsigned integer (for Python 2/3 compatibility)
    self.header.crc32 = (checksum & 0xffffffff)
def _read_comment_v3(self, inf, psw=None):
    # read data
    rf = XFile(inf.volume_file)
    rf.seek(inf.file_offset)
    data = rf.read(inf.compress_size)
    rf.close()

    # decompress
    cmt = rar_decompress(inf.extract_version, inf.compress_type, data,
                         inf.file_size, inf.flags, inf.CRC, psw, inf.salt)

    # check crc
    if self._crc_check:
        crc = crc32(cmt)
        if crc < 0:
            crc += (1 << 32)
        if crc != inf.CRC:
            return None

    return self._decode_comment(cmt)

# write in-memory archive to temp file - needed for solid archives
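A brief illustration (not from the project above) of why the `if crc < 0` adjustment exists: on Python 2, binascii.crc32() returns a signed 32-bit integer, so results with the high bit set come back negative. Adding 1 << 32 converts them to the unsigned form stored in the header; masking with 0xffffffff is the equivalent, version-independent idiom.

signed_crc = -1526341861  # hypothetical Python 2 return value
assert signed_crc + (1 << 32) == signed_crc & 0xffffffff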
def read(self, cnt=None):
    """Read all or specified amount of data from archive entry."""

    # sanitize cnt
    if cnt is None or cnt < 0:
        cnt = self.remain
    elif cnt > self.remain:
        cnt = self.remain
    if cnt == 0:
        return EMPTY

    # actual read
    data = self._read(cnt)
    if data:
        self.CRC = crc32(data, self.CRC)
        self.remain -= len(data)
    if len(data) != cnt:
        raise BadRarFile("Failed the read enough data")

    # done?
    if not data or self.remain == 0:
        # self.close()
        self._check()
    return data
def readinto(self, buf):
    """Zero-copy read directly into buffer."""
    cnt = len(buf)
    if cnt > self.remain:
        cnt = self.remain
    vbuf = memoryview(buf)
    res = got = 0
    while got < cnt:
        res = self.fd.readinto(vbuf[got : cnt])
        if not res:
            break
        if self.crc_check:
            self.CRC = crc32(vbuf[got : got + res], self.CRC)
        self.remain -= res
        got += res
    return got
def readinto(self, buf):
    """Zero-copy read directly into buffer."""
    got = 0
    vbuf = memoryview(buf)
    while got < len(buf):
        # next vol needed?
        if self.cur_avail == 0:
            if not self._open_next():
                break

        # length for next read
        cnt = len(buf) - got
        if cnt > self.cur_avail:
            cnt = self.cur_avail

        # read into temp view
        res = self.fd.readinto(vbuf[got : got + cnt])
        if not res:
            break
        if self.crc_check:
            self.CRC = crc32(vbuf[got : got + res], self.CRC)
        self.cur_avail -= res
        self.remain -= res
        got += res
    return got
def dl_link(context, template='footer/dl_link.html'):
    request = context.get('request')
    if not request:
        return ''
    site = get_current_site(request)
    website = site.domain
    page_info = request.path_info
    set_crc = binascii.crc32(('set:%s:%s' % (website, page_info)).encode())
    set_names = tuple(sorted(conf.DL_LINKS.keys()))
    url_set = conf.DL_LINKS[set_names[set_crc % len(set_names)]]
    url_crc = binascii.crc32(('url:%s:%s' % (website, page_info)).encode())
    records = tuple(sorted(url_set, key=lambda x: x['url']))
    record = records[url_crc % len(url_set)]
    return loader.render_to_string(template, record, request=context.get('request'))
def zip_to_file(file_path, destination):
    fd, zip_filename = tempfile.mkstemp(suffix=".zip", dir=destination)
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as myzip:
        if os.path.isdir(file_path):
            abs_src = os.path.abspath(file_path)
            for root, dirs, files in os.walk(file_path):
                for current_file in files:
                    absname = os.path.abspath(os.path.join(root, current_file))
                    arcname = absname[len(abs_src) + 1:]
                    myzip.write(absname, arcname)
        else:
            myzip.write(file_path, file_path)
        zip_info = ''.join(str(zipinfoi.CRC) for zipinfoi in myzip.infolist())
        checksum = hex(binascii.crc32(zip_info) & 0xffffffff)
    return zip_filename, checksum
def test_returned_value(self):
    # Limit to the minimum of all limits (b2a_uu)
    MAX_ALL = 45
    raw = self.rawdata[:MAX_ALL]
    for fa, fb in zip(a2b_functions, b2a_functions):
        a2b = getattr(binascii, fa)
        b2a = getattr(binascii, fb)
        try:
            a = b2a(self.type2test(raw))
            res = a2b(self.type2test(a))
        except Exception as err:
            self.fail("{}/{} conversion raises {!r}".format(fb, fa, err))
        if fb == 'b2a_hqx':
            # b2a_hqx returns a tuple
            res, _ = res
        self.assertEqual(res, raw, "{}/{} conversion: "
                         "{!r} != {!r}".format(fb, fa, res, raw))
        self.assertIsInstance(res, bytes)
        self.assertIsInstance(a, bytes)
        self.assertLess(max(a), 128)
    self.assertIsInstance(binascii.crc_hqx(raw, 0), int)
    self.assertIsInstance(binascii.crc32(raw), int)
def test_extract(self):
    for v in (True, False):
        with TempDir() as tdir:
            extract(simple_rar, tdir, verify_data=v)
            h = {
                normalize(os.path.abspath(os.path.join(tdir, h['filename']))): h
                for h in headers(simple_rar)}
            data = {}
            for dirpath, dirnames, filenames in os.walk(tdir):
                for f in filenames:
                    path = normalize(os.path.join(dirpath, f))
                    data[os.path.relpath(path, tdir).replace(os.sep, '/')] = d = open(path, 'rb').read()
                    if f == 'one.txt':
                        self.ae(os.path.getmtime(path), 1098472879)
                    self.ae(h[path]['unpack_size'], len(d))
                    self.ae(h[path]['file_crc'] & 0xffffffff, crc32(d) & 0xffffffff)
            q = {k: v for k, v in sr_data.items() if v}
            del q['symlink']
            self.ae(data, q)
def test_returned_value(self):
    # Limit to the minimum of all limits (b2a_uu)
    MAX_ALL = 45
    raw = self.rawdata[:MAX_ALL]
    for fa, fb in zip(a2b_functions, b2a_functions):
        a2b = getattr(binascii, fa)
        b2a = getattr(binascii, fb)
        try:
            a = b2a(self.type2test(raw))
            res = a2b(self.type2test(a))
        except Exception, err:
            self.fail("{}/{} conversion raises {!r}".format(fb, fa, err))
        if fb == 'b2a_hqx':
            # b2a_hqx returns a tuple
            res, _ = res
        self.assertEqual(res, raw, "{}/{} conversion: "
                         "{!r} != {!r}".format(fb, fa, res, raw))
        self.assertIsInstance(res, str)
        self.assertIsInstance(a, str)
        self.assertLess(max(ord(c) for c in a), 128)
    self.assertIsInstance(binascii.crc_hqx(raw, 0), int)
    self.assertIsInstance(binascii.crc32(raw), int)
def ChkMainCrc(filename):
    ''' check main CRC '''
    data = open(filename, 'rb').read()
    for i in range(0, len(data) - 4):
        # these 4 bytes are the CRC embedded in the FirmwareUpdate.bin
        block = data[i:i+4]
        # calculate CRC for the rest of the data (replace the 4 bytes with 0's)
        c = (crc32(data[:i] + "\0"*4 + data[i+4:], 0xFFFFFFFF) ^ 0xFFFFFFFF) & 0xffffffff
        if pack("<I", c) in block:
            print "Found at offset dec=%d hex=%08X" % (i, i)
            print "CRC=%08X" % c
            break
def CalSectCrc(data):
    sections = unpack("<I", data[15:15+4])[0]
    ptr = 27
    print "[+]Patching section CRC..."
    for i in range(sections):
        section = data[ptr:ptr+10]
        type, offset, size = unpack("<HII", section)
        # print "0x%04X: at offset 0x%08X, size 0x%08X [ends at 0x%08X]" %\
        #     (type, offset, size, offset+size)
        if type == 0xc002:
            print "\tUnknown header type C002 ignored..."
            ptr += 10
            continue
        section_data = data[offset:offset+size]
        c = (crc32(section_data[:24] + "\0"*4 + section_data[28:], \
             0xFFFFFFFF) ^ 0xFFFFFFFF) & 0xffffffff
        print "\tCalculated crc: %s" % ("0x%08X" % c)
        section_data = section_data[:24] + pack("<I", c) + section_data[28:]
        data = data[:offset] + section_data + data[offset+size:]
        ptr += 10
    return data
def buildResponse(query, packetNumber):
    message = ('II II'
               '81 80'  # TID, Flags
               '00 00'  # question count
               '00 01'  # answer count
               '00 00'  # authority count
               '00 00'  # additional count
               )
    message = message.replace('II II', "%04x" % packetNumber)
    for chunk in query.split("."):
        message += ' %02x' % len(chunk)
        for c in chunk:
            message += ' %02x' % ord(c)
    message += ' 00'
    message += '00 01'  # type A query
    message += '00 01'  # Class IN
    #message += 'c0 0c 00 01 00 01'  # answer header: type a, class IN
    message += "00 00 00 01"
    #message += '00 01'  # ttl: 1 second?
    message += '00 04'  # response length
    hostchunk = query.split('.')[0]
    respIP = binascii.crc32(hostchunk) & 0xffffffff
    message += "%08x" % respIP
    return message
def checksum(self, file):
    import binascii
    preprocess = self.compiler.preprocess_source(file)
    return binascii.crc32(preprocess.encode('utf-8'))
def _calc_crc(self, decoded):
    """
    Calculate the CRC based on the decoded content passed in
    """
    self._escape = crc32(decoded, self._escape)
    self._crc = (self._escape ^ -1)
def crc32(self):
    """
    Returns the calculated crc32 string for the decoded data.
    """
    return "%08x" % (self._crc ^ BIN_MASK)
def read_body(self, f, body_checksum):
    checksum = 0
    data = f.read(32)
    while len(data) == 32:
        meta = dict()
        segment = ""
        segment, meta['incremental'], meta['base'], meta['encryption'], \
            meta['compression'], sha1 = unpack('<2IH2B20s', data)
        meta['sha1_hash'] = b2a_hex(sha1)
        self.segments[segment] = meta
        checksum = crc32(data, checksum)
        data = f.read(32)
    if checksum != body_checksum:
        raise Exception('Body checksum does not match')
def update_fuzzer(self):
    resp = self.transport.perform_request(self.pb_request, self.get_params)
    data, text, url, mime = resp.content, resp.text, resp.url, resp.headers['Content-Type'].split(';')[0]
    meta = '%s %d %08x\n%s' % (mime, len(data), crc32(data) & 0xffffffff, resp.url)
    self.fuzzer.urlField.setText(meta)
    self.fuzzer.frame.update_frame(data, text, url, mime, getattr(self, 'pb_resp', None))
def __init__(self, fileobj, mode, zipinfo, decrypter=None):
    self._fileobj = fileobj
    self._decrypter = decrypter

    self._compress_type = zipinfo.compress_type
    self._compress_size = zipinfo.compress_size
    self._compress_left = zipinfo.compress_size

    if self._compress_type == ZIP_DEFLATED:
        self._decompressor = zlib.decompressobj(-15)
    self._unconsumed = ''

    self._readbuffer = ''
    self._offset = 0

    self._universal = 'U' in mode
    self.newlines = None

    # Adjust read size for encrypted files since the first 12 bytes
    # are for the encryption/password information.
    if self._decrypter is not None:
        self._compress_left -= 12

    self.mode = mode
    self.name = zipinfo.filename

    if hasattr(zipinfo, 'CRC'):
        self._expected_crc = zipinfo.CRC
        self._running_crc = crc32(b'') & 0xffffffff
    else:
        self._expected_crc = None
def _update_crc(self, newdata, eof):
    # Update the CRC using the given data.
    if self._expected_crc is None:
        # No need to compute the CRC if we don't have a reference value
        return
    self._running_crc = crc32(newdata, self._running_crc) & 0xffffffff
    # Check the CRC if we're at the end of the file
    if eof and self._running_crc != self._expected_crc:
        raise BadZipfile("Bad CRC-32 for file %r" % self.name)
def _get_record_metadata(self):
    return ConsumerRecord(
        topic=TOPIC_STATES,
        partition=0,
        offset=42,
        timestamp=1467649216540,
        timestamp_type=0,
        key=b'NY',
        value=b'foo',
        checksum=binascii.crc32(b'foo'),
        serialized_key_size=b'NY',
        serialized_value_size=b'foo')
def mock_consumer(self, KafkaConsumer, value, max_calls=1):
    # Mock a consumer object
    fake_kafka_consumer = MagicMock()

    # Should return a record when used as an iterator. Set up the mock to
    # return the record up to the limit of max_calls. Then raises StopIteration
    record = ConsumerRecord(
        topic=TOPIC_STATES,
        partition=0,
        offset=42,
        timestamp=1467649216540,
        timestamp_type=0,
        key=b'NY',
        value=value,
        checksum=binascii.crc32(value),
        serialized_key_size=b'NY',
        serialized_value_size=value)

    meta = {'i': 0}

    def _iter(*args, **kwargs):
        if meta['i'] >= max_calls:
            raise StopIteration()
        meta['i'] += 1
        return record

    fake_kafka_consumer.__next__.side_effect = _iter

    # Return some partitions
    fake_kafka_consumer.partitions_for_topic.return_value = set([0, 1])

    # Make class instantiation return our mock
    KafkaConsumer.return_value = fake_kafka_consumer

    return fake_kafka_consumer
def pack_data(self, buf):
    data = self.rnd_data(len(buf)) + buf
    data_len = len(data) + 8
    crc = binascii.crc32(struct.pack('>H', data_len)) & 0xFFFF
    data = struct.pack('<H', crc) + data
    data = struct.pack('>H', data_len) + data
    adler32 = zlib.adler32(data) & 0xFFFFFFFF
    data += struct.pack('<I', adler32)
    return data
def pack_auth_data(self, buf):
    if len(buf) == 0:
        return b''
    data = self.rnd_data(len(buf)) + buf
    data_len = len(data) + 16
    crc = binascii.crc32(struct.pack('>H', data_len) + self.salt + self.server_info.key) & 0xFFFFFFFF
    data = struct.pack('<I', crc) + data
    data = struct.pack('>H', data_len) + data
    data += hmac.new(self.server_info.iv + self.server_info.key, data, hashlib.sha1).digest()[:10]
    return data
def client_post_decrypt(self, buf):
    if self.raw_trans:
        return buf
    self.recv_buf += buf
    out_buf = b''
    while len(self.recv_buf) > 4:
        crc = struct.pack('<H', binascii.crc32(self.recv_buf[:2]) & 0xFFFF)
        if crc != self.recv_buf[2:4]:
            raise Exception('client_post_decrypt data uncorrect crc')
        length = struct.unpack('>H', self.recv_buf[:2])[0]
        if length >= 8192 or length < 7:
            self.raw_trans = True
            self.recv_buf = b''
            raise Exception('client_post_decrypt data error')
        if length > len(self.recv_buf):
            break
        if struct.pack('<I', zlib.adler32(self.recv_buf[:length - 4]) & 0xFFFFFFFF) != self.recv_buf[length - 4:length]:
            self.raw_trans = True
            self.recv_buf = b''
            raise Exception('client_post_decrypt data uncorrect checksum')
        pos = common.ord(self.recv_buf[4])
        if pos < 255:
            pos += 4
        else:
            pos = struct.unpack('>H', self.recv_buf[5:7])[0] + 4
        out_buf += self.recv_buf[pos:length - 4]
        self.recv_buf = self.recv_buf[length:]
    if out_buf:
        self.decrypt_packet_num += 1
    return out_buf
def client_encode(self, buf):
    if self.raw_trans_sent:
        return buf
    self.send_buffer += buf
    if not self.has_sent_header:
        self.has_sent_header = True
        data = os.urandom(common.ord(os.urandom(1)[0]) % 96 + 4)
        crc = (0xffffffff - binascii.crc32(data)) & 0xffffffff
        return data + struct.pack('<I', crc)
    if self.raw_trans_recv:
        ret = self.send_buffer
        self.send_buffer = b''
        self.raw_trans_sent = True
        return ret
    return b''
def server_decode(self, buf):
    if self.has_recv_header:
        return (buf, True, False)
    self.has_recv_header = True
    crc = binascii.crc32(buf) & 0xffffffff
    if crc != 0xffffffff:
        self.has_sent_header = True
        if self.method == 'random_head':
            return (b'E'*2048, False, False)
        return (buf, True, False)
    # (buffer_to_recv, is_need_decrypt, is_need_to_encode_and_send_back)
    return (b'', False, True)
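The constant comparison in server_decode pairs with client_encode in the previous example: for an unsigned 32-bit value, 0xffffffff - crc32(data) is the bitwise complement of the CRC, and a buffer that ends with the complemented CRC of everything before it always checksums to 0xffffffff. A minimal sketch (not from the project) of that property:

import binascii
import os
import struct

data = os.urandom(32)
# Append the complemented CRC, as client_encode does for its random header.
tail = struct.pack('<I', (0xffffffff - binascii.crc32(data)) & 0xffffffff)
# The CRC of the whole buffer is then the constant that server_decode tests for.
assert binascii.crc32(data + tail) & 0xffffffff == 0xffffffff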
def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
             ext_package=None, tag='', force_generic_engine=False,
             source_extension='.c', flags=None, relative_to=None, **kwds):
    if ffi._parser._uses_new_feature:
        raise ffiplatform.VerificationError(
            "feature not supported with ffi.verify(), but only "
            "with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,))
    self.ffi = ffi
    self.preamble = preamble
    if not modulename:
        flattened_kwds = ffiplatform.flatten(kwds)
    vengine_class = _locate_engine_class(ffi, force_generic_engine)
    self._vengine = vengine_class(self)
    self._vengine.patch_extension_kwds(kwds)
    self.flags = flags
    self.kwds = self.make_relative_to(kwds, relative_to)
    #
    if modulename:
        if tag:
            raise TypeError("can't specify both 'modulename' and 'tag'")
    else:
        key = '\x00'.join([sys.version[:3], __version_verifier_modules__,
                           preamble, flattened_kwds] + ffi._cdefsources)
        if sys.version_info >= (3,):
            key = key.encode('utf-8')
        k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff)
        k1 = k1.lstrip('0x').rstrip('L')
        k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff)
        k2 = k2.lstrip('0').rstrip('L')
        modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key,
                                          k1, k2)
    suffix = _get_so_suffixes()[0]
    self.tmpdir = tmpdir or _caller_dir_pycache()
    self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension)
    self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
    self.ext_package = ext_package
    self._has_source = False
    self._has_module = False
def _encode_service_payload(self, service_check):
    service_payload_list = [
        self.protocol_version,
        0,  # Padding
        0,  # Placeholder for CRC
        service_check.timestamp,
        service_check.status,
        service_check.hostname.encode('utf8'),
        service_check.name.encode('utf8'),
        service_check.output.encode('utf8'),
        0   # Padding
    ]
    crc = binascii.crc32(struct.pack(self.service_payload_fmt, *service_payload_list))
    service_payload_list[2] = crc
    return struct.pack(self.service_payload_fmt, *service_payload_list)
def parse(self, data, offset=0):
    """ Parse the u-boot environment variables from bytearray.

    :param data: The data in bytes array
    :param offset: The offset of input data
    """
    self._env = collections.OrderedDict()
    fmt = ">IB" if self._bigendian else "<IB"
    (read_crc, tmp) = struct.unpack_from(fmt, data, offset)
    if tmp == 0x01:
        self._redundant = True
        read_data = data[offset + 5:]
    else:
        self._redundant = False
        read_data = data[offset + 4:]
    calc_crc = binascii.crc32(read_data) & 0xffffffff
    if read_crc != calc_crc:
        raise ValueError("Wrong CRC")
    read_data = read_data.decode('utf-8')
    for s in read_data.split('\0'):
        if not s or s.startswith('\xFF') or s.startswith('\x00'):
            break
        key, value = s.split('=', 1)
        self._env[key] = value
def export(self):
    """ Export the u-boot environment variables into bytearray.

    :return The environment variables in bytearray
    """
    env_size = self.size
    if self._redundant:
        env_size -= 5
    else:
        env_size -= 4
    data = str()
    for k in self._env:
        data += "{0:s}={1:s}".format(k, self._env[k])
        data += "\0"  # Termination of line "\0"
    data += "\0"  # End of file "\0\0"
    if len(data) > env_size:
        raise Exception("ERROR: ENV size out of range, extend required size !")
    env_blob = data + chr(self._empty_value) * (env_size - len(data))
    env_blob = env_blob.encode('utf-8')
    crc = binascii.crc32(env_blob) & 0xffffffff
    fmt = ">I" if self._bigendian else "<I"
    ret = struct.pack(fmt + "B", crc, 0x01) if self._redundant else struct.pack(fmt, crc)
    ret += env_blob
    return ret
def read(self, name):
    """Return file bytes (as a string) for name."""
    f = self.readfile(name)
    zinfo = self.getinfo(name)
    bytes = f.read()
    crc = binascii.crc32(bytes)
    if crc != zinfo.CRC:
        raise zipfile.BadZipfile, "Bad CRC-32 for file %s" % name
    return bytes