我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bz2.decompress()。
def get_tokens(self, text):
    """Yield ``(tokentype, value)`` pairs from a raw token dump.

    Text input is first encoded to ASCII bytes.  The payload is then
    inflated according to ``self.compress`` (``'gz'`` or ``'bz2'``; any
    other value leaves it as is) and passed to
    ``self.get_tokens_unprocessed``.
    """
    if isinstance(text, text_type):
        # A raw token stream never contains non-ASCII characters.
        text = text.encode('ascii')

    codec = self.compress
    if codec == 'gz':
        import gzip
        text = gzip.GzipFile('', 'rb', 9, BytesIO(text)).read()
    elif codec == 'bz2':
        import bz2
        text = bz2.decompress(text)

    # Lexer.get_tokens() is bypassed on purpose: no Unicode decoding may
    # take place here, and the newline stripping is mandatory.
    payload = text.strip(b'\n') + b'\n'
    for _index, token_type, value in self.get_tokens_unprocessed(payload):
        yield token_type, value
def get_tokens(self, text):
    """Generate ``(tokentype, value)`` tuples from a raw token stream.

    ``unicode`` input is converted to ASCII bytes, the data is
    decompressed when ``self.compress`` is ``'gz'`` or ``'bz2'``, and the
    result is fed to ``self.get_tokens_unprocessed``.
    """
    raw = text
    if isinstance(raw, unicode):
        # Non-ASCII characters never occur in a raw token stream.
        raw = raw.encode('ascii')

    if self.compress == 'gz':
        import gzip
        buffered = cStringIO.StringIO(raw)
        gzipfile = gzip.GzipFile('', 'rb', 9, buffered)
        raw = gzipfile.read()
    elif self.compress == 'bz2':
        import bz2
        raw = bz2.decompress(raw)

    # Deliberately not routed through Lexer.get_tokens(): Unicode decoding
    # must not occur, and stripping is not optional.
    raw = raw.strip(b('\n')) + b('\n')
    for item in self.get_tokens_unprocessed(raw):
        yield item[1], item[2]
def decompress(self, zipped):
    """
    Try every decompression algorithm in ``self.decomps`` on the given data.

    Relies on the assumption that a decompression routine raises an
    exception when the data was not produced by the matching compressor.
    The first routine that succeeds determines the result.

    :param zipped: The possibly-compressed data.
    :return: The output of the first routine in ``self.decomps`` that
        accepts the data, or ``zipped`` unchanged when none does.
    """
    for decomp in self.decomps:
        try:
            return decomp(zipped)
        except Exception:
            # A failure only means "wrong algorithm"; try the next one.
            # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt
            # and SystemExit propagate.
            continue
    return zipped
def load(cls, branch, kv_store, msg_cls, hash):
    """Rebuild a revision from the blob stored under ``hash`` in ``kv_store``.

    The blob is decompressed when ``cls.compress`` is truthy and then
    deserialized with ``loads``.  Its ``'config'`` entry is resolved via
    ``cls.load_config``; its ``'children'`` entry maps each child field name
    to a list of hashes, each of which is loaded into a fresh node whose
    ``latest`` revision is collected.

    Returns ``cls(branch, config_data, children_by_field)``.
    """
    raw = kv_store[hash]
    if cls.compress:
        raw = decompress(raw)
    payload = loads(raw)

    config_data = cls.load_config(kv_store, msg_cls, payload['config'])
    hashes_by_field = payload['children']
    owner = branch._node

    def latest_revision(child_cls, child_hash):
        # Materialize one child node and return its most recent revision.
        child = owner._mknode(child_cls)
        child.load_latest(child_hash)
        return child.latest

    children_by_field = {}
    for field_name, meta in children_fields(msg_cls).iteritems():
        child_cls = tmp_cls_loader(meta.module, meta.type)
        children_by_field[field_name] = [
            latest_revision(child_cls, child_hash)
            for child_hash in hashes_by_field[field_name]
        ]

    return cls(branch, config_data, children_by_field)
