我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bz2.BZ2Compressor()。
def init(self):
    """Reset the proxy state and create the LZMA (de)compressor.

    Sets ``self.pos`` to 0.  When ``self.mode`` is ``"r"`` a decompressor is
    stored in ``self.cmpobj``, ``self.fileobj`` is rewound to offset 0 and
    ``self.buf`` is reset to an empty bytes buffer.  For any other mode a
    compressor is stored in ``self.cmpobj``.
    """
    import lzma

    self.pos = 0
    if self.mode == "r":
        # The lzma module exposes LZMADecompressor/LZMACompressor; it has no
        # BZ2* classes, so the previous names raised AttributeError.
        self.cmpobj = lzma.LZMADecompressor()
        self.fileobj.seek(0)
        # Bytes literal: identical to "" on Python 2, and the type the
        # decompressor returns on Python 3.
        self.buf = b""
    else:
        self.cmpobj = lzma.LZMACompressor()

# class _XZProxy

#------------------------
# Extraction file object
#------------------------
def bz2_compress(self, file, type=True):
    """Compress a file to bz2, or decompress a ``.bz2`` file.

    When ``type`` is true, ``file`` is compressed into ``file + ".bz2"``.
    When ``type`` is false, ``file`` must end with ``".bz2"`` and is
    decompressed into the same path with the last four characters removed.

    Returns ``False`` without touching anything when ``file`` does not exist,
    is a directory, cannot be stat'ed, or (for decompression) lacks the
    ``.bz2`` suffix.  Returns ``True`` after a successful run and reports it
    through ``self.debug``.  Errors raised while opening, reading, writing or
    (de)compressing propagate to the caller.
    """
    # NOTE: `file` and `type` shadow builtins, but they are part of the
    # public signature (callers may pass them by keyword), so they stay.
    if not os.path.exists(file) or os.path.isdir(file):
        return False
    try:
        # Stat probe kept so a path that vanished or became unreadable after
        # the exists() check still yields False instead of raising.
        os.path.getsize(file)
    except OSError:
        return False
    if not type and not file.endswith(".bz2"):
        return False

    blocksize = 102400
    if type:
        codec = bz2.BZ2Compressor()
        transform = codec.compress
        target = file + ".bz2"
    else:
        codec = bz2.BZ2Decompressor()
        transform = codec.decompress
        target = file[:-4]

    # Context managers close both handles even when (de)compression raises.
    with open(file, "rb") as source, open(target, "wb") as sink:
        # Read until EOF rather than trusting a size sampled beforehand.
        for chunk in iter(lambda: source.read(blocksize), b""):
            sink.write(transform(chunk))
        if type:
            # Only the compressor buffers a tail that must be flushed.
            sink.write(codec.flush())

    self.debug("Successfully "+("" if type else "de")+"compressed file : "+file)
    return True

################################################## Client Behaviour Functions ##################################################
def init(self):
    """Reset the stream position and build the bz2 codec for this proxy.

    ``self.pos`` is set to 0.  In read mode (``self.mode == "r"``) a
    decompressor is stored in ``self.bz2obj``, ``self.fileobj`` is rewound
    to offset 0 and ``self.buf`` becomes an empty bytes buffer.  In every
    other mode only a compressor is stored in ``self.bz2obj``.
    """
    import bz2

    self.pos = 0

    # Writing side first: nothing else to prepare besides the compressor.
    if self.mode != "r":
        self.bz2obj = bz2.BZ2Compressor()
        return

    # Reading side: fresh decompressor, rewound source, empty buffer.
    self.bz2obj = bz2.BZ2Decompressor()
    self.fileobj.seek(0)
    self.buf = b""
def init(self):
    """Reset the proxy state and create the bz2 (de)compressor.

    Sets ``self.pos`` to 0.  When ``self.mode`` is ``"r"`` a decompressor is
    stored in ``self.bz2obj``, ``self.fileobj`` is rewound to offset 0 and
    ``self.buf`` is reset to an empty bytes buffer.  For any other mode a
    compressor is stored in ``self.bz2obj``.
    """
    import bz2

    self.pos = 0
    if self.mode == "r":
        self.bz2obj = bz2.BZ2Decompressor()
        self.fileobj.seek(0)
        # Bytes literal: identical to "" on Python 2, and the type that
        # BZ2Decompressor.decompress() returns on Python 3, so the buffer
        # can be concatenated with decompressed data on both.
        self.buf = b""
    else:
        self.bz2obj = bz2.BZ2Compressor()
def format(self, tokensource, outfile):
    """Write raw ``(ttype, value)`` token pairs to a binary file.

    Each pair from ``tokensource`` is rendered as ``"%s\\t%r\\n"``, encoded
    with ``str.encode()`` and written to ``outfile``, optionally compressed
    according to ``self.compress`` (``'gz'`` or ``'bz2'``; any other value
    writes uncompressed).  When ``self.error_color`` is truthy, lines whose
    token type is ``Token.Error`` are passed through ``colorize`` first.

    Raises ``TypeError`` if ``outfile.write(b'')`` raises ``TypeError``,
    i.e. the file does not accept bytes.
    """
    try:
        outfile.write(b'')
    except TypeError:
        raise TypeError('The raw tokens formatter needs a binary '
                        'output file')

    if self.compress == 'gz':
        import gzip
        gz_stream = gzip.GzipFile('', 'wb', 9, outfile)

        def write(text):
            gz_stream.write(text.encode())

        def flush():
            # GzipFile.flush() alone never emits the gzip trailer (CRC and
            # length); close() does, and it leaves a caller-supplied
            # fileobj open.  Finalising here avoids relying on garbage
            # collection to complete the stream.
            gz_stream.close()
            outfile.flush()
    elif self.compress == 'bz2':
        import bz2
        compressor = bz2.BZ2Compressor(9)

        def write(text):
            outfile.write(compressor.compress(text.encode()))

        def flush():
            # Emit whatever the compressor is still buffering.
            outfile.write(compressor.flush())
            outfile.flush()
    else:
        def write(text):
            outfile.write(text.encode())

        flush = outfile.flush

    if self.error_color:
        for ttype, value in tokensource:
            line = "%s\t%r\n" % (ttype, value)
            if ttype is Token.Error:
                write(colorize(self.error_color, line))
            else:
                write(line)
    else:
        for ttype, value in tokensource:
            write("%s\t%r\n" % (ttype, value))
    flush()
