Python numpy 模块,packbits() 实例源码
我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用numpy.packbits()。
def update(self):
self._display_init()
x1, x2 = self.update_x1, self.update_x2
y1, y2 = self.update_y1, self.update_y2
region = self.buffer[y1:y2, x1:x2]
if self.v_flip:
region = numpy.fliplr(region)
if self.h_flip:
region = numpy.flipud(region)
buf_red = numpy.packbits(numpy.where(region == RED, 1, 0)).tolist()
if self.inky_version == 1:
buf_black = numpy.packbits(numpy.where(region == 0, 0, 1)).tolist()
else:
buf_black = numpy.packbits(numpy.where(region == BLACK, 0, 1)).tolist()
self._display_update(buf_black, buf_red)
self._display_fini()
def hsh_to_patches(self, hsh):
pp = extract_patches_2d(hsh.astype(int), (self.patch_size, self.patch_size))
# flatten 2nd and 3rd dimension:
pp = pp.reshape((pp.shape[0], -1))
# extract sample of patches:
max_patches = min(self.max_patches, pp.shape[0])
rr = [pp[x] for x in np.linspace(0, pp.shape[0], max_patches, endpoint=False).astype(int)]
# pack patches into numbers:
packed = [int(binascii.b2a_hex(''.join(np.packbits(x).view('c'))) or '0', 16) for x in rr]
return packed
def test_packbits_empty_with_axis():
# Original shapes and lists of packed shapes for different axes.
shapes = [
((0,), [(0,)]),
((10, 20, 0), [(2, 20, 0), (10, 3, 0), (10, 20, 0)]),
((10, 0, 20), [(2, 0, 20), (10, 0, 20), (10, 0, 3)]),
((0, 10, 20), [(0, 10, 20), (0, 2, 20), (0, 10, 3)]),
((20, 0, 0), [(3, 0, 0), (20, 0, 0), (20, 0, 0)]),
((0, 20, 0), [(0, 20, 0), (0, 3, 0), (0, 20, 0)]),
((0, 0, 20), [(0, 0, 20), (0, 0, 20), (0, 0, 3)]),
((0, 0, 0), [(0, 0, 0), (0, 0, 0), (0, 0, 0)]),
]
for dt in '?bBhHiIlLqQ':
for in_shape, out_shapes in shapes:
for ax, out_shape in enumerate(out_shapes):
a = np.empty(in_shape, dtype=dt)
b = np.packbits(a, axis=ax)
assert_equal(b.dtype, np.uint8)
assert_equal(b.shape, out_shape)
def test_packbits():
# Copied from the docstring.
a = [[[1, 0, 1], [0, 1, 0]],
[[1, 1, 0], [0, 0, 1]]]
for dtype in [np.bool, np.uint8, np.int]:
arr = np.array(a, dtype=dtype)
b = np.packbits(arr, axis=-1)
assert_equal(b.dtype, np.uint8)
assert_array_equal(b, np.array([[[160], [64]], [[192], [32]]]))
assert_raises(TypeError, np.packbits, np.array(a, dtype=float))
def decompress_rle(data, # type: bytes
shape, # type: Tuple[int, int]
depth, # type: int
version # type: int
): # type: (...) -> np.ndarray
"""
Decompress run length encoded data.
{}
"""
output = packbits.decode(
data, shape[0], shape[1], color_depth_size_map[depth], version)
# Now pass along to the raw decoder to get a Numpy array
return decompress_raw(output, shape, depth, version)
def decompress_zip_prediction(data, # type: bytes
shape, # type: Tuple[int, int]
depth, # type: int
version # type: int
): # type: (...) -> np.ndarray
"""
Decompress zip (zlib) with prediction encoded data.
Not supported for 1- or 32-bit images.
{}
"""
if depth == 1: # pragma: no cover
raise ValueError(
"zip with prediction is not supported for 1-bit images")
elif depth == 32:
raise ValueError(
"zip with prediction is not implemented for 32-bit images")
elif depth == 8:
decoder = packbits.decode_prediction_8bit
else:
decoder = packbits.decode_prediction_16bit
data = zlib.decompress(data)
image = util.ensure_native_endian(
decompress_raw(data, shape, depth, version))
for i in range(len(image)):
decoder(image[i].flatten())
return image
def normalize_image(image, # type: np.ndarray
depth # type: int
): # type: (...) -> np.ndarray
if depth == 1:
image = np.packbits(image.flatten())
return image
def compress_rle(fd, # type: BinaryIO
image, # type: np.ndarray
depth, # type: int
version # type: int
): # type: (...) -> None
"""
Write a Numpy array to a run length encoded stream.
{}
"""
if depth == 1: # pragma: no cover
raise ValueError(
"rle compression is not supported for 1-bit images")
start = fd.tell()
if version == 1:
fd.seek(image.shape[0] * 2, 1)
lengths = np.empty((len(image),), dtype='>u2')
else:
fd.seek(image.shape[0] * 4, 1)
lengths = np.empty((len(image),), dtype='>u4')
if util.needs_byteswap(image):
for i, row in enumerate(image):
row = util.do_byteswap(row)
packed = packbits.encode(row)
lengths[i] = len(packed)
fd.write(packed)
else:
for i, row in enumerate(image):
packed = packbits.encode(row)
lengths[i] = len(packed)
fd.write(packed)
end = fd.tell()
fd.seek(start)
fd.write(lengths.tobytes())
fd.seek(end)
def compress_zip_prediction(fd, # type: BinaryIO
image, # type: np.ndarray
depth, # type: int
version # type: int
): # type: (...) -> None
"""
Write a Numpy array to a zip (zlib) with prediction compressed
stream.
Not supported for 1- or 32-bit images.
{}
"""
if depth == 1: # pragma: no cover
raise ValueError(
"zip with prediction is not supported for 1-bit images")
elif depth == 32: # pragma: no cover
raise ValueError(
"zip with prediction is not implemented for 32-bit images")
elif depth == 8:
encoder = packbits.encode_prediction_8bit
elif depth == 16:
encoder = packbits.encode_prediction_16bit
compressor = zlib.compressobj()
for row in image:
encoder(row.flatten())
row = util.ensure_bigendian(row)
fd.write(compressor.compress(row))
fd.write(compressor.flush())
def compress_constant_zip_prediction(fd, # type: BinaryIO
value, # type: int
width, # type: int
rows, # type: int
depth, # type: int
version # type: int
): # type: (...) -> np.ndarray
"""
Write a virtual image containing a constant to a zip with
prediction compressed stream.
{}
"""
if depth == 1: # pragma: no cover
raise ValueError(
"zip with prediction is not supported for 1-bit images")
elif depth == 32: # pragma: no cover
raise ValueError(
"zip with prediction is not implemented for 32-bit images")
elif depth == 8:
encoder = packbits.encode_prediction_8bit
elif depth == 16:
encoder = packbits.encode_prediction_16bit
row = _make_constant_row(value, width, depth)
row = row.reshape((1, width))
row = util.ensure_native_endian(row)
encoder(row.flatten())
row = util.ensure_bigendian(row)
row = row.tobytes()
fd.write(zlib.compress(row * rows))
def write(self, filename):
header_bytes = struct.pack(CHUNK_HEADER_FORMAT, self.data_size, self.board_size, self.input_planes, self.is_test)
position_bytes = np.packbits(self.pos_features).tostring()
next_move_bytes = np.packbits(self.next_moves).tostring()
with gzip.open(filename, "wb", compresslevel=6) as f:
f.write(header_bytes)
f.write(position_bytes)
f.write(next_move_bytes)
def frame_1090es_ppm_modulate(even, odd):
ppm = [ ]
for i in range(48): # pause
ppm.append( 0 )
ppm.append( 0xA1 ) # preamble
ppm.append( 0x40 )
for i in range(len(even)):
word16 = numpy.packbits(manchester_encode(~even[i]))
ppm.append(word16[0])
ppm.append(word16[1])
for i in range(100): # pause
ppm.append( 0 )
ppm.append( 0xA1 ) # preamble
ppm.append( 0x40 )
for i in range(len(odd)):
word16 = numpy.packbits(manchester_encode(~odd[i]))
ppm.append(word16[0])
ppm.append(word16[1])
for i in range(48): # pause
ppm.append( 0 )
#print '[{}]'.format(', '.join(hex(x) for x in ppm))
return bytearray(ppm)
def pack(a):
"""Pack a boolean array *a* so that it takes 8x less space."""
return np.packbits(a.view(np.uint8))
def test_packbits():
# Copied from the docstring.
a = [[[1, 0, 1], [0, 1, 0]],
[[1, 1, 0], [0, 0, 1]]]
for dtype in [np.bool, np.uint8, np.int]:
arr = np.array(a, dtype=dtype)
b = np.packbits(arr, axis=-1)
assert_equal(b.dtype, np.uint8)
assert_array_equal(b, np.array([[[160], [64]], [[192], [32]]]))
assert_raises(TypeError, np.packbits, np.array(a, dtype=float))
def boolMasksToImage(masks):
'''
Transform at maximum 8 bool layers --> 2d arrays, dtype=(bool,int)
to one 8bit image
'''
assert len(masks) <= 8, 'can only transform up to 8 masks into image'
masks = np.asarray(masks, dtype=np.uint8)
assert masks.ndim == 3, 'layers need to be stack of 2d arrays'
return np.packbits(masks, axis=0)[0].T
def img_to_terms(self, img_data_uri = False, img_bytes = False):
if img_data_uri:
if type(img_data_uri) is unicode:
img_data_uri = img_data_uri.encode('utf8')
img = Image.open(StringIO(decode_image(img_data_uri)))
else:
assert img_bytes is not False
img = Image.open(StringIO(img_bytes))
hsh = binascii.b2a_hex(np.packbits(self.hash_func(img, hash_size = self.hash_size).hash).tobytes())
return {'dedupe_hsh': hsh}
def lsb_decode(image):
try:
red, green, blue = image.split()
watermark = ImageMath.eval("(a&0x1)*0x01", a=red)
watermark = watermark.convert('L')
watermark_bytes = bytes(watermark.getdata())
watermark_bits_array = np.fromiter(watermark_bytes, dtype=np.uint8)
watermark_bytes_array = np.packbits(watermark_bits_array)
watermark_bytes = bytes(watermark_bytes_array)
bytes_io = BytesIO(watermark_bytes)
return load(bytes_io)
except UnpicklingError:
return ''
def test_packbits():
# Copied from the docstring.
a = [[[1, 0, 1], [0, 1, 0]],
[[1, 1, 0], [0, 0, 1]]]
for dtype in [np.bool, np.uint8, np.int]:
arr = np.array(a, dtype=dtype)
b = np.packbits(arr, axis=-1)
assert_equal(b.dtype, np.uint8)
assert_array_equal(b, np.array([[[160], [64]], [[192], [32]]]))
assert_raises(TypeError, np.packbits, np.array(a, dtype=float))
def test_packbits():
# Copied from the docstring.
a = [[[1, 0, 1], [0, 1, 0]],
[[1, 1, 0], [0, 0, 1]]]
for dtype in [np.bool, np.uint8, np.int]:
arr = np.array(a, dtype=dtype)
b = np.packbits(arr, axis=-1)
assert_equal(b.dtype, np.uint8)
assert_array_equal(b, np.array([[[160], [64]], [[192], [32]]]))
assert_raises(TypeError, np.packbits, np.array(a, dtype=float))
def addition_problem(ReccurentLayer):
X_train, X_test, y_train, y_test = addition_dataset(8, 5000)
print(X_train.shape, X_test.shape)
model = NeuralNet(
layers=[
ReccurentLayer,
TimeDistributedDense(1),
Activation('sigmoid'),
],
loss='mse',
optimizer=Adam(),
metric='mse',
batch_size=64,
max_epochs=15,
)
model.fit(X_train, y_train)
predictions = np.round(model.predict(X_test))
predictions = np.packbits(predictions.astype(np.uint8))
y_test = np.packbits(y_test.astype(np.int))
print(accuracy(y_test, predictions))
# RNN
# addition_problem(RNN(16, parameters=Parameters(constraints={'W': SmallNorm(), 'U': SmallNorm()})))
# LSTM
def read_all_games(fn_in, fn_out):
g = h5py.File(fn_out, 'w')
X = g.create_dataset('x', (0, 28 * 8), dtype='b', maxshape=(None, 28 * 8), chunks=True) # dtype='b'
M = g.create_dataset('m', (0, 1), dtype='float32', maxshape=(None, 1), chunks=True)
size = 0
line = 0
for game in read_games(fn_in):
game = parse_game(game)
if game is None:
continue
for x, m in game :
if line + 1 >= size:
g.flush()
size = 2 * size + 1
print 'resizing to', size
[d.resize(size=size, axis=0) for d in (X, M)]
X[line] = numpy.packbits(x)
M[line] = m
line += 1
print 'shrink to', line
[d.resize(size=line, axis=0) for d in (X, M)] # shrink to fit
g.close()
def isNear(self, address):
difference = 0
# a logically anded array ([true, false, ..]) multiplied by an integer is promoted to an int array.
# Which then can be sent to packbits, that here returns an array of 1 element, so now z is an integer
z = np.packbits(np.logical_and(self.address, address)*1)[0]
while z:
difference += 1
z &= z-1 # magic!
return 1 if difference < self.radius else 0
def test_packbits():
# Copied from the docstring.
a = [[[1, 0, 1], [0, 1, 0]],
[[1, 1, 0], [0, 0, 1]]]
for dt in '?bBhHiIlLqQ':
arr = np.array(a, dtype=dt)
b = np.packbits(arr, axis=-1)
assert_equal(b.dtype, np.uint8)
assert_array_equal(b, np.array([[[160], [64]], [[192], [32]]]))
assert_raises(TypeError, np.packbits, np.array(a, dtype=float))
def test_packbits_empty():
shapes = [
(0,), (10, 20, 0), (10, 0, 20), (0, 10, 20), (20, 0, 0), (0, 20, 0),
(0, 0, 20), (0, 0, 0),
]
for dt in '?bBhHiIlLqQ':
for shape in shapes:
a = np.empty(shape, dtype=dt)
b = np.packbits(a)
assert_equal(b.dtype, np.uint8)
assert_equal(b.shape, (0,))
def prepare_packed(data, output_fname):
print("prepare packed:")
with tictoc():
with tf.python_io.TFRecordWriter(output_fname) as writer:
for atom_features in data:
value = np.packbits(atom_features.reshape(-1)).tobytes()
bit_features = tf.train.Feature(bytes_list = tf.train.BytesList(value=value))
features = {'bit_features' : bit_features}
example = tf.train.Example(features=tf.train.Features(feature=features))
serialized_example = example.SerializeToString()
writer.write(serialized_example)
print("File size: {} bytes".format(os.path.getsize(output_fname)))
def pack_state(self, state):
black = np.packbits(state == Board.STONE_BLACK)
white = np.packbits(state == Board.STONE_WHITE)
empty = np.packbits(state == Board.STONE_EMPTY)
image = np.concatenate((black, white, empty))
return bytes(image)
def _tofile(self, fh, pam=False):
"""Write Netbm file."""
fh.seek(0)
fh.write(self._header(pam))
data = self.asarray(copy=False)
if self.maxval == 1:
data = numpy.packbits(data, axis=-1)
data.tofile(fh)
def crossbar_rows(self):
"""Returns a list of crossbar rows"""
rows = []
for i in range(self.num_neurons):
rows.append({
'type': 'S%d' % self.axon_type[i],
'synapses': (' '.join(map(lambda x: format(x, '02x'), np.packbits(self.w[i, :])))).upper()
})
return rows
def _to_hash(projected):
if projected.shape[1] % 8 != 0:
raise ValueError('Require reduced dimensionality to be a multiple '
'of 8 for hashing')
# XXX: perhaps non-copying operation better
out = np.packbits((projected > 0).astype(int)).view(dtype=HASH_DTYPE)
return out.reshape(projected.shape[0], -1)
def _generate_masks(self):
"""Creates left and right masks for all hash lengths."""
tri_size = MAX_HASH_SIZE + 1
# Called once on fitting, output is independent of hashes
left_mask = np.tril(np.ones((tri_size, tri_size), dtype=int))[:, 1:]
right_mask = left_mask[::-1, ::-1]
self._left_mask = np.packbits(left_mask).view(dtype=HASH_DTYPE)
self._right_mask = np.packbits(right_mask).view(dtype=HASH_DTYPE)
def run(self, data):
"""Compute biclustering.
Parameters
----------
data : numpy.ndarray
"""
data = check_array(data, dtype=np.bool, copy=True)
self._validate_parameters()
data = [np.packbits(row) for row in data]
biclusters = []
patterns_found = set()
for ri, rj in combinations(data, 2):
pattern = np.bitwise_and(ri, rj)
pattern_cols = sum(popcount(int(n)) for n in pattern)
if pattern_cols >= self.min_cols and self._is_new(patterns_found, pattern):
rows = [k for k, r in enumerate(data) if self._match(pattern, r)]
if len(rows) >= self.min_rows:
cols = np.where(np.unpackbits(pattern) == 1)[0]
biclusters.append(Bicluster(rows, cols))
return Biclustering(biclusters)
def test_packbits():
# Copied from the docstring.
a = [[[1, 0, 1], [0, 1, 0]],
[[1, 1, 0], [0, 0, 1]]]
for dtype in [np.bool, np.uint8, np.int]:
arr = np.array(a, dtype=dtype)
b = np.packbits(arr, axis=-1)
assert_equal(b.dtype, np.uint8)
assert_array_equal(b, np.array([[[160], [64]], [[192], [32]]]))
assert_raises(TypeError, np.packbits, np.array(a, dtype=float))
def main(_):
if (FLAGS.input_image is None or FLAGS.output_codes is None or
FLAGS.model is None):
print('\nUsage: python encoder.py --input_image=/your/image/here.png '
'--output_codes=output_codes.pkl --iteration=15 '
'--model=residual_gru.pb\n\n')
return
if FLAGS.iteration < 0 or FLAGS.iteration > 15:
print('\n--iteration must be between 0 and 15 inclusive.\n')
return
with tf.gfile.FastGFile(FLAGS.input_image) as input_image:
input_image_str = input_image.read()
with tf.Graph().as_default() as graph:
# Load the inference model for encoding.
with tf.gfile.FastGFile(FLAGS.model, 'rb') as model_file:
graph_def = tf.GraphDef()
graph_def.ParseFromString(model_file.read())
_ = tf.import_graph_def(graph_def, name='')
input_tensor = graph.get_tensor_by_name('Placeholder:0')
outputs = [graph.get_tensor_by_name(name) for name in
get_output_tensor_names()]
input_image = tf.placeholder(tf.string)
_, ext = os.path.splitext(FLAGS.input_image)
if ext == '.png':
decoded_image = tf.image.decode_png(input_image, channels=3)
elif ext == '.jpeg' or ext == '.jpg':
decoded_image = tf.image.decode_jpeg(input_image, channels=3)
else:
assert False, 'Unsupported file format {}'.format(ext)
decoded_image = tf.expand_dims(decoded_image, 0)
with tf.Session(graph=graph) as sess:
img_array = sess.run(decoded_image, feed_dict={input_image:
input_image_str})
results = sess.run(outputs, feed_dict={input_tensor: img_array})
results = results[0:FLAGS.iteration + 1]
int_codes = np.asarray([x.astype(np.int8) for x in results])
# Convert int codes to binary.
int_codes = (int_codes + 1)//2
export = np.packbits(int_codes.reshape(-1))
output = io.BytesIO()
np.savez_compressed(output, shape=int_codes.shape, codes=export)
with tf.gfile.FastGFile(FLAGS.output_codes, 'w') as code_file:
code_file.write(output.getvalue())
def finishframe(self, symbols):
# look for flag at end.
flagcorr = numpy.correlate(symbols, [-1, 1, 1, 1, 1, 1, 1, 1, -1])
cimax = numpy.argmax(flagcorr) # index of first flag
cimin = numpy.argmin(flagcorr) # index of first inverted flag
if flagcorr[cimax] == 9 and flagcorr[cimin] == -9:
# they are both proper flags
ci = min(cimax, cimin)
elif flagcorr[cimax] == 9:
ci = cimax
else:
ci = cimin
symbols = symbols[0:ci+1]
# un-nrzi symbols to get bits.
bits = numpy.where(numpy.equal(symbols[:-1], symbols[1:]), 1, 0)
# un-bit-stuff. every sequence of five ones must be followed
# by a zero, which we should delete.
nones = 0
nbits = [ ]
for i in range(0, len(bits)):
if nones == 5:
nones = 0
# assuming bits[i] == 0
# don't append the zero...
elif bits[i] == 1:
nones += 1
nbits.append(bits[i])
else:
nbits.append(bits[i])
nones = 0
bits = nbits
if len(bits) < 8:
return [ 0, None, 0 ]
# convert bits to bytes.
# bits[] is least-significant-first, but
# numpy.packbits() wants MSF.
bits = numpy.array(bits)
bits = bits[0:(len(bits)/8)*8]
assert (len(bits)%8) == 0
bits = bits[::-1]
bytes = numpy.packbits(bits)
bytes = bytes[::-1]
msg = None
ok = self.checkpacket(bytes)
if ok > 0 or len(bytes) > 16:
msg = self.printpacket(bytes)
return [ ok, msg, len(symbols) ]
# 0: syntactically unlikely to be a packet
# 1: syntactically plausible but crc failed
# 2: crc is correct