def import_to_store(self, compressed_nar):
    """Decompress a NAR and import it into the nix store.

    The algorithm is chosen from ``self.compression`` (case-insensitive):
    ``xz``/``xzip`` use lzma, ``bz2``/``bzip2`` use bz2, and anything else
    falls back to gzip.

    :param compressed_nar: The compressed bytes of a NAR.
    """
    scheme = self.compression.lower()
    if scheme in ("xz", "xzip"):
        unpack = lzma.decompress
    elif scheme in ("bz2", "bzip2"):
        unpack = bz2.decompress
    else:
        unpack = gzip.decompress
    data = unpack(compressed_nar)

    # Turn the extracted bytes into a nix export object and import it.
    # The value returned by the import is not used further here.
    export = self.nar_to_export(data)
    export.import_to_store()
def _retrieve_content(self, compression, encoding, content):
    """Decode and, if requested, decompress the content of a sent file.

    ``compression`` must be ``None`` or ``'bzip2'`` and ``encoding`` must be
    ``'base64'``; anything else raises ``ValueError``.  The compression is
    validated before the encoding.
    """
    if compression is not None and compression != 'bzip2':
        raise ValueError('Invalid compression: %s' % compression)
    if encoding != 'base64':
        raise ValueError('Invalid encoding: %s' % encoding)

    decoded = base64.decodebytes(content.encode("ascii"))
    if compression is None:
        return decoded
    return bz2.decompress(decoded)
def registerMe(name):
    """Connect to the server listed in ``config.txt`` and register ``name``.

    The first line of ``config.txt`` is expected to hold ``"<ip> <port>"``
    separated by a single space.  The module-level ``tcp_ip`` and
    ``tcp_port`` are updated, the module-level ``sock`` is connected, and
    the first message received is parsed as UTF-8 JSON to obtain the ``id``
    assigned to this client.  A JSON object carrying ``name`` is then sent,
    terminated by a newline.

    :param name: Name to register under.
    :return: The ``id`` value from the server's first message.
    """
    global sock, tcp_ip, tcp_port

    # Context manager so the config file handle is closed right away.
    with open('config.txt', 'r') as inf:
        config = inf.readline()
    tcp_ip, tcp_port = config.split(' ')
    tcp_port = int(tcp_port)  # int() tolerates the trailing newline

    sock.connect((tcp_ip, tcp_port))
    data = sock.recv(MAX_LENGTH)
    # Messages travel uncompressed in both directions.
    client_id = json.loads(str(data, 'utf-8'))['id']

    s = json.dumps({'name': name})
    sock.send(bytes(s + '\n', 'utf-8'))
    return client_id
def load(cls, path):
    """Load a database from ``path`` and check that it is a ``cls`` instance.

    The file holds a bz2-compressed pickle.  The loaded object remembers
    ``path`` in its ``__path`` attribute (subject to Python's private-name
    mangling when this method sits inside a class body).

    NOTE(review): unpickling runs arbitrary code; only load trusted files.
    """
    with open(path, 'rb') as handle:
        packed = handle.read()
    obj = pickle.loads(bz2.decompress(packed))
    assert isinstance(obj, cls), 'Could not load a database object!'
    obj.__path = path
    return obj

########################################################################
def bz2_pack(source):
    """Return ``source`` as a bzip2-compressed, self-extracting python script.

    The generated script imports ``bz2`` and ``base64`` and ``exec``s the
    decoded, decompressed payload.  It uses the call form ``exec(...)``,
    which is valid on both Python 2 and Python 3.

    :param source: Python source code; text is encoded as UTF-8 before
        compression, bytes are compressed as given.
    :return: The self-extracting script as a native ``str``.
    """
    import base64
    import bz2

    # bz2.compress() needs bytes; on Python 3 source text must be encoded.
    if not isinstance(source, bytes):
        source = source.encode('utf-8')
    payload = base64.b64encode(bz2.compress(source))
    # b64encode() returns bytes on Python 3; the script is built as str.
    if not isinstance(payload, str):
        payload = payload.decode('ascii')

    return ''.join([
        'import bz2, base64\n',
        "exec(bz2.decompress(base64.b64decode('",
        payload,
        "')))\n",
    ])
