Python array 模块,array() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用array.array()。
def _transmitMsg(self, msg):
"""Send an OSC message over a streaming socket. Raises exception if it
should fail. If everything is transmitted properly, True is returned. If
socket has been closed, False.
"""
if not isinstance(msg, OSCMessage):
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
try:
binary = msg.getBinary()
length = len(binary)
# prepend length of packet before the actual message (big endian)
len_big_endian = array.array('c', '\0' * 4)
struct.pack_into(">L", len_big_endian, 0, length)
len_big_endian = len_big_endian.tostring()
if self._transmit(len_big_endian) and self._transmit(binary):
return True
return False
except socket.error as e:
if e[0] == errno.EPIPE: # broken pipe
return False
raise e
def _transmitMsg(self, msg):
"""Send an OSC message over a streaming socket. Raises exception if it
should fail. If everything is transmitted properly, True is returned. If
socket has been closed, False.
"""
if not isinstance(msg, OSCMessage):
raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")
try:
binary = msg.getBinary()
length = len(binary)
# prepend length of packet before the actual message (big endian)
len_big_endian = array.array('c', '\0' * 4)
struct.pack_into(">L", len_big_endian, 0, length)
len_big_endian = len_big_endian.tostring()
if self._transmit(len_big_endian) and self._transmit(binary):
return True
return False
except socket.error, e:
if e[0] == errno.EPIPE: # broken pipe
return False
raise e
def __init__(self, porttype):
"""
Instantiates a new object and generates a default StreamSRI. The
porttype parameter corresponds to the type of data contained in the
array of data being sent.
The porttype is also used in the connectPort() method to narrow the
connection
"""
self.port_type = porttype
self.outPorts = {}
self.refreshSRI = False
self.sri=bulkio_helpers.defaultSRI
self.port_lock = threading.Lock()
self.done = False
def __init__(self, porttype):
"""
Instantiates a new object responsible for writing data from the port
into an array.
It is important to notice that the porttype is a BULKIO__POA type and
not a BULKIO type. The reason is because it is used to generate a
Port class that will be returned when the getPort() is invoked. The
returned class is the one acting as a server and therefore must be a
Portable Object Adapter rather and a simple BULKIO object.
Inputs:
<porttype> The BULKIO__POA data type
"""
StreamMgr.__init__(self)
self.port_type = porttype
self.sri=bulkio_helpers.defaultSRI
self.data = []
self.timestamps = []
self.gotEOS = False
self.breakBlock = False
self.port_lock = threading.Lock()
self.port_cond = threading.Condition(self.port_lock)
def pushPacket(self, data, ts, EOS, stream_id):
"""
Appends the data to the end of the array.
Input:
<data> The actual data to append to the array
<ts> The timestamp
<EOS> Flag indicating if this is the End Of the Stream
<stream_id> The unique stream id
"""
if not self._livingStreams.has_key(stream_id):
self._livingStreams[stream_id] = InputStream(self, BULKIO.StreamSRI(1, 0.0, 1.0, 1, 0, 0.0, 0.0, 0, 0, stream_id, False, []))
_stream = self._livingStreams[stream_id]
_stream._updateData(data, ts, EOS)
if EOS:
self._streams.append(_stream)
self._livingStreams.pop(stream_id)
# legacy stuff
self.data += data
def __init__(self, porttype):
"""
Instantiates a new object responsible for writing data from the port
into an array.
It is important to notice that the porttype is a BULKIO__POA type and
not a BULKIO type. The reason is because it is used to generate a
Port class that will be returned when the getPort() is invoked. The
returned class is the one acting as a server and therefore must be a
Portable Object Adapter rather and a simple BULKIO object.
Inputs:
<porttype> The BULKIO__POA data type
"""
self.port_type = porttype
self.sri=bulkio_helpers.defaultSRI
self.data = []
self.gotEOS = False
self.port_lock = threading.Lock()
self.valid_streams = {}
self.invalid_streams = {}
self.received_data = {}
def pushPacket(self, data, ts, EOS, stream_id):
"""
Appends the data to the end of the array.
Input:
<data> The actual data to append to the array
<ts> The timestamp
<EOS> Flag indicating if this is the End Of the Stream
<stream_id> The unique stream id
"""
self.port_lock.acquire()
try:
if not self.valid_streams.has_key(stream_id):
log.warn("the received packet has the invalid stream ID: "+stream_id+". Valid stream IDs are:"+str(self.valid_streams.keys()))
self.received_data[stream_id] = (self.received_data[stream_id][0] + len(data), self.received_data[stream_id][1] + 1)
if EOS:
self.invalid_streams[stream_id] = self.valid_streams[stream_id]
del self.valid_streams[stream_id]
self.gotEOS = True
else:
self.gotEOS = False
finally:
self.port_lock.release()
def pushPacket(self, data, EOS, streamID):
"""
Overrides ArraySource.pushPacket(). The difference is that there is no
<ts> timestamp or sri
Appends the data to the end of the array.
Input:
<data> The actual data to append to the array
<EOS> Flag indicating if this is the End Of the Stream
<stream_id> The unique stream id
"""
self.port_lock.acquire()
try:
try:
for connId, port in self.outPorts.items():
if port != None: port.pushPacket(data, EOS, streamID)
except Exception, e:
msg = "The call to pushPacket failed with %s " % e
msg += "connection %s instance %s" % (connId, port)
log.warn(msg)
finally:
self.port_lock.release()
def correct_umi(seq, counts):
corrected_seq = seq
count = counts.get(seq, 0)
a = array.array('c', seq)
for pos in xrange(len(a)):
existing = a[pos]
for c in tk_seq.NUCS:
if c == existing:
continue
a[pos] = c
test_str = a.tostring()
value = counts.get(test_str, 0)
if value > count or (value == count and corrected_seq < test_str):
corrected_seq = test_str
count = value
a[pos] = existing
return corrected_seq
def send(self, data):
"""Send `data' to the server."""
if self.sock is None:
if self.auto_open:
self.connect()
else:
raise NotConnected()
if self.debuglevel > 0:
print "send:", repr(data)
blocksize = 8192
if hasattr(data,'read') and not isinstance(data, array):
if self.debuglevel > 0: print "sendIng a read()able"
datablock = data.read(blocksize)
while datablock:
self.sock.sendall(datablock)
datablock = data.read(blocksize)
else:
self.sock.sendall(data)
def read(self, n=None):
"""Read and return up to n bytes.
If the argument is omitted, None, or negative, reads and
returns all data until EOF.
If the argument is positive, and the underlying raw stream is
not 'interactive', multiple raw reads may be issued to satisfy
the byte count (unless EOF is reached first). But for
interactive raw streams (XXX and for pipes?), at most one raw
read will be issued, and a short result does not imply that
EOF is imminent.
Returns an empty bytes array on EOF.
Raises BlockingIOError if the underlying raw stream has no
data at the moment.
"""
self._unsupported("read")
def readinto(self, b):
"""Read up to len(b) bytes into b.
Like read(), this may issue multiple reads to the underlying raw
stream, unless the latter is 'interactive'.
Returns the number of bytes read (0 for EOF).
Raises BlockingIOError if the underlying raw stream has no
data at the moment.
"""
# XXX This ought to work with anything that supports the buffer API
data = self.read(len(b))
n = len(data)
try:
b[:n] = data
except TypeError as err:
import array
if not isinstance(b, array.array):
raise err
b[:n] = array.array(b'b', data)
return n
def test_keymap_new_from_buffer(self):
ctx = xkb.Context()
if six.PY2:
typecode = b'b'
else:
typecode = 'b'
test_data = array.array(typecode, sample_keymap_bytes)
km = ctx.keymap_new_from_buffer(test_data)
self.assertIsNotNone(km)
self.assertEqual(km.load_method, "buffer")
test_data.extend([0] * 10)
length = len(sample_keymap_bytes)
km = ctx.keymap_new_from_buffer(test_data, length=length)
self.assertIsNotNone(km)
# This class makes use of the details of the sample keymap in
# sample_keymap_string.
def DQT(self, marker):
#
# Define quantization table. Support baseline 8-bit tables
# only. Note that there might be more than one table in
# each marker.
# FIXME: The quantization tables can be used to estimate the
# compression quality.
n = i16(self.fp.read(2))-2
s = ImageFile._safe_read(self.fp, n)
while len(s):
if len(s) < 65:
raise SyntaxError("bad quantization table marker")
v = i8(s[0])
if v//16 == 0:
self.quantization[v & 15] = array.array("b", s[1:65])
s = s[65:]
else:
return # FIXME: add code to read 16-bit tables!
# raise SyntaxError, "bad quantization table element size"
#
# JPEG marker table
def dumpsect(self, sector, firstindex=0):
"Displays a sector in a human-readable form, for debugging purpose."
if not DEBUG_MODE:
return
VPL = 8 # number of values per line (8+1 * 8+1 = 81)
tab = array.array(UINT32, sector)
if sys.byteorder == 'big':
tab.byteswap()
nbsect = len(tab)
nlines = (nbsect+VPL-1)//VPL
print("index", end=" ")
for i in range(VPL):
print("%8X" % i, end=" ")
print()
for l in range(nlines):
index = l*VPL
print("%8X:" % (firstindex+index), end=" ")
for i in range(index, index+VPL):
if i >= nbsect:
break
sect = tab[i]
name = "%8X" % sect
print(name, end=" ")
print()
def decompile(self, data, ttFont):
longFormat = ttFont['head'].indexToLocFormat
if longFormat:
format = "I"
else:
format = "H"
locations = array.array(format)
locations.fromstring(data)
if sys.byteorder != "big":
locations.byteswap()
if not longFormat:
l = array.array("I")
for i in range(len(locations)):
l.append(locations[i] * 2)
locations = l
if len(locations) < (ttFont['maxp'].numGlyphs + 1):
log.warning("corrupt 'loca' table, or wrong numGlyphs in 'maxp': %d %d",
len(locations) - 1, ttFont['maxp'].numGlyphs)
self.locations = locations
def compile(self, ttFont):
try:
max_location = max(self.locations)
except AttributeError:
self.set([])
max_location = 0
if max_location < 0x20000 and all(l % 2 == 0 for l in self.locations):
locations = array.array("H")
for i in range(len(self.locations)):
locations.append(self.locations[i] // 2)
ttFont['head'].indexToLocFormat = 0
else:
locations = array.array("I", self.locations)
ttFont['head'].indexToLocFormat = 1
if sys.byteorder != "big":
locations.byteswap()
return locations.tostring()
def decompileDeltas_(numDeltas, data, offset):
"""(numDeltas, data, offset) --> ([delta, delta, ...], newOffset)"""
result = []
pos = offset
while len(result) < numDeltas:
runHeader = byteord(data[pos])
pos += 1
numDeltasInRun = (runHeader & DELTA_RUN_COUNT_MASK) + 1
if (runHeader & DELTAS_ARE_ZERO) != 0:
result.extend([0] * numDeltasInRun)
else:
if (runHeader & DELTAS_ARE_WORDS) != 0:
deltas = array.array("h")
deltasSize = numDeltasInRun * 2
else:
deltas = array.array("b")
deltasSize = numDeltasInRun
deltas.fromstring(data[pos:pos+deltasSize])
if sys.byteorder != "big":
deltas.byteswap()
assert len(deltas) == numDeltasInRun
pos += deltasSize
result.extend(deltas)
assert len(result) == numDeltas
return (result, pos)
def decompile(self, data, ttFont):
# we usually get here indirectly from the subtable __getattr__ function, in which case both args must be None.
# If not, someone is calling the subtable decompile() directly, and must provide both args.
if data is not None and ttFont is not None:
self.decompileHeader(data, ttFont)
else:
assert (data is None and ttFont is None), "Need both data and ttFont arguments"
data = self.data # decompileHeader assigns the data after the header to self.data
firstCode, entryCount = struct.unpack(">HH", data[:4])
firstCode = int(firstCode)
data = data[4:]
#assert len(data) == 2 * entryCount # XXX not true in Apple's Helvetica!!!
gids = array.array("H")
gids.fromstring(data[:2 * int(entryCount)])
if sys.byteorder != "big":
gids.byteswap()
self.data = data = None
charCodes = list(range(firstCode, firstCode + len(gids)))
self.cmap = _make_map(self.ttFont, charCodes, gids)
def decode_format_4_0(self, data, ttFont):
from fontTools import agl
numGlyphs = ttFont['maxp'].numGlyphs
indices = array.array("H")
indices.fromstring(data)
if sys.byteorder != "big":
indices.byteswap()
# In some older fonts, the size of the post table doesn't match
# the number of glyphs. Sometimes it's bigger, sometimes smaller.
self.glyphOrder = glyphOrder = [''] * int(numGlyphs)
for i in range(min(len(indices),numGlyphs)):
if indices[i] == 0xFFFF:
self.glyphOrder[i] = ''
elif indices[i] in agl.UV2AGL:
self.glyphOrder[i] = agl.UV2AGL[indices[i]]
else:
self.glyphOrder[i] = "uni%04X" % indices[i]
self.build_psNameMapping(ttFont)
def encode_format_4_0(self, ttFont):
from fontTools import agl
numGlyphs = ttFont['maxp'].numGlyphs
glyphOrder = ttFont.getGlyphOrder()
assert len(glyphOrder) == numGlyphs
indices = array.array("H")
for glyphID in glyphOrder:
glyphID = glyphID.split('#')[0]
if glyphID in agl.AGL2UV:
indices.append(agl.AGL2UV[glyphID])
elif len(glyphID) == 7 and glyphID[:3] == 'uni':
indices.append(int(glyphID[3:],16))
else:
indices.append(0xFFFF)
if sys.byteorder != "big":
indices.byteswap()
return indices.tostring()
def decompileOffsets_(data, tableFormat, glyphCount):
if tableFormat == 0:
# Short format: array of UInt16
offsets = array.array("H")
offsetsSize = (glyphCount + 1) * 2
else:
# Long format: array of UInt32
offsets = array.array("I")
offsetsSize = (glyphCount + 1) * 4
offsets.fromstring(data[0 : offsetsSize])
if sys.byteorder != "big":
offsets.byteswap()
# In the short format, offsets need to be multiplied by 2.
# This is not documented in Apple's TrueType specification,
# but can be inferred from the FreeType implementation, and
# we could verify it with two sample GX fonts.
if tableFormat == 0:
offsets = [off * 2 for off in offsets]
return offsets
def compileOffsets_(offsets):
"""Packs a list of offsets into a 'gvar' offset table.
Returns a pair (bytestring, tableFormat). Bytestring is the
packed offset table. Format indicates whether the table
uses short (tableFormat=0) or long (tableFormat=1) integers.
The returned tableFormat should get packed into the flags field
of the 'gvar' header.
"""
assert len(offsets) >= 2
for i in range(1, len(offsets)):
assert offsets[i - 1] <= offsets[i]
if max(offsets) <= 0xffff * 2:
packed = array.array("H", [n >> 1 for n in offsets])
tableFormat = 0
else:
packed = array.array("I", offsets)
tableFormat = 1
if sys.byteorder != "big":
packed.byteswap()
return (packed.tostring(), tableFormat)
def compileCoordinates(self):
assert len(self.coordinates) == len(self.flags)
data = []
endPtsOfContours = array.array("h", self.endPtsOfContours)
if sys.byteorder != "big":
endPtsOfContours.byteswap()
data.append(endPtsOfContours.tostring())
instructions = self.program.getBytecode()
data.append(struct.pack(">h", len(instructions)))
data.append(instructions)
deltas = self.coordinates.copy()
if deltas.isFloat():
# Warn?
deltas.toInt()
deltas.absoluteToRelative()
# TODO(behdad): Add a configuration option for this?
deltas = self.compileDeltasGreedy(self.flags, deltas)
#deltas = self.compileDeltasOptimal(self.flags, deltas)
data.extend(deltas)
return bytesjoin(data)
def addConverters(table):
for i in range(len(table)):
op, name, arg, default, conv = table[i]
if conv is not None:
continue
if arg in ("delta", "array"):
conv = ArrayConverter()
elif arg == "number":
conv = NumberConverter()
elif arg == "SID":
conv = ASCIIConverter()
elif arg == 'blendList':
conv = None
else:
assert False
table[i] = op, name, arg, default, conv
def crc_update(crc, data):
"""Update CRC-32C checksum with data.
Args:
crc: 32-bit checksum to update as long.
data: byte array, string or iterable over bytes.
Returns:
32-bit updated CRC-32C as long.
"""
if type(data) != array.array or data.itemsize != 1:
buf = array.array("B", data)
else:
buf = data
crc ^= _MASK
for b in buf:
table_index = (crc ^ b) & 0xff
crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK
return crc ^ _MASK
def _readTxt(fname):
'''Returns array of words and word embedding matrix
'''
words, vectors = [], []
hook = codecs.open(fname, 'r', 'utf-8')
# get summary info about vectors file
(numWords, dim) = (int(s.strip()) for s in hook.readline().split())
for line in hook:
chunks = line.split()
word, vector = chunks[0].strip(), np.array([float(n) for n in chunks[1:]])
words.append(word)
vectors.append(vector)
hook.close()
assert len(words) == numWords
for v in vectors: assert len(v) == dim
return (words, vectors)
def DQT(self, marker):
#
# Define quantization table. Support baseline 8-bit tables
# only. Note that there might be more than one table in
# each marker.
# FIXME: The quantization tables can be used to estimate the
# compression quality.
n = i16(self.fp.read(2))-2
s = ImageFile._safe_read(self.fp, n)
while len(s):
if len(s) < 65:
raise SyntaxError("bad quantization table marker")
v = i8(s[0])
if v//16 == 0:
self.quantization[v & 15] = array.array("B", s[1:65])
s = s[65:]
else:
return # FIXME: add code to read 16-bit tables!
# raise SyntaxError, "bad quantization table element size"
#
# JPEG marker table
def allocDelayProg(size, duration):
"""Python program as str that allocates object of the specified size
in Gigabytes and then waits for the specified time
size - allocation size, bytes; None to skip allocation
duration - execution duration, sec; None to skip the sleep
return - Python program as str
"""
return """from __future__ import print_function, division # Required for stderr output, must be the first import
import sys
import time
import array
if {size} is not None:
a = array.array('b', [0])
asize = sys.getsizeof(a)
# Note: allocate at least empty size, i.e. empty list
buffer = a * int(max({size} - asize + 1, 0))
#if _DEBUG_TRACE:
# print(''.join((' allocDelayProg(), allocated ', str(sys.getsizeof(buffer))
# , ' bytes for ', str({duration}),' sec')), file=sys.stderr)
if {duration} is not None:
time.sleep({duration})
""".format(size=size, duration=duration)
def Encrypt(self, plain):
if self.Debug:
print "\n\n",self.name," Encrypting :",plain
self.ShowInitVars("ENCRYPT")
data = array('B', plain)
for cycle in range(self.cycles):
params = ("Encrypt", cycle)
data = self.Cycle(self.SetDataVector(data, params), params)
ret = data.tostring()
if self.Debug:
print "Cipher: ",ret.encode('hex')
self.SeedData = self.MShaHex(plain)
self.HMAC = self.GenHMAC(ret)
self._Salt()
return ret
def Decrypt(self, data):
if self.Debug:
print "\n\n",self.name," Decrypting :",self.Encode(data)
self.ShowInitVars("DECRYPT")
data = array('B', data)
for cycle in range(self.cycles):
params = ("Decrypt", cycle)
data = self.SetDataVector(self.Cycle(data, params), params)
ret = data.tostring()
if self.Debug:
print "Plain: ",ret
self.SeedData = self.MShaHex(ret)
self._Salt()
return ret
def havoc_perform_clone_random_byte(data, func):
if (len(data) + AFL_HAVOC_BLK_LARGE) < KAFL_MAX_FILE:
clone_length = AFL_choose_block_len(len(data))
clone_from = RAND(len(data) - clone_length + 1)
clone_to = RAND(len(data))
head = data[0:clone_to].tostring()
if RAND(4) != 0:
body = data[clone_from: clone_from+clone_length].tostring()
else:
body = ''.join(chr(random.randint(0, 0xff)) for _ in range(clone_length))
tail = data[clone_to:(len(data) - clone_to)].tostring()
data = array('B', head + body + tail)
func(data.tostring())
return data
def havoc_splicing(data, files=None):
if len(data) >= 2:
for file in files:
file_data = read_binary_file(file)
if len(file_data) < 2:
continue
first_diff, last_diff = find_diffs(data, file_data)
if last_diff < 2 or first_diff == last_diff:
continue
split_location = first_diff + RAND(last_diff - first_diff)
data = array('B', data.tostring()[:split_location] + file_data[split_location:len(data)])
#func(data.tostring())
break
return data
def bitflip_range(self, use_effector_map=False, skip_zero=False):
for i in range(0, self.MAX_ITERATIONS):
if use_effector_map:
eff_map = effector_map=self.generate_effector_map(self.MAX_ITERATIONS)
else:
eff_map = None
data = self.generate_input(i)
value1 = bitflip_range(data, effector_map=eff_map, skip_null=skip_zero)
self.counter = 0
mutate_seq_walking_bits_array(array('B', data), self.func, effector_map=eff_map, skip_null=skip_zero)
mutate_seq_two_walking_bits_array(array('B', data), self.func, effector_map=eff_map, skip_null=skip_zero)
mutate_seq_four_walking_bits_array(array('B', data), self.func, effector_map=eff_map, skip_null=skip_zero)
mutate_seq_walking_byte_array(array('B', data), self.func, effector_map=eff_map, skip_null=skip_zero)
mutate_seq_two_walking_bytes_array(array('B', data), self.func, effector_map=eff_map)
mutate_seq_four_walking_bytes_array(array('B', data), self.func, effector_map=eff_map)
self.assertEqual(value1, self.counter)
def test_simple_arithmetic_input_generation(self):
for i in range(self.MAX_ITERATIONS):
self.TEST_INPUT = ''.join(random.choice(string.printable + '\x00') for i in range(i))
self.TEST_MUTATIONS = []
self.TEST_MUTATION_CHECK = []
self.generate_test_mutations_arithmetic()
self.func_check(self.TEST_INPUT)
mutate_seq_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_two_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_four_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_walking_byte_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_two_walking_bytes_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None)
mutate_seq_four_walking_bytes_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None)
mutate_seq_8_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None, set_arith_max=127)
mutate_seq_16_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None, set_arith_max=127)
mutate_seq_32_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None, set_arith_max=127)
for i in range(len(self.TEST_MUTATION_CHECK)):
self.assertTrue(self.TEST_MUTATION_CHECK[i])
def test_seq_arithmetic_input_generation(self):
for i in range(1, self.MAX_ITERATIONS+1):
self.TEST_INPUT = ''.join(random.choice(string.printable + '\x00') for i in range(i))
self.TEST_MUTATIONS = []
self.TEST_MUTATION_CHECK = []
self.generate_test_mutations_seq_arithmetic(i%127)
self.func_check(self.TEST_INPUT)
mutate_seq_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_two_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_four_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_walking_byte_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_two_walking_bytes_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None)
mutate_seq_four_walking_bytes_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None)
mutate_seq_8_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None, set_arith_max=(i+1)%127)
mutate_seq_16_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None, set_arith_max=(i+1)%127)
mutate_seq_32_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None, set_arith_max=(i+1)%127)
for i in range(len(self.TEST_MUTATION_CHECK)):
if(not self.TEST_MUTATION_CHECK[i]):
print(str(array('B', self.TEST_INPUT)) + " - " + str(array('B', self.TEST_MUTATIONS[i])))
self.assertTrue(self.TEST_MUTATION_CHECK[i])
def test_simple_interesting32_input_generation(self):
for i in range(self.MAX_ITERATIONS):
self.TEST_INPUT = ''.join(random.choice(string.printable + '\x00') for i in range(i))
self.TEST_MUTATIONS = []
self.TEST_MUTATION_CHECK = []
self.generate_test_mutations_seq_interesting32()
self.func_check(self.TEST_INPUT)
mutate_seq_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_two_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_four_walking_bits_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_walking_byte_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_two_walking_bytes_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None)
mutate_seq_four_walking_bytes_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None)
mutate_seq_8_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_16_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_32_bit_arithmetic_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_8_bit_interesting_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_16_bit_interesting_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
mutate_seq_32_bit_interesting_array(array('B', self.TEST_INPUT), self.func_check, effector_map=None, skip_null=None)
for i in range(len(self.TEST_MUTATION_CHECK)):
self.assertTrue(self.TEST_MUTATION_CHECK[i])
def encrypt_block(block, roundkeys):
"""Encrypts a single block. This is the main AES function"""
# use array instead of bytes
roundkeys = [array('B', roundkey) for roundkey in roundkeys]
# For efficiency reasons, the state between steps is transmitted via a
# mutable array, not returned.
add_round_key(block, roundkeys[0])
for round in range(1, 10):
sub_bytes(block)
shift_rows(block)
mix_columns(block)
add_round_key(block, roundkeys[round])
sub_bytes(block)
shift_rows(block)
# no mix_columns step in the last round
add_round_key(block, roundkeys[10])
def load_glove_vectors(filename, vocab):
"""
Load glove vectors from a .txt file.
Optionally limit the vocabulary to save memory. `vocab` should be a set.
"""
dct = {}
vectors = array.array('d')
current_idx = 0
with open(filename, "r", encoding="utf-8") as f:
for _, line in enumerate(f):
tokens = line.split(" ")
word = tokens[0]
entries = tokens[1:]
if not vocab or word in vocab:
dct[word] = current_idx
vectors.extend(float(x) for x in entries)
current_idx += 1
word_dim = len(entries)
num_vectors = len(dct)
tf.logging.info("Found {} out of {} vectors in Glove".format(num_vectors, len(vocab)))
return [np.array(vectors).reshape(num_vectors, word_dim), dct]
def insert_line(self, n=1):
"""
Inserts *n* lines at the current cursor position.
"""
#logging.debug("insert_line(%s)" % n)
if not n: # Takes care of an empty string
n = 1
n = int(n)
for i in xrange(n):
self.screen.pop(self.bottom_margin) # Remove the bottom line
# Remove bottom line's style information as well:
self.renditions.pop(self.bottom_margin)
empty_line = array('u', u' ' * self.cols) # Line full of spaces
self.screen.insert(self.cursorY, empty_line) # Insert at cursor
# Insert a new empty rendition as well:
empty_rend = array('u', unichr(1000) * self.cols)
self.renditions.insert(self.cursorY, empty_rend) # Insert at cursor
def delete_line(self, n=1):
"""
Deletes *n* lines at the current cursor position.
"""
#logging.debug("delete_line(%s)" % n)
if not n: # Takes care of an empty string
n = 1
n = int(n)
for i in xrange(n):
self.screen.pop(self.cursorY) # Remove the line at the cursor
# Remove the line's style information as well:
self.renditions.pop(self.cursorY)
# Now add an empty line and empty set of renditions to the bottom of
# the view
empty_line = array('u', u' ' * self.cols) # Line full of spaces
# Add it to the bottom of the view:
self.screen.insert(self.bottom_margin, empty_line) # Insert at bottom
# Insert a new empty rendition as well:
empty_rend = array('u', unichr(1000) * self.cols)
self.renditions.insert(self.bottom_margin, empty_rend)
def clear_screen_from_cursor_down(self):
"""
Clears the screen from the cursor down (ESC[J or ESC[0J).
.. note:: This method actually erases from the cursor position to the end of the screen.
"""
#logging.debug('clear_screen_from_cursor_down()')
self.clear_line_from_cursor_right()
if self.cursorY == self.rows - 1:
# Bottom of screen; nothing to do
return
self.screen[self.cursorY+1:] = [
array('u', u' ' * self.cols) for a in self.screen[self.cursorY+1:]
]
c = self.cur_rendition # Just to save space below
self.renditions[self.cursorY+1:] = [
array('u', c * self.cols) for a in self.renditions[self.cursorY+1:]
]
def _websocket_mask_python(mask, data):
"""Websocket masking function.
`mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length.
Returns a `bytes` object of the same length as `data` with the mask applied
as specified in section 5.3 of RFC 6455.
This pure-python implementation may be replaced by an optimized version when available.
"""
mask = array.array("B", mask)
unmasked = array.array("B", data)
for i in xrange(len(data)):
unmasked[i] = unmasked[i] ^ mask[i % 4]
if hasattr(unmasked, 'tobytes'):
# tostring was deprecated in py32. It hasn't been removed,
# but since we turn on deprecation warnings in our tests
# we need to use the right one.
return unmasked.tobytes()
else:
return unmasked.tostring()
def _websocket_mask_python(mask, data):
"""Websocket masking function.
`mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length.
Returns a `bytes` object of the same length as `data` with the mask applied
as specified in section 5.3 of RFC 6455.
This pure-python implementation may be replaced by an optimized version when available.
"""
mask = array.array("B", mask)
unmasked = array.array("B", data)
for i in xrange(len(data)):
unmasked[i] = unmasked[i] ^ mask[i % 4]
if hasattr(unmasked, 'tobytes'):
# tostring was deprecated in py32. It hasn't been removed,
# but since we turn on deprecation warnings in our tests
# we need to use the right one.
return unmasked.tobytes()
else:
return unmasked.tostring()
def _websocket_mask_python(mask, data):
"""Websocket masking function.
`mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length.
Returns a `bytes` object of the same length as `data` with the mask applied
as specified in section 5.3 of RFC 6455.
This pure-python implementation may be replaced by an optimized version when available.
"""
mask = array.array("B", mask)
unmasked = array.array("B", data)
for i in xrange(len(data)):
unmasked[i] = unmasked[i] ^ mask[i % 4]
if hasattr(unmasked, 'tobytes'):
# tostring was deprecated in py32. It hasn't been removed,
# but since we turn on deprecation warnings in our tests
# we need to use the right one.
return unmasked.tobytes()
else:
return unmasked.tostring()
def langids(self):
""" Return the USB device's supported language ID codes.
These are 16-bit codes familiar to Windows developers, where for
example instead of en-US you say 0x0409. USB_LANGIDS.pdf on the usb.org
developer site for more info. String requests using a LANGID not
in this array should not be sent to the device.
This property will cause some USB traffic the first time it is accessed
and cache the resulting value for future use.
"""
if self._langids is None:
try:
self._langids = util.get_langids(self)
except USBError:
self._langids = ()
return self._langids
def create(cls, tag: int, item_type: ItemType = None, data: _array = [], long: bool = False):
if cls._item_map is None:
cls._item_map = {}
cls._map_subclasses(cls)
if long:
raise NotImplementedError("Log items are not supported by this parser yet")
if item_type is not None:
if (tag & ~0x0F) > 0:
raise ValueError("tag is not valid")
tag = (tag << 4) | item_type.value
if tag not in cls._item_map:
raise ValueError("Unknown tag {0} ({1})".format(tag, hex(tag)))
return cls._item_map[tag](data=data, long=long)
def readData(self, filename, file_index):
# Open file and read file size
self.warning("Load input file: %s" % filename)
data = open(filename, 'rb')
orig_filesize = fstat(data.fileno())[ST_SIZE]
if not orig_filesize:
raise ValueError("Input file (%s) is empty!" % filename)
# Read bytes
if self.max_size:
data = data.read(self.max_size)
else:
data = data.read()
# Display message if input is truncated
if len(data) < orig_filesize:
percent = len(data) * 100.0 / orig_filesize
self.warning("Truncate file to %s bytes (%.2f%% of %s bytes)" \
% (len(data), percent, orig_filesize))
# Convert to Python array object
return array('B', data)
def createData(self):
# Cached result?
if self.data:
return array('B', self.data)
# Get previous complete data
previous = self
while True:
previous = previous.getPrevious()
if previous.data:
break
# Apply new operations (since previous version)
data = array('B', previous.data)
for operation in self.operations[len(previous.operations):]:
operation(data)
# Cache result
self.data = data.tostring()
return data
def mangleData(self, data, index):
self.sha1_offset = 208
self.md5_offset = 256
self.header_offset = 360
self.filedata_offset = 3170
data = MangleFile.mangleData(self, data, index)
if USE_HACHOIR:
#data.tofile(open('/tmp/oops', 'wb'))
hachoir_config.quiet = True
data_str = data.tostring()
parser = guessParser(StringInputStream(data_str))
if parser:
self.useHachoirParser(parser)
summary_data = data[self.header_offset:].tostring()
checksum = md5(summary_data).digest()
data[self.md5_offset:self.md5_offset+16] = array('B', checksum)
summary_data = data[self.header_offset:self.filedata_offset].tostring()
checksum = sha(summary_data).hexdigest()
data[self.sha1_offset:self.sha1_offset+40] = array('B', checksum)
return data