The following 50 code examples, extracted from Python open-source projects, illustrate how to use bz2.compress().
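As a refresher before the examples: bz2.compress() takes bytes and returns the bzip2-compressed bytes, and bz2.decompress() reverses it. A minimal round-trip sketch:

import bz2

data = b'These are the contents of the file.' * 100
compressed = bz2.compress(data, 9)  # compresslevel ranges from 1 to 9; 9 is the default
assert bz2.decompress(compressed) == data
print(len(data), '->', len(compressed))  # highly repetitive input shrinks dramatically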
def _test_partial_input(self, mode):
    class MyBytesIO(io.BytesIO):
        hit_eof = False

        def read(self, n):
            if self.hit_eof:
                raise AssertionError("infinite loop detected in tarfile.open()")
            self.hit_eof = self.tell() == len(self.getvalue())
            return super(MyBytesIO, self).read(n)

        def seek(self, *args):
            self.hit_eof = False
            return super(MyBytesIO, self).seek(*args)

    data = bz2.compress(tarfile.TarInfo("foo").tobuf())
    for x in range(len(data) + 1):
        try:
            tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
        except tarfile.ReadError:
            pass  # we have no interest in ReadErrors

def encode(payload):
    """Encode a string for transmission over the network using base64
    encoding of the bz2 compressed string. We bz2 compress because we can
    and also to counteract the inefficiency of the base64 encoding.

    Args:
        payload(str): string we want to transmit over the wire

    Returns:
        str: base64 encoded, bz2 compressed string
    """
    return base64.urlsafe_b64encode(bz2.compress(payload.encode('utf-8'))).decode('utf-8')

# max is 1024**127

def _test_partial_input(self, mode):
    class MyStringIO(StringIO.StringIO):
        hit_eof = False

        def read(self, n):
            if self.hit_eof:
                raise AssertionError("infinite loop detected in tarfile.open()")
            self.hit_eof = self.pos == self.len
            return StringIO.StringIO.read(self, n)

        def seek(self, *args):
            self.hit_eof = False
            return StringIO.StringIO.seek(self, *args)

    data = bz2.compress(tarfile.TarInfo("foo").tobuf())
    for x in range(len(data) + 1):
        try:
            tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
        except tarfile.ReadError:
            pass  # we have no interest in ReadErrors

def _test_partial_input(self, mode):
    class MyBytesIO(io.BytesIO):
        hit_eof = False

        def read(self, n):
            if self.hit_eof:
                raise AssertionError("infinite loop detected in tarfile.open()")
            self.hit_eof = self.tell() == len(self.getvalue())
            return super(MyBytesIO, self).read(n)

        def seek(self, *args):
            self.hit_eof = False
            return super(MyBytesIO, self).seek(*args)

    data = bz2.compress(tarfile.TarInfo("foo").tobuf())
    for x in range(len(data) + 1):
        try:
            tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
        except tarfile.ReadError:
            pass  # we have no interest in ReadErrors

def compress(self, message):
    """
    This function applies every available compression algorithm to the
    *message* parameter and decides which one compresses most efficiently,
    by comparing the output lengths.

    :param str message: The data to be compressed in raw bytes.
    :rtype: str
    :return: Data compressed by the most efficient available algorithm.
    """
    zips = []
    for comp in self.comps:
        zfile = comp(message)
        zips.append(zfile)
    sorted_zips = sorted(zips, key=lambda tup: len(tup))
    winner = sorted_zips[0]
    return winner

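A self-contained sketch of the same pick-the-smallest-output strategy, assuming the candidates are standard-library compressors (the comps list in the example above is populated elsewhere in its class):

import bz2
import lzma
import zlib

def smallest_compression(message):
    # Run every candidate compressor and keep the shortest output.
    candidates = [bz2.compress(message), lzma.compress(message), zlib.compress(message)]
    return min(candidates, key=len)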
def _write_repodata(package_dir, repodata_dict):
    data = json.dumps(repodata_dict, indent=2, sort_keys=True)
    # strip trailing whitespace
    data = '\n'.join(line.rstrip() for line in data.splitlines())
    # make sure we have newline at the end
    if not data.endswith('\n'):
        data += '\n'

    with open(os.path.join(package_dir, 'repodata.json'), 'w') as fo:
        fo.write(data)

    # compress repodata.json into the bz2 format. some conda commands still
    # need it
    bz2_path = os.path.join(package_dir, 'repodata.json.bz2')
    with open(bz2_path, 'wb') as fo:
        fo.write(bz2.compress(data.encode('utf-8')))

def load(cls, branch, kv_store, msg_cls, hash):
    # Update the branch's config store
    blob = kv_store[hash]
    if cls.compress:
        blob = decompress(blob)
    data = loads(blob)

    config_hash = data['config']
    config_data = cls.load_config(kv_store, msg_cls, config_hash)

    children_list = data['children']
    assembled_children = {}
    node = branch._node
    for field_name, meta in children_fields(msg_cls).iteritems():
        child_msg_cls = tmp_cls_loader(meta.module, meta.type)
        children = []
        for child_hash in children_list[field_name]:
            child_node = node._mknode(child_msg_cls)
            child_node.load_latest(child_hash)
            child_rev = child_node.latest
            children.append(child_rev)
        assembled_children[field_name] = children

    rev = cls(branch, config_data, assembled_children)
    return rev

def __init__(self, path="./libsimilarity/libsimilarity.so"):
    super(SIMILARITYNative, self).__init__(True)

    self._u = cdll.LoadLibrary(path)

    self._u.compress.restype = c_uint
    self._u.ncd.restype = c_int
    self._u.ncs.restype = c_int
    self._u.cmid.restype = c_int
    self._u.entropy.restype = c_double
    self._u.levenshtein.restype = c_uint
    self._u.kolmogorov.restype = c_uint
    self._u.bennett.restype = c_double
    self._u.RDTSC.restype = c_double

    self.__libsim_t = LIBSIMILARITY_T()

    self.set_compress_type(ZLIB_COMPRESS)

def _ncd(self, s1, s2, s1size=0, s2size=0):
    if s1size == 0:
        s1size = self.compress(s1)

    if s2size == 0:
        s2size = self.compress(s2)

    s3size = self.compress(s1 + s2)

    smax = max(s1size, s2size)
    smin = min(s1size, s2size)

    res = (abs(s3size - smin)) / float(smax)
    if res > 1.0:
        res = 1.0

    return res, s1size, s2size, 0

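This method implements the normalized compression distance, NCD(x, y) = (C(xy) - min(C(x), C(y))) / max(C(x), C(y)), where C is the compressed length. A pure-Python illustration using bz2.compress as C (a sketch, not the native library's implementation):

import bz2

def ncd(x, y):
    # C(s): length of the bz2-compressed representation of s.
    cx = len(bz2.compress(x))
    cy = len(bz2.compress(y))
    cxy = len(bz2.compress(x + y))
    # Values near 0.0 indicate similar inputs; values near 1.0, unrelated ones.
    return min(1.0, (cxy - min(cx, cy)) / float(max(cx, cy)))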
def test_queueMessages_processes_files_message_instantly(self):
    worker = StatusWorkerService(sentinel.dbtasks)
    mock_processMessage = self.patch(worker, "_processMessage")
    contents = b'These are the contents of the file.'
    encoded_content = encode_as_base64(bz2.compress(contents))
    message = self.make_message()
    message['files'] = [
        {
            "path": "sample.txt",
            "encoding": "uuencode",
            "compression": "bzip2",
            "content": encoded_content,
        }
    ]
    nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
    node, token = nodes_with_tokens[0]
    yield worker.queueMessage(token.key, message)
    self.assertThat(
        mock_processMessage, MockCalledOnceWith(node, message))

def test_queueMessages_handled_invalid_nodekey_with_instant_msg(self):
    worker = StatusWorkerService(sentinel.dbtasks)
    mock_processMessage = self.patch(worker, "_processMessage")
    contents = b'These are the contents of the file.'
    encoded_content = encode_as_base64(bz2.compress(contents))
    message = self.make_message()
    message['files'] = [
        {
            "path": "sample.txt",
            "encoding": "uuencode",
            "compression": "bzip2",
            "content": encoded_content,
        }
    ]
    nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
    node, token = nodes_with_tokens[0]
    yield deferToDatabase(token.delete)
    yield worker.queueMessage(token.key, message)
    self.assertThat(
        mock_processMessage, MockNotCalled())

def test_status_with_file_bad_encoder_fails(self):
    node = factory.make_Node(
        interface=True, status=NODE_STATUS.COMMISSIONING)
    contents = b'These are the contents of the file.'
    encoded_content = encode_as_base64(bz2.compress(contents))
    payload = {
        'event_type': 'finish',
        'result': 'FAILURE',
        'origin': 'curtin',
        'name': 'commissioning',
        'description': 'Commissioning',
        'timestamp': datetime.utcnow(),
        'files': [
            {
                "path": "sample.txt",
                "encoding": "uuencode",
                "compression": "bzip2",
                "content": encoded_content,
            }
        ],
    }
    with ExpectedException(ValueError):
        self.processMessage(node, payload)

def test_status_with_file_bad_compression_fails(self):
    node = factory.make_Node(
        interface=True, status=NODE_STATUS.COMMISSIONING)
    contents = b'These are the contents of the file.'
    encoded_content = encode_as_base64(bz2.compress(contents))
    payload = {
        'event_type': 'finish',
        'result': 'FAILURE',
        'origin': 'curtin',
        'name': 'commissioning',
        'description': 'Commissioning',
        'timestamp': datetime.utcnow(),
        'files': [
            {
                "path": "sample.txt",
                "encoding": "base64",
                "compression": "jpeg",
                "content": encoded_content,
            }
        ],
    }
    with ExpectedException(ValueError):
        self.processMessage(node, payload)

def accept(server, mask):
    global poll, cnt
    if mask & selectors.EVENT_READ:
        while True:
            try:
                conn, addr = sock.accept()
                print("Connected: " + str(addr))
                v["id"] = cnt
                # conn.send(compress(bytes(json.dumps(v), 'utf-8')))
                conn.send(bytes(json.dumps(v), 'utf-8'))
            except BlockingIOError:
                break
            conn.setblocking(False)
            poll.register(conn, selectors.EVENT_READ, read)
            clients[conn] = cnt  # FIXED THIS
            cnt += 1
            # cnt = random.randint(1, 10**40)
        mask &= ~selectors.EVENT_READ
    assert mask == 0

def sendMap(id, data):
    global localServer
    data = json.dumps(data)
    # if DEBUG_PROTOCOL_PRINT:
    #     print(data)
    q = clients
    for x in q:
        try:
            if q[x] == id:
                # x.send(compress(bytes(data + '\n', 'utf-8')))
                x.send(bytes(data + '\n', 'utf-8'))
                break
        except:
            clientsLock.acquire()
            if x in clients:
                print("deleting user " + str(clients[x]))
                poll.unregister(x)
                x.close()
                del clients[x]
                localServer.UserExit(id)
            clientsLock.release()
            break

def registerMe(name):
    global sock, tcp_ip, tcp_port
    inf = open('config.txt', 'r')
    config = inf.readline()
    tcp_ip, tcp_port = config.split(' ')
    tcp_port = int(tcp_port)
    sock.connect((tcp_ip, tcp_port))
    data = sock.recv(MAX_LENGTH)
    # id = json.loads(str(decompress(data), 'utf-8'))['id']
    id = json.loads(str(data, 'utf-8'))['id']
    jdata = dict()
    jdata['name'] = name
    s = json.dumps(jdata)
    # sock.send(compress(bytes(s + '\n', 'utf-8')))
    sock.send(bytes(s + '\n', 'utf-8'))
    return id

def save(self, path=None):
    "Saves the database to path or most recently known path."
    if path is None:
        assert self.__path is not None, 'Path must be provided!'
        path = self.__path
    with open(path, 'wb') as file:
        file.write(bz2.compress(pickle.dumps(self)))
    self.__path = path

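The matching read path is not part of this example; a minimal sketch of what it could look like, assuming the file was produced by save() above (the function name is hypothetical):

import bz2
import pickle

def load(path):
    # Reverse of save(): decompress, then unpickle. Only use on trusted files,
    # since unpickling arbitrary data can execute code.
    with open(path, 'rb') as file:
        return pickle.loads(bz2.decompress(file.read()))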
def bz2_pack(source):
    "Returns 'source' as a bzip2-compressed, self-extracting python script."
    import bz2, base64
    out = ""
    compressed_source = bz2.compress(source)
    out += 'import bz2, base64\n'
    out += "exec bz2.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source)
    out += "'))\n"
    return out

def gz_pack(source):
    "Returns 'source' as a gzip-compressed, self-extracting python script."
    import zlib, base64
    out = ""
    compressed_source = zlib.compress(source)
    out += 'import zlib, base64\n'
    out += "exec zlib.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source)
    out += "'))\n"
    return out

# The test.+() functions below are for testing pyminifier...

def bz2_pack(source):
    """
    Returns 'source' as a bzip2-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import bz2, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'):
                # Make it python3
                first_line = first_line.rstrip()
                first_line += '3'  # !/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = bz2.compress(source.encode('utf-8'))
    out += 'import bz2, base64\n'
    out += "exec(bz2.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out

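A hedged usage sketch for this packer, assuming bz2_pack is importable and that hello.py holds the script to pack (both file names are illustrative):

# Pack a small script into a self-extracting one.
source = open('hello.py').read()
with open('hello_packed.py', 'w') as f:
    f.write(bz2_pack(source))
# Running `python3 hello_packed.py` decompresses and execs the original source.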
def gz_pack(source):
    """
    Returns 'source' as a gzip-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import zlib, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'):
                # Make it python3
                first_line = first_line.rstrip()
                first_line += '3'  # !/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = zlib.compress(source.encode('utf-8'))
    out += 'import zlib, base64\n'
    out += "exec(zlib.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out

def lzma_pack(source):
    """
    Returns 'source' as a lzma-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import lzma, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'):
                # Make it python3
                first_line = first_line.rstrip()
                first_line += '3'  # !/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = lzma.compress(source.encode('utf-8'))
    out += 'import lzma, base64\n'
    out += "exec(lzma.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out

def compress(data):
    # No-op baseline: returns the input unchanged.
    return data

def compress(data, *args, **kwargs):
    return compresso.compresso.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    return neuroglancer.neuroglancer.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    return bz2.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    if type(data) is np.ndarray:
        str_data = data.tobytes()
    elif type(data) is str:
        str_data = data
    else:
        raise ValueError('Data type not supported')

    dictionary = lz78.lz78.compress(str_data, *args, **kwargs)

    array = np.zeros(len(dictionary), dtype=np.uint32)
    retry = False
    for ie, entry in enumerate(dictionary):
        if entry[1] == '':
            if entry[0] >= 2**24:
                retry = True
                break
            array[ie] = (entry[0] << 8)
        else:
            if entry[0] >= 2**24:
                retry = True
                break
            array[ie] = (entry[0] << 8) + ord(entry[1])

    if not retry:
        return array
    else:
        array = np.zeros(len(dictionary), dtype=np.uint64)
        for ie, entry in enumerate(dictionary):
            if entry[1] == '':
                array[ie] = (entry[0] << 8)
            else:
                array[ie] = (entry[0] << 8) + ord(entry[1])
        return array

def compress(data, *args, **kwargs):
    return lzma.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    return lzo.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    if type(data) is np.ndarray:
        str_data = data.tobytes()
    elif type(data) is str:
        str_data = data
    else:
        raise ValueError('Data type not supported')

    # create an empty dictionary
    dict_size = 2**8
    dictionary = dict((chr(i), i) for i in xrange(dict_size))

    w = ''
    result = []
    for c in str_data:
        wc = w + c
        if wc in dictionary:
            w = wc
        else:
            result.append(dictionary[w])
            dictionary[wc] = dict_size
            dict_size += 1
            w = c

    if w:
        result.append(dictionary[w])

    return np.array(result, dtype=np.uint32)

def compress(data, *args, **kwargs):
    return zlib.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    return zstd.compress(data, *args, **kwargs)

def compress(data, *args, **kwargs):
    return _png.compress(data)

def compress(data, *args, **kwargs):
    return x264.compress(data)

def string(self):
    # type: (InternalMessage) -> bytes
    """Returns a :py:class:`bytes` representation of the message

    Raises:
        TypeError: See
            :py:func:`~py2p.base.InternalMessage._InternalMessage__non_len_string`
    """
    if not all((self.__id, self.__string, self.__full_string)):
        id_ = self.id
        ret = b''.join((id_, self.__non_len_string))
        compression_used = self.compression_used
        if compression_used:
            ret = compress(ret, compression_used)
        self.__full_string = b''.join((pack_value(4, len(ret)), ret))
    return self.__full_string

def test_incremental_decompress():
    basic_test_d(bz2.Decompressor(), compress)

def compress(data):
    compressed_object = bz2.compress(data)
    return compressed_object

def _contents(self, value):
    import bz2
    self._contents = bz2.compress(value, 1)

def decode(payload_str):
    """Decode a base64 encoded, bz2 compressed network payload

    Args:
        payload_str(str): base64 encoded, bz2 compressed byte stream

    Returns:
        str: original string
    """
    return bz2.decompress(base64.urlsafe_b64decode(payload_str.encode('utf-8'))).decode('utf-8')

def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg,)

def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg,)
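These task functions return a (callable, args) pair for a benchmark harness that is not shown here; a minimal sketch of driving such a pair with the standard timeit module (the harness details are an assumption):

import timeit

func, args = task_compress_bz2()
# Time 100 compressions of the prepared payload.
elapsed = timeit.timeit(lambda: func(*args), number=100)
print('bz2: %.3f s for 100 runs' % elapsed)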