def gz_pack(source):
    """Return ``source`` as a zlib-compressed, self-extracting python script.

    The generated script imports ``zlib`` and ``base64`` and ``exec``s the
    decoded, decompressed payload.  It uses the call form ``exec(...)``,
    which is valid on both Python 2 and Python 3.

    :param source: Python source code; text is encoded as UTF-8 before
        compression, bytes are compressed as given.
    :return: The self-extracting script as a native ``str``.
    """
    import base64
    import zlib

    # zlib.compress() needs bytes; on Python 3 source text must be encoded.
    if not isinstance(source, bytes):
        source = source.encode('utf-8')
    payload = base64.b64encode(zlib.compress(source))
    # b64encode() returns bytes on Python 3; the script is built as str.
    if not isinstance(payload, str):
        payload = payload.decode('ascii')

    return ''.join([
        'import zlib, base64\n',
        "exec(zlib.decompress(base64.b64decode('",
        payload,
        "')))\n",
    ])

# The test.+() functions below are for testing pyminifer...
def _lzma(self):
    '''LZMA processor.

    Decompresses the current attachment, wraps the result in a ``File``
    named after the original filename minus its extension, and passes it
    to ``self.process_payload``.  If any step raises, the current
    attachment is marked dangerous instead.

    Returns ``self.cur_attachment``.
    '''
    try:
        archive = lzma.decompress(self.cur_attachment.file_obj.read())
        new_fn, _ext = os.path.splitext(self.cur_attachment.orig_filename)
        cur_file = File(archive, new_fn)
        self.process_payload(cur_file)
    except Exception:
        # Any processing failure flags the attachment; KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        self.cur_attachment.make_dangerous()
    return self.cur_attachment
def _bzip(self):
    '''BZip2 processor.

    Decompresses the current attachment, wraps the result in a ``File``
    named after the original filename minus its extension, and passes it
    to ``self.process_payload``.  If any step raises, the current
    attachment is marked dangerous instead.

    Returns ``self.cur_attachment``.
    '''
    try:
        archive = bz2.decompress(self.cur_attachment.file_obj.read())
        new_fn, _ext = os.path.splitext(self.cur_attachment.orig_filename)
        cur_file = File(archive, new_fn)
        self.process_payload(cur_file)
    except Exception:
        # Any processing failure flags the attachment; KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        self.cur_attachment.make_dangerous()
    return self.cur_attachment
def bz2_pack(source):
    """
    Return *source* as a bzip2-compressed, self-extracting python script.

    .. note::

        The output takes more space than what the zip_pack method produces,
        but the resulting .py file can still be imported into a python
        program.
    """
    import bz2, base64
    header = ""
    # Carry a shebang line over (encodings are not a concern here).
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3 and first_line.rstrip().endswith('python'):
            # Point it at python3, e.g. "#!/usr/bin/env python3".
            first_line = first_line.rstrip() + '3'
        header = first_line + '\n'

    encoded = base64.b64encode(bz2.compress(source.encode('utf-8')))
    return "".join([
        header,
        'import bz2, base64\n',
        "exec(bz2.decompress(base64.b64decode('",
        encoded.decode('utf-8'),
        "')))\n",
    ])
def gz_pack(source):
    """
    Return *source* as a zlib-compressed, self-extracting python script.

    .. note::

        The output takes more space than what the zip_pack method produces,
        but the resulting .py file can still be imported into a python
        program.
    """
    import zlib, base64
    pieces = []
    # A leading shebang is kept (encodings are not a concern here).
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            stripped = first_line.rstrip()
            if stripped.endswith('python'):
                # Upgrade to python3, e.g. "#!/usr/bin/env python3".
                first_line = stripped + '3'
        pieces.append(first_line + '\n')

    compressed_source = zlib.compress(source.encode('utf-8'))
    pieces.append('import zlib, base64\n')
    pieces.append("exec(zlib.decompress(base64.b64decode('")
    pieces.append(base64.b64encode(compressed_source).decode('utf-8'))
    pieces.append("')))\n")
    return "".join(pieces)