def handle(self):
    """Stream a client-requested file back over the connection, bz2-compressed.

    Reads up to 1024 bytes from ``self.request``, decodes them as UTF-8 and
    treats the result as a file name.  The file is read in ``BLOCK_SIZE``
    chunks; whatever the compressor emits for each chunk is sent
    immediately, and the compressor's buffered remainder is sent in
    ``BLOCK_SIZE`` slices at the end.  Progress is logged through
    ``self.logger.debug``.
    """
    compressor = bz2.BZ2Compressor()

    # Find out what file the client wants
    # NOTE(review): the path comes straight from the peer and is opened
    # without validation -- confirm this handler is only exposed to trusted
    # clients, or restrict it to a known directory.
    filename = self.request.recv(1024).decode('utf-8')
    self.logger.debug('client asked for: "%s"', filename)

    # Send chunks of the file as they are compressed
    with open(filename, 'rb') as source:
        for block in iter(lambda: source.read(BLOCK_SIZE), b''):
            self.logger.debug('RAW %r', block)
            compressed = compressor.compress(block)
            if compressed:
                self.logger.debug(
                    'SENDING %r', binascii.hexlify(compressed))
                # sendall(): send() may transmit only part of the buffer,
                # which would silently corrupt the compressed stream.
                self.request.sendall(compressed)
            else:
                self.logger.debug('BUFFERING')

    # Send any data being buffered by the compressor
    remaining = compressor.flush()
    while remaining:
        to_send = remaining[:BLOCK_SIZE]
        remaining = remaining[BLOCK_SIZE:]
        self.logger.debug('FLUSHING %r', binascii.hexlify(to_send))
        self.request.sendall(to_send)
def init(self):
    """Reset the proxy state and create the bz2 (de)compressor.

    Sets ``self.pos`` to 0.  When ``self.mode`` is ``"r"`` a decompressor is
    stored in ``self.cmpobj``, ``self.fileobj`` is rewound to offset 0 and
    ``self.buf`` is reset to an empty bytes buffer.  For any other mode a
    compressor is stored in ``self.cmpobj``.
    """
    import bz2

    self.pos = 0
    if self.mode == "r":
        self.cmpobj = bz2.BZ2Decompressor()
        self.fileobj.seek(0)
        # Bytes literal: identical to "" on Python 2, and the type that
        # BZ2Decompressor.decompress() returns on Python 3, so the buffer
        # can be concatenated with decompressed data on both.
        self.buf = b""
    else:
        self.cmpobj = bz2.BZ2Compressor()

# class _BZ2Proxy
def format(self, tokensource, outfile):
    """Write raw ``(ttype, value)`` token pairs to a binary file.

    Each pair from ``tokensource`` is rendered as ``"%s\\t%r\\n"``, encoded
    with ``str.encode()`` and written to ``outfile``, optionally compressed
    according to ``self.compress`` (``'gz'`` or ``'bz2'``; any other value
    writes uncompressed).  When ``self.error_color`` is truthy, lines whose
    token type is ``Token.Error`` are passed through ``colorize`` first.

    Raises ``TypeError`` if the initial probe write ``outfile.write(b(''))``
    raises ``TypeError``.
    """
    try:
        # Probe write; `b` is a helper defined elsewhere in the project --
        # presumably it yields an empty bytes object here (verify).
        outfile.write(b(''))
    except TypeError:
        raise TypeError('The raw tokens formatter needs a binary '
                        'output file')

    if self.compress == 'gz':
        import gzip
        gz_stream = gzip.GzipFile('', 'wb', 9, outfile)

        def write(text):
            gz_stream.write(text.encode())

        def flush():
            # GzipFile.flush() alone never emits the gzip trailer (CRC and
            # length); close() does, and it leaves a caller-supplied
            # fileobj open.  Finalising here avoids relying on garbage
            # collection to complete the stream.
            gz_stream.close()
            outfile.flush()
    elif self.compress == 'bz2':
        import bz2
        compressor = bz2.BZ2Compressor(9)

        def write(text):
            outfile.write(compressor.compress(text.encode()))

        def flush():
            # Emit whatever the compressor is still buffering.
            outfile.write(compressor.flush())
            outfile.flush()
    else:
        def write(text):
            outfile.write(text.encode())

        flush = outfile.flush

    if self.error_color:
        for ttype, value in tokensource:
            line = "%s\t%r\n" % (ttype, value)
            if ttype is Token.Error:
                write(colorize(self.error_color, line))
            else:
                write(line)
    else:
        for ttype, value in tokensource:
            write("%s\t%r\n" % (ttype, value))
    flush()