def lzma_pack(source):
    """
    Return *source* as an lzma-compressed, self-extracting python script.

    .. note::

        The output takes more space than what the zip_pack method produces,
        but the resulting .py file can still be imported into a python
        program.
    """
    import lzma, base64
    # Retain the shebang, if any (encodings are not a concern here).
    shebang_prefix = ""
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        wants_upgrade = py3 and first_line.rstrip().endswith('python')
        if wants_upgrade:
            # Becomes e.g. "#!/usr/bin/env python3".
            first_line = first_line.rstrip() + '3'
        shebang_prefix = first_line + '\n'

    packed = lzma.compress(source.encode('utf-8'))
    payload = base64.b64encode(packed).decode('utf-8')
    return (
        shebang_prefix
        + 'import lzma, base64\n'
        + "exec(lzma.decompress(base64.b64decode('"
        + payload
        + "')))\n"
    )
def decompress(data):
    """Identity "decompressor": hand ``data`` back untouched."""
    unchanged = data
    return unchanged


#############################
### SEGMENTATION SPECIFIC ###
#############################
def decompress(data, *args, **kwargs):
    """Forward to ``compresso.compresso.decompress``.

    All positional and keyword arguments are passed through unchanged.
    """
    backend = compresso.compresso.decompress
    return backend(data, *args, **kwargs)
def decompress(data, *args, **kwargs):
    """Forward to ``neuroglancer.neuroglancer.decompress``.

    All positional and keyword arguments are passed through unchanged.
    """
    backend = neuroglancer.neuroglancer.decompress
    return backend(data, *args, **kwargs)


#######################
### GENERAL PURPOSE ###
#######################
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``bz2.decompress``.

    Extra positional and keyword arguments are forwarded as given.
    """
    restored = bz2.decompress(data, *args, **kwargs)
    return restored
def decompress(data, *args, **kwargs):
    """Unpack ``data`` into LZ78 ``(index, char)`` pairs and decode them.

    Each entry is split into its value divided by 2**8 (the index) and its
    value modulo 2**8 (a character code).  The last entry carries no
    character, so it is paired with an empty string.  The pair list is
    passed to ``lz78.lz78.decompress`` along with any extra arguments.
    """
    dictionary = []
    for position, entry in enumerate(data):
        prefix_index, char_code = divmod(long(entry), 2**8)
        if position == data.size - 1:
            suffix = ''
        else:
            suffix = chr(char_code)
        dictionary.append((prefix_index, suffix))

    return lz78.lz78.decompress(dictionary, *args, **kwargs)
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``lzma.decompress``.

    Extra positional and keyword arguments are forwarded as given.
    """
    restored = lzma.decompress(data, *args, **kwargs)
    return restored
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``lzo.decompress``.

    Extra positional and keyword arguments are forwarded as given.
    """
    restored = lzo.decompress(data, *args, **kwargs)
    return restored
def decompress(data, *args, **kwargs):
    """Decode a sequence of LZW codes into a string.

    The dictionary starts with the 256 single-character entries and grows
    by one entry per decoded code.  An empty code sequence decodes to an
    empty string.

    :param data: Iterable of integer LZW codes; the first code must be
        below 256.
    :param args: Accepted for interface parity; unused.
    :param kwargs: Accepted for interface parity; unused.
    :return: The decoded string.
    :raises ValueError: If a code is neither in the dictionary nor the next
        code about to be defined.
    """
    codes = list(data)
    if not codes:
        return ''

    dict_size = 256
    dictionary = {i: chr(i) for i in range(dict_size)}

    w = chr(codes[0])
    pieces = [w]
    for k in codes[1:]:
        if k in dictionary:
            entry = dictionary[k]
        elif k == dict_size:
            # The code refers to the entry that is being defined right now.
            entry = w + w[0]
        else:
            raise ValueError('Bad compressed k: %s' % k)
        pieces.append(entry)

        # Add w + entry[0] to the dictionary.
        dictionary[dict_size] = w + entry[0]
        dict_size += 1
        w = entry
    return ''.join(pieces)
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``zlib.decompress``.

    Extra positional and keyword arguments are forwarded as given.
    """
    restored = zlib.decompress(data, *args, **kwargs)
    return restored
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``zstd.decompress``.

    Extra positional and keyword arguments are forwarded as given.
    """
    restored = zstd.decompress(data, *args, **kwargs)
    return restored


#########################
### IMAGE COMPRESSION ###
#########################
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``_png.decompress``.

    Extra positional and keyword arguments are accepted but not forwarded.
    """
    restored = _png.decompress(data)
    return restored


#########################
### VIDEO COMPRESSION ###
#########################
def decompress(data, *args, **kwargs):
    """Decompress ``data`` with ``x264.decompress``.

    Extra positional and keyword arguments are accepted but not forwarded.
    """
    restored = x264.decompress(data)
    return restored
def get_local(self):
    """Return ``self.local`` base64-decoded and bzip2-decompressed.

    ``base64.b64decode`` replaces ``base64.decodestring``, which was
    deprecated and then removed in Python 3.9.  In its default
    non-validating mode it also skips characters outside the base64
    alphabet, such as the newlines found in line-wrapped base64.
    """
    return bz2.decompress(base64.b64decode(self.local))
def __decompress_string(cls, string, compressions=None):
    # type: (Any, bytes, Union[None, Iterable[int]]) -> Tuple[bytes, bool]
    """Try to decompress a message with each mutually supported method.

    The candidate methods are ``intersect(compressions, compression)``,
    where ``compression`` is the module-scope collection.  They are tried
    in order and the first one that succeeds wins.

    Args:
        string:       The possibly-compressed message you wish to parse
        compressions: The compression methods this message may be under
                      (default: ``None``)

    Returns:
        A tuple of the (possibly decompressed) :py:class:`bytes` and a
        :py:class:`bool` that is ``True`` when the last attempted method
        failed.  It is ``False`` on success and when no method was
        attempted.

    Note:
        Exceptions raised by ``decompress`` are caught and reported through
        the returned flag rather than propagated.

    Warning:
        Do not feed it with the size header, it will throw errors
    """
    compression_fail = False
    # The second argument is the module-scope ``compression``.
    for method in intersect(compressions, compression):
        try:
            string = decompress(string, method)
        except Exception:
            # Wrong or unsupported method for this payload; try the next.
            # KeyboardInterrupt and SystemExit are allowed to propagate.
            compression_fail = True
        else:
            compression_fail = False
            break
    return (string, compression_fail)
def test_incremental_compress():
    """Run the shared ``basic_test_c`` check on an incremental compressor."""
    # NOTE(review): presumably ``bz2`` is a module exposing ``Compressor``;
    # the standard-library class is called ``BZ2Compressor``. Confirm.
    incremental = bz2.Compressor()
    basic_test_c(incremental, decompress)
def getFile(cls, getfile, unpack=True):
    """Fetch ``getfile`` and optionally unpack the downloaded payload.

    When ``cls.getProxy()`` is truthy, a global opener using that proxy for
    http and https is installed first.

    With ``unpack`` enabled, the response's Content-Type decides what to do:

    * ``gzip``  -> a ``gzip.GzipFile`` wrapping the payload
    * ``bzip2`` -> a ``BytesIO`` holding the decompressed payload
    * ``zip``   -> a ``BytesIO`` holding the first archive member
    * ``application/octet-stream`` -> treated as zip if the payload starts
      with the zip magic bytes

    In every other case the raw bytes are returned.

    :param getfile: URL (or request object) to open.
    :param unpack: Whether to unpack compressed payloads.
    :return: ``(data, response)``.  If the fetch fails and
        ``cls.exitWhenNoSource()`` is falsy, a message is printed and
        ``(None, None)`` is returned.
    :raises SystemExit: If the fetch fails and ``cls.exitWhenNoSource()``
        is truthy.
    """
    def first_zip_member(raw):
        # BytesIO of the first member, or the raw bytes for an empty archive.
        with zipfile.ZipFile(BytesIO(raw), 'r') as fzip:
            names = fzip.namelist()
            if names:
                return BytesIO(fzip.read(names[0]))
        return raw

    proxy_url = cls.getProxy()
    if proxy_url:
        proxy = req.ProxyHandler({'http': proxy_url, 'https': proxy_url})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)

    try:
        response = req.urlopen(getfile)
    except Exception:
        msg = "[!] Could not fetch file %s"%getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        print(msg)
        # There is no response to read, so stop here rather than fall
        # through to code that needs one.
        return (None, None)

    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        # A missing header is treated as "no known type", not a TypeError.
        content_type = response.info().get('Content-Type') or ''
        if 'gzip' in content_type:
            data = gzip.GzipFile(fileobj=BytesIO(data))
        elif 'bzip2' in content_type:
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in content_type:
            data = first_zip_member(data)
        # In case the webserver is being generic
        elif 'application/octet-stream' in content_type:
            if data[:4] == b'PK\x03\x04':  # Zip
                data = first_zip_member(data)
    return (data, response)
def download_bzip2(path, url):
    """Download a bzip2 file from ``url`` and store it decompressed at ``path``.

    Nothing happens when ``path`` already exists.  The payload is downloaded
    and decompressed before the target file is opened.  A failed download or
    a corrupt archive therefore leaves no empty file behind, which would
    otherwise make every later call skip the download.

    :param path: Destination file for the decompressed data.
    :param url: Location of the bzip2-compressed resource.
    """
    if os.path.exists(path):
        return
    print("Downloading {0} from {1}...".format(path, url))

    response = urlopen(url)
    try:
        compressed = response.read()
    finally:
        # Release the connection even when reading fails.
        response.close()

    payload = bz2.decompress(compressed)
    with open(path, 'wb') as f:
        f.write(payload)
def decompress(data):
    """Return ``data`` decompressed with ``bz2.decompress``."""
    return bz2.decompress(data)
def _contents(self):
    """Return the bzip2-decompressed form of ``self._contents``."""
    import bz2
    packed = self._contents
    return bz2.decompress(packed)
def gather():
    """Download the PhishTank feed and pretty-print one record per entry.

    The bzip2-compressed CSV is fetched with ``get_url`` and decompressed,
    and its header row is skipped.  For each remaining row the second
    column is taken as the site URL and its host is extracted with a
    regular expression.  The host is resolved with ``get_ip``; when that
    yields something other than ``"undefined"``, whois and country data are
    looked up with ``get_who_is_and_country``.  The record is emitted with
    ``pprint``.

    Rows with fewer than two columns, and URLs from which no host can be
    extracted, are skipped.
    """
    import csv

    url_regex = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
    base_url = "http://data.phishtank.com/data/online-valid.csv.bz2"
    attack_type = "undefined"

    res = get_url(base_url)
    results = bz2.decompress(res.content)
    if not isinstance(results, str):
        # On Python 3 bz2 yields bytes, which cannot be split with a str
        # separator.  NOTE(review): the feed is assumed to be UTF-8;
        # undecodable bytes are replaced rather than aborting the run.
        results = results.decode('utf-8', 'replace')

    host_pattern = re.compile(url_regex)
    # csv.reader keeps quoted fields intact (URLs may contain commas) and
    # yields an empty row for a blank line.
    for row in csv.reader(results.splitlines()[1:]):
        if len(row) < 2:
            continue
        m = host_pattern.search(row[1])
        if m is None:
            continue
        host = m.group('host')

        ip_address = get_ip(host)
        if ip_address == "undefined":
            who_is, country = "undefined", "undefined"
        else:
            who_is, country = get_who_is_and_country(ip_address)

        doc = {
            'IP': ip_address,
            'SourceInfo': base_url,
            'Type': attack_type,
            'Country': country,
            'Domain': host,
            'URL': host,
            'WhoIsInfo': who_is,
        }
        pprint(doc)