Python random 模块,getrandbits() 实例源码
我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用random.getrandbits()。
def test_uint_multi_port(self, x_series_device, seed):
    """Write random integers across two DO ports and read each one back.

    Two digital-output ports of at most 16 bits each are sampled from
    ``x_series_device`` and added as a single channel.  Ten random values
    spanning the combined port width are written one at a time and read
    back immediately; the values read must equal the values written.
    """
    # Seed the generator so port selection and test values are repeatable.
    random.seed(seed)

    narrow_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 16]
    do_ports = random.sample(narrow_ports, 2)
    total_port_width = sum(port.do_port_width for port in do_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            flatten_channel_string([port.name for port in do_ports]),
            line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Draw all test values before entering the write/read loop.
        values_to_test = [
            int(random.getrandbits(total_port_width)) for _ in range(10)]

        values_read = []
        for expected in values_to_test:
            task.write(expected)
            time.sleep(0.001)
            values_read.append(task.read())

        assert values_read == values_to_test
def test_one_sample_one_line(self, x_series_device, seed):
    """Round-trip ten random booleans through one digital output line.

    A line is picked at random from ``x_series_device.do_lines``; each value
    is written with ``write_one_sample_one_line`` and read back with
    ``read_one_sample_one_line``.
    """
    # Seed the generator so line selection and test values are repeatable.
    random.seed(seed)

    line_name = random.choice(x_series_device.do_lines).name

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            line_name, line_grouping=LineGrouping.CHAN_PER_LINE)

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        # Draw all test values before entering the write/read loop.
        values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]

        values_read = []
        for expected in values_to_test:
            writer.write_one_sample_one_line(expected)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_one_line())

        numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_byte(self, x_series_device, seed):
    """Round-trip ten random values through a port of at most 8 bits.

    Values are written with ``write_one_sample_port_byte`` and read back
    with ``read_one_sample_port_byte``; both sequences must match.
    """
    # Seed the generator so port selection and test values are repeatable.
    random.seed(seed)

    byte_wide_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 8]
    do_port = random.choice(byte_wide_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Draw all test values before entering the write/read loop.
        values_to_test = [
            int(random.getrandbits(do_port.do_port_width))
            for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        values_read = []
        for expected in values_to_test:
            writer.write_one_sample_port_byte(expected)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_port_byte())

        numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_uint16(self, x_series_device, seed):
    """Round-trip ten random values through a port of at most 16 bits.

    Values are written with ``write_one_sample_port_uint16`` and read back
    with ``read_one_sample_port_uint16``; both sequences must match.
    """
    # Seed the generator so port selection and test values are repeatable.
    random.seed(seed)

    eligible_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 16]
    do_port = random.choice(eligible_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Draw all test values before entering the write/read loop.
        values_to_test = [
            int(random.getrandbits(do_port.do_port_width))
            for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        values_read = []
        for expected in values_to_test:
            writer.write_one_sample_port_uint16(expected)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_port_uint16())

        numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_uint32(self, x_series_device, seed):
    """Round-trip ten random values through a port of at most 32 bits.

    Values are written with ``write_one_sample_port_uint32`` and read back
    with ``read_one_sample_port_uint32``; both sequences must match.
    """
    # Seed the generator so port selection and test values are repeatable.
    random.seed(seed)

    eligible_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 32]
    do_port = random.choice(eligible_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Draw all test values before entering the write/read loop.
        values_to_test = [
            int(random.getrandbits(do_port.do_port_width))
            for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        values_read = []
        for expected in values_to_test:
            writer.write_one_sample_port_uint32(expected)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_port_uint32())

        numpy.testing.assert_array_equal(values_read, values_to_test)
def random_context():
    """Return a context dict holding a random subset of the known fields.

    Every field is decided by its own coin flip (``random.getrandbits(1)``),
    in the fixed order listed below; a field that wins its flip gets a
    freshly generated value, the others are left out of the dict.
    """
    # (key, value factory) pairs, in the order the coin flips are made.
    field_factories = (
        ("function_name", RunLambdaCliTest.random_string),
        ("function_version", RunLambdaCliTest.random_string),
        ("invoked_function_arn", RunLambdaCliTest.random_string),
        ("memory_limit_in_mb", lambda: str(random.randint(100, 200))),
        ("aws_request_id", RunLambdaCliTest.random_string),
        ("log_group_name", RunLambdaCliTest.random_string),
        ("log_stream_name", RunLambdaCliTest.random_string),
        ("identity", RunLambdaCliTest.random_identity),
        ("client_context", RunLambdaCliTest.random_client_context),
    )

    result = {}
    for key, make_value in field_factories:
        if random.getrandbits(1):
            result[key] = make_value()
    return result
def __init__(self, *args, **kwds):
    """Create the shared-memory buffer, then start the periodic renderer.

    On Windows a tagged anonymous mmap of one page is created under a
    randomly generated tag name.  Elsewhere a named temporary file is
    filled with zeros and mapped.  ``self.close`` is registered to run at
    interpreter exit, the base ``GraphicsView`` is initialised with the
    forwarded arguments, and a timer calls ``self.renderView`` every 16 ms.
    """
    ## Create shared memory for rendered image
    if sys.platform.startswith('win'):
        self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
        self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
    else:
        self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
        self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
        # The zeros may still sit in the file object's buffer; flush them so
        # the file really is at least one page long before it is mapped
        # (mmap rejects a length larger than the on-disk file size).
        self.shmFile.flush()
        fd = self.shmFile.fileno()
        self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
    atexit.register(self.close)

    GraphicsView.__init__(self, *args, **kwds)
    self.scene().changed.connect(self.update)
    self.img = None
    self.renderTimer = QtCore.QTimer()
    self.renderTimer.timeout.connect(self.renderView)
    self.renderTimer.start(16)
def __init__(self, *args, **kwds):
    """Create the shared-memory buffer, then start the periodic renderer.

    On Windows a tagged anonymous mmap of one page is created under a
    randomly generated tag name.  Elsewhere a named temporary file is
    filled with zeros and mapped.  ``self.close`` is registered to run at
    interpreter exit, the base ``GraphicsView`` is initialised with the
    forwarded arguments, and a timer calls ``self.renderView`` every 16 ms.
    """
    ## Create shared memory for rendered image
    if sys.platform.startswith('win'):
        self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
        self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
    else:
        self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
        self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
        # The zeros may still sit in the file object's buffer; flush them so
        # the file really is at least one page long before it is mapped
        # (mmap rejects a length larger than the on-disk file size).
        self.shmFile.flush()
        fd = self.shmFile.fileno()
        self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
    atexit.register(self.close)

    GraphicsView.__init__(self, *args, **kwds)
    self.scene().changed.connect(self.update)
    self.img = None
    self.renderTimer = QtCore.QTimer()
    self.renderTimer.timeout.connect(self.renderView)
    self.renderTimer.start(16)
def main():
    """Run a toy learner that talks to a peer over a ZeroMQ PAIR socket.

    After connecting to ``tcp://localhost:5556`` and sending a greeting,
    the loop runs forever: it receives and prints a reward frame and an
    input frame, then replies with a single character.  On even steps the
    reply is the next character of a fixed 8-character pattern; on odd
    steps it is '1' (every 7th step a random bit is drawn first, and is
    used when that step is odd).
    """
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))

    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)
        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            msg_out = str(message[cnt % 8])
        # msg_out is text, so it must go through send_string (as the
        # greeting does); send() only accepts bytes-like frames.
        socket.send_string(msg_out)
        cnt = cnt + 1
def gen_pseudo_word(self, L=None):
    """Return a pseudo-word of length ``L`` not yet in ``inv_word_mapping``.

    Args:
        L: Desired word length.  A falsy value (None or 0) selects a random
            length between 1 and 8 inclusive.

    Returns:
        A string of length ``L``.  When ``self.readable`` is truthy the
        word alternates between symbols drawn (with replacement) from
        ``self.V`` and ``self.C``, starting with either set at random;
        otherwise it is ``L`` distinct lowercase ASCII letters.

    Candidates are regenerated until one is found that is not a key of
    ``self.inv_word_mapping``; the loop does not terminate if no such
    candidate exists.
    """
    if not L:
        L = random.randint(1, 8)
    # generate one word that we hadn't used before
    while True:
        if self.readable:
            # Integer ceiling of L / 2: immune to integer-division
            # truncation, which would make odd-length words too short.
            half = (L + 1) // 2
            # alternating between vowels and consonants, sampled with repl.
            vowels = [random.choice(self.V) for _ in range(half)]
            consonants = [random.choice(self.C) for _ in range(half)]
            if random.getrandbits(1):
                pairs = zip(vowels, consonants)
            else:
                pairs = zip(consonants, vowels)
            pseudo_word = ''.join([ch for pair in pairs for ch in pair])[:L]
        else:
            pseudo_word = ''.join(random.sample(
                string.ascii_lowercase, L))
        if pseudo_word not in self.inv_word_mapping:
            return pseudo_word
def dated_path(obj, file_data):
    """Build a dated, randomised relative path for an uploaded file.

    Args:
        obj: Object whose ``model_name`` attribute becomes the path prefix;
            "undefined" is used when the attribute cannot be read.
        file_data: Upload object exposing a ``filename`` attribute.

    Returns:
        ``"<prefix>/<year>/<month>/<filename>"`` where the file name is the
        original stem plus ``_<16-bit random int>`` and the original
        extension, passed through ``secure_filename``.  Year and month come
        from today's date and are not zero-padded.
    """
    try:
        prefix = getattr(obj, 'model_name')
    except Exception:
        # Any ordinary lookup failure falls back to a placeholder, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        prefix = "undefined"

    parts = op.splitext(file_data.filename)
    rand = random.getrandbits(16)
    filename = u"{name}_{rand}{ext}".format(
        rand=rand, name=parts[0], ext=parts[1]
    )
    filename = secure_filename(filename)

    today = date.today()
    path = u"{prefix}/{t.year}/{t.month}/{filename}".format(
        prefix=prefix, t=today, filename=filename
    )
    return path
def action_clone_item(self, ids):
    """Clone the single selected item and redirect to the clone's edit view.

    Selecting more than one item flashes an error and returns None.  The
    clone is a deep copy without ``_id``, with a random 32-bit suffix
    appended to its slug and ``_isclone`` set to True; it is inserted
    between the ``_on_model_change`` and ``after_model_change`` hooks.
    """
    if len(ids) > 1:
        flash(
            "You can select only one item for this action",
            'error'
        )
        return

    source = current_app.db.get_with_content(_id=ids[0])
    clone = deepcopy(source)
    # Drop the identifier so the insert below creates a new document.
    del clone['_id']
    clone['slug'] = '{}-{}'.format(clone["slug"], random.getrandbits(32))
    clone['_isclone'] = True

    self._on_model_change(None, clone, True)
    self.coll.insert(clone)
    self.after_model_change(None, clone, True)

    return redirect(url_for('.edit_view', id=clone['_id']))
# TODO: Serialize and activate this action
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    Return a version-4 UUID string built from several entropy sources.

    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    Two 64-bit words from ``random.getrandbits`` are XORed with the two
    words of ``ctokens`` and, when ``HAVE_URANDOM`` is set, also with two
    words unpacked from ``fast_urandom16()``.
    """
    high = random.getrandbits(64)
    low = random.getrandbits(64)
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]
    byte_s = _struct_2_long_long.pack(high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def main():
    """Print one sample from several ``random`` module functions."""
    random.seed()

    # Random integers.
    print(random.getrandbits(3))
    print(random.randint(200, 800))
    # NOTE(review): this prints the three literals, not a random number;
    # possibly random.randrange(2, 400, 2) was intended - confirm.
    print(2, 400, 2)

    # Picking from a sequence.
    seq = range(1, 10)
    print(random.choice(seq))
    print(random.sample(seq, 4))

    shuffled = list(seq)
    random.shuffle(shuffled)
    print(shuffled)

    # Random floats.
    print(random.random())  # value in [0.0, 1.0)
    print(random.uniform(2.1, 4.99))  # value between the two bounds
def __init__(self, length=0, defined=False, value=None):
""" Creates a bit string of a certain length, using a certain underlying
implementation.

NOTE(review): this listing has lost its indentation, so the nesting of the
statements below (in particular whether the final random-flip loop runs
unconditionally or only in one branch) must be confirmed against the
original file.

Args:
length: number of bits; passed to ``bitarray`` for both buffers.
defined: when truthy, ``self.defined`` is replaced by None; otherwise
the ``defined`` bitarray is cleared to all False.
value: optional iterable; when given, ``self.data`` becomes a plain list
of its elements coerced to bool.
"""
self.length = length
self.fitness = 0
self.isCacheValid = False
# Both buffers are allocated with ``length`` bits; this code does not set
# the initial contents of ``self.data``.
self.defined = bitarray(length)
self.data = bitarray(length)
if defined:
self.defined = None
else:
self.defined.setall(False)
if value is not None:
# Note: this replaces the bitarray with a Python list of bools.
self.data = [bool(X) for X in value]
# Positions are visited as 1..length; each is flipped on a coin toss.
for i in range(1, length+1):
if bool(random.getrandbits(1)):
self.flip(i)
def test_wrong_method(self):
    """A handshake sent with POST instead of GET must fail the connection."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    raw_nonce = bytes(random.getrandbits(8) for _ in range(16))
    nonce = base64.b64encode(raw_nonce)

    request = b''.join([
        b'POST ', test_path.encode('ascii'), b' HTTP/1.1\r\n',
        b'Host: ', test_host.encode('ascii'), b'\r\n',
        b'Connection: Upgrade\r\n',
        b'Upgrade: WebSocket\r\n',
        b'Sec-WebSocket-Version: 13\r\n',
        b'Sec-WebSocket-Key: ', nonce, b'\r\n',
        b'\r\n',
    ])

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_bad_connection(self):
    """A handshake with ``Connection: Zoinks`` must fail the connection."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    raw_nonce = bytes(random.getrandbits(8) for _ in range(16))
    nonce = base64.b64encode(raw_nonce)

    request = b''.join([
        b'GET ', test_path.encode('ascii'), b' HTTP/1.1\r\n',
        b'Host: ', test_host.encode('ascii'), b'\r\n',
        b'Connection: Zoinks\r\n',
        b'Upgrade: WebSocket\r\n',
        b'Sec-WebSocket-Version: 13\r\n',
        b'Sec-WebSocket-Key: ', nonce, b'\r\n',
        b'\r\n',
    ])

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_bad_upgrade(self):
    """A handshake with ``Upgrade: WebPocket`` must fail the connection."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    raw_nonce = bytes(random.getrandbits(8) for _ in range(16))
    nonce = base64.b64encode(raw_nonce)

    request = b''.join([
        b'GET ', test_path.encode('ascii'), b' HTTP/1.1\r\n',
        b'Host: ', test_host.encode('ascii'), b'\r\n',
        b'Connection: Upgrade\r\n',
        b'Upgrade: WebPocket\r\n',
        b'Sec-WebSocket-Version: 13\r\n',
        b'Sec-WebSocket-Key: ', nonce, b'\r\n',
        b'\r\n',
    ])

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_missing_version(self):
    """A handshake without Sec-WebSocket-Version must fail the connection."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    raw_nonce = bytes(random.getrandbits(8) for _ in range(16))
    nonce = base64.b64encode(raw_nonce)

    # The Sec-WebSocket-Version header is left out on purpose.
    request = b''.join([
        b'GET ', test_path.encode('ascii'), b' HTTP/1.1\r\n',
        b'Host: ', test_host.encode('ascii'), b'\r\n',
        b'Connection: Upgrade\r\n',
        b'Upgrade: WebSocket\r\n',
        b'Sec-WebSocket-Key: ', nonce, b'\r\n',
        b'\r\n',
    ])

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_accept_wrong_subprotocol(self):
    """Accepting with a subprotocol that was not proposed raises ValueError.

    The request proposes ``one`` and ``two``; the resulting
    ``ConnectionRequested`` event must list both, and accepting it with
    ``three`` must raise.
    """
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    raw_nonce = bytes(random.getrandbits(8) for _ in range(16))
    nonce = base64.b64encode(raw_nonce)

    request = b''.join([
        b'GET ', test_path.encode('ascii'), b' HTTP/1.1\r\n',
        b'Host: ', test_host.encode('ascii'), b'\r\n',
        b'Connection: Upgrade\r\n',
        b'Upgrade: WebSocket\r\n',
        b'Sec-WebSocket-Version: 13\r\n',
        b'Sec-WebSocket-Key: ', nonce, b'\r\n',
        b'Sec-WebSocket-Protocol: one, two\r\n',
        b'\r\n',
    ])

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionRequested)
    assert event.proposed_subprotocols == ['one', 'two']

    with pytest.raises(ValueError):
        ws.accept(event, 'three')
def generate(
    cls,
    user_id=lambda: random.getrandbits(16),
    contents=lambda: random_alphanumeric_string(length=8192),
    expiry_time=lambda: int(time.time() + random.getrandbits(16)),
    title=lambda: random_alphanumeric_string(),
    language=lambda: random.choice(['python', 'css', 'javascript', 'text']),
    password=lambda: random_alphanumeric_string(),
    is_api_post=lambda: False,
):
    """Create a paste, filling unspecified fields with random values.

    Every argument is passed through ``cls.random_or_specified_value``
    before being handed to ``database.paste.create_new_paste``; the
    defaults are zero-argument factories producing random data.
    """
    resolve = cls.random_or_specified_value
    # Fields are resolved in this order, which fixes the order in which
    # the random factories are consumed.
    paste_fields = {
        'contents': resolve(contents),
        'user_id': resolve(user_id),
        'expiry_time': resolve(expiry_time),
        'title': resolve(title),
        'language': resolve(language),
        'password': resolve(password),
        'is_api_post': resolve(is_api_post),
    }
    return database.paste.create_new_paste(**paste_fields)
def generate(
    cls,
    paste_id=lambda: random.getrandbits(16),
    file_name=lambda: random_alphanumeric_string(),
    file_size=lambda: random.getrandbits(16),
    mime_type=lambda: 'image/png',
    file_data=lambda: random_alphanumeric_string(8192)
):
    """Create an attachment, filling unspecified fields with random values.

    Every argument is passed through ``cls.random_or_specified_value``
    before being handed to ``database.attachment.create_new_attachment``.
    ``database.attachment._store_attachment_file`` is patched out for the
    duration of the call.
    """
    resolve = cls.random_or_specified_value
    with mock.patch.object(database.attachment, '_store_attachment_file'):
        # Fields are resolved in this order, which fixes the order in which
        # the random factories are consumed.
        attachment_fields = {
            'paste_id': resolve(paste_id),
            'file_name': resolve(file_name),
            'file_size': resolve(file_size),
            'mime_type': resolve(mime_type),
            'file_data': resolve(file_data),
        }
        return database.attachment.create_new_attachment(**attachment_fields)
def random_marker():
    """ A random marker used to identify a pytest run.

    Some tests will spawn a private chain, the private chain will be one or
    more ethereum nodes on a new subprocess. These nodes may fail to start on
    concurrent test runs, mostly because of port number conflicts, but even
    though the test fails to start its private chain it may run interacting
    with the geth process from a different test run! This leads to
    unreasonable test errors.

    This fixture creates a random marker used to distinguish pytest runs and
    avoid test failures. Note this could fail for other reasons and fail to
    detect unwanted interactions if the user sets the PYTHONHASHSEED to the
    same value.

    Returns:
        The lowercase hexadecimal digits of a random 100-bit integer, with
        no ``0x`` prefix and no ``L`` suffix.
    """
    # '%x' yields bare hex digits on every Python version; slicing the
    # output of hex() would chop off a real digit where there is no
    # trailing 'L' to strip.
    return '%x' % random.getrandbits(100)
def circlepoint(c,r):
    """Pick a random point [x, y] on a circle using the circle equation.

    Args:
        c (list): The position of the circle centre, as [x, y].
        r (float): The radius of the circle.

    Returns:
        A two-element list holding a point on the circle.  The horizontal
        offset is drawn uniformly from [-r, r] and the half-plane (above or
        below the centre) is chosen by a coin flip.
    """
    dx = random.uniform(-r, r)
    on_lower_half = bool(random.getrandbits(1))
    # Circle equation solved for the vertical offset.
    half_chord = math.sqrt(r**2 - dx**2)
    dy = -half_chord if on_lower_half else half_chord
    return [dx + c[0], dy + c[1]]
def encode_params(self, base_url, method, params):
    """Return ``params`` URL-encoded with OAuth fields and an HMAC-SHA1 signature.

    A copy of ``params`` is extended with the OAuth fields (token when set,
    consumer key, signature method, version, timestamp, random 64-bit
    nonce), sorted and encoded.  The signature covers the upper-cased
    method, the base URL and the encoded parameters, and is appended as
    ``oauth_signature``.
    """
    params = params.copy()
    if self.token:
        params['oauth_token'] = self.token
    params.update({
        'oauth_consumer_key': self.consumer_key,
        'oauth_signature_method': 'HMAC-SHA1',
        'oauth_version': '1.0',
        'oauth_timestamp': str(int(time())),
        'oauth_nonce': str(getrandbits(64)),
    })

    enc_params = urlencode_noplus(sorted(params.items()))

    # Signing key: consumer secret and quoted token secret joined by '&'.
    signing_key = (
        self.consumer_secret + "&"
        + urllib_parse.quote(self.token_secret, safe='~'))

    base_elements = [method.upper(), base_url, enc_params]
    message = '&'.join(
        urllib_parse.quote(element, safe='~') for element in base_elements)

    digest = hmac.new(
        signing_key.encode('ascii'), message.encode('ascii'),
        hashlib.sha1).digest()
    signature = base64.b64encode(digest)

    return enc_params + "&" + "oauth_signature=" + urllib_parse.quote(signature, safe='~')
def gouv_api_csv(csv):
    """Return the data rows of ``csv`` extended with fake result columns.

    The first line of ``csv`` is skipped.  Each remaining line is extended
    either with thirteen generated result fields or with thirteen empty
    fields, and terminated with a newline.  A row is filled on a coin flip,
    and always when it contains the coordinates used by the tests.
    """
    rows = []
    for line in csv.splitlines()[1:]:
        # Always fills the data of the result used for tests.
        if bool(random.getrandbits(1)) or "-21.9419851,64.14602" in line:
            result_fields = [
                str(fake.latitude()),  # "result_latitude"
                str(fake.longitude()),  # "result_longitude"
                fake.address().replace('\n', ' ').replace(',', ' '),  # "result_label"
                str(random.randint(0, 300)),  # "result_distance"
                "housenumber",  # "result_type"
                "44109_XXXX_984eec",  # "result_id"
                fake.building_number(),  # "result_housenumber"
                fake.street_name(),  # "result_name", the name of the street.
                '',  # "result_street", empty in most case.
                fake.postcode(),  # "result_postcode"
                fake.city(),  # "result_city"
                '"' + fake.postcode()[:2] + ' ' + fake.department()[1] + ', ' + fake.region() + '"',  # "result_context"
                fake.postcode()[:2] + str(random.randint(100, 300)),  # "result_citycode"
            ]
            suffix = ',' + ','.join(result_fields)
        else:
            suffix = ',,,,,,,,,,,,,'
        rows.append(line + suffix + '\n')
    return ''.join(rows)
def result_properties():
    """Return a properties dict with ``access`` plus random optional keys.

    ``name`` and ``phone`` are each added on a coin flip.  ``fee`` and
    ``opening_hours`` each get their first value on one coin flip, their
    second value on a further flip, and are otherwise left out.
    """
    properties = {"access": "yes"}

    if random.getrandbits(1):
        properties["name"] = fake.company()

    if random.getrandbits(1):
        properties["phone"] = fake.phone_number()

    if random.getrandbits(1):
        properties["fee"] = "yes"
    elif random.getrandbits(1):
        properties["fee"] = "no"

    if random.getrandbits(1):
        properties["opening_hours"] = "Mo-Su 09:00-19:00"
    elif random.getrandbits(1):
        properties["opening_hours"] = "Mo-Th 12:30-16:30"

    return properties
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    Return a version-4 UUID string built from several entropy sources.

    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    Two 64-bit words from ``random.getrandbits`` are XORed with the two
    words of ``ctokens`` and, when ``HAVE_URANDOM`` is set, also with two
    words unpacked from ``fast_urandom16()``.
    """
    high = random.getrandbits(64)
    low = random.getrandbits(64)
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]
    byte_s = _struct_2_long_long.pack(high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    Return a version-4 UUID string built from several entropy sources.

    This function follows from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    Two 64-bit words from ``random.getrandbits`` are XORed with the two
    words of ``ctokens`` and, when ``HAVE_URANDOM`` is set, also with two
    words unpacked (format ``=QQ``) from ``fast_urandom16()``.
    """
    high = random.getrandbits(64)
    low = random.getrandbits(64)
    if HAVE_URANDOM:
        urand_longs = struct.unpack('=QQ', fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]
    byte_s = struct.pack('=QQ', high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def test_random_read_write(self):
    """Interleave 1000 random puts and gets on a queue.

    A local counter tracks how many items should be in the queue; a get on
    a queue expected to be empty must raise ``Empty``.
    """
    q = Queue(self.path)
    pending = 0
    for _ in range(1000):
        if random.random() >= 0.5:
            # Write path: enqueue a value with a random numeric suffix.
            q.put('var%d' % random.getrandbits(16))
            pending += 1
        elif pending > 0:
            q.get_nowait()
            q.task_done()
            pending -= 1
        else:
            with self.assertRaises(Empty):
                q.get_nowait()
def run(self, q_job):
    """Run every circuit in ``q_job`` and return a completed ``Result``.

    A fresh ``Simulator`` is created for the job.  If the qobj config
    carries a ``seed`` it is stored and used to build the ``CppSim``
    backend; otherwise a random 32-bit seed is only stored.
    """
    # Generating a string id for the job
    job_id = str(uuid.uuid4())
    qobj = q_job.qobj
    config = qobj['config']

    self._sim = Simulator(gate_fusion=True)
    if 'seed' in config:
        self._seed = config['seed']
        self._sim._simulator = CppSim(self._seed)
    else:
        self._seed = random.getrandbits(32)
    self._shots = config['shots']

    result_list = [self.run_circuit(circuit) for circuit in qobj['circuits']]

    return Result({'job_id': job_id, 'result': result_list,
                   'status': 'COMPLETED'},
                  qobj)
def test_append_rules(self, asa):
    """Append 350 randomly generated TCP/UDP rules to the test ACL.

    Source and destination are each a random host address or a random
    network (prefix length 0-31); the source port is the loop counter
    1..350.
    """
    def random_endpoint():
        # Coin flip between a single host address and a CIDR network.
        if bool(getrandbits(1)):
            return IPAddress(randint(0, 4294967295))
        return IPNetwork(f"{IPAddress(randint(0, 4294967295))}/{randint(0, 31)}").cidr

    comparators = list(ServiceComparator)

    rules = []
    for src_port in range(1, 351):
        protocol = choice(["tcp", "udp"])
        src = random_endpoint()
        dst = random_endpoint()
        dst_port = randint(1, 65535)
        src_comp = choice(comparators)
        dst_comp = choice(comparators)
        rules.append(
            RuleTCPUDP(protocol=protocol, src=src, dst=dst,
                       src_port=src_port, dst_port=dst_port,
                       src_comp=src_comp, dst_comp=dst_comp))

    asa.acl.append_rules(settings.test_acl, rules)
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    Return a version-4 UUID string built from several entropy sources.

    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    Two 64-bit words from ``random.getrandbits`` are XORed with the two
    words of ``ctokens`` and, when ``HAVE_URANDOM`` is set, also with two
    words unpacked from ``fast_urandom16()``.
    """
    high = random.getrandbits(64)
    low = random.getrandbits(64)
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]
    byte_s = _struct_2_long_long.pack(high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def run(self):
    """Drive the coordinator loop until the stop event is set.

    Returns immediately if the coordinator is disabled.  Each iteration
    generates actions for the agents, queues one extra random action on a
    coin flip, culls completed requests, then waits on the stop event for
    ``HSM_COORDINATOR_LOOP_INTERVAL``.
    """
    if not self.coordinator.enabled:
        return

    log.debug("Coordinator thread for %s starting: %s" %
              (self.coordinator.filesystem, threading.current_thread()))

    while True:
        if self._stopping.is_set():
            break

        # Keep agents busy
        self.generate_actions_for_agents()

        # Stack up waiting actions
        if random.getrandbits(1):
            self.unassigned_requests.put(self.get_random_action())

        self.cull_completed_requests()

        # TODO: Very infrequently, randomly cancel an agent action

        self._stopping.wait(HSM_COORDINATOR_LOOP_INTERVAL)
def image_read(self, imname):
    """Load an image as a 299x299x3 float array, optionally adding noise.

    Images with either dimension below 299 are resized to 299x299; images
    that are otherwise larger are center-cropped.  Pixel values are then
    mapped with ``x / 255 * 2 - 1`` (i.e. to [-1, 1], assuming 8-bit
    input).  When ``self.random_noise`` is truthy, a per-element ``+/-eps``
    perturbation is added on a coin flip and the result is clipped to
    [-1, 1].

    Args:
        imname: Image path handed to ``misc.imread``.

    Returns:
        Array of shape (299, 299, 3).
    """
    # np.float64 is the dtype the removed ``np.float`` alias stood for.
    image = misc.imread(imname, mode='RGB').astype(np.float64)
    r, c, _ = image.shape
    if r < 299 or c < 299:
        # TODO: check too small images
        image = misc.imresize(image, (299, 299, 3))
    elif r > 299 or c > 299:
        # Floor division keeps the crop offsets integral; true division
        # produces floats, which are not valid slice indices.
        top = (r - 299) // 2
        left = (c - 299) // 2
        image = image[top:top + 299, left:left + 299, :]
    assert image.shape == (299, 299, 3)

    image = (image / 255.0) * 2.0 - 1.0
    if self.random_noise:
        add_noise = bool(random.getrandbits(1))
        if add_noise:
            eps = random.choice([4.0, 8.0, 12.0, 16.0]) / 255.0 * 2.0
            noise_image = image + eps * np.random.choice([-1, 1], (299, 299, 3))
            image = np.clip(noise_image, -1.0, 1.0)
    return image
def AdminLogin(req,onlyhead):
    """Check the administrator password and hand out a session cookie.

    The submitted ``passwd`` query value is hashed with ``crypt`` using the
    first two characters of the stored hash as salt.  On mismatch the
    wrong-password page is returned.  On success a fresh 200-bit cookie
    value is stored in the module-level ``currentcookie`` and sent with
    the admin menu page.

    Returns:
        The ``(header, content)`` pair of the admin menu on success, or
        the result of ``Delegate`` on failure.
    """
    global currentcookie

    submitted = req.query.get('passwd',[''])[0]
    stored_hash = Config.conf['AdministratorPassword']
    if crypt.crypt(submitted, stored_hash[:2]) != stored_hash:
        return Delegate('/errors/wrongpasswd.html',req,onlyhead)

    # Reseed from the OS entropy pool before drawing the cookie value.
    random.seed(os.urandom(200))
    currentcookie = str(random.getrandbits(200))

    handler = EH_Generic_class()
    handler.iamadmin = 1
    (header,content) = Site['/adminmenu.html'].getresult(req,onlyhead,handler)
    # Max-Age is one hour
    header['Set-Cookie'] = 'OKUSON='+currentcookie+ \
                           ';Path=/;Max-Age=3600;Version=1'
    # No 'Location' header is set: it was taken out to please Opera, which
    # does not get the cookie for the login with that header. Max.
    return (header,content)
def uniform_random(lower_bound, upper_bound):
    """
    Question 5.10: Generates uniform random number
    within lower and upper bounds, inclusive

    Only single random bits (``getrandbits(1)``) are used: enough bits to
    cover the range are drawn, and the draw is repeated whenever the
    assembled value falls outside the range (rejection sampling).

    Args:
        lower_bound: Smallest value that may be returned.
        upper_bound: Largest value that may be returned.

    Returns:
        An integer ``n`` with ``lower_bound <= n <= upper_bound``.

    Raises:
        ValueError: If ``upper_bound < lower_bound``.  Without this check
            an empty range makes every draw fail the acceptance test, so
            the function would never return.
    """
    num_outcomes = upper_bound - lower_bound + 1
    if num_outcomes <= 0:
        raise ValueError(
            "upper_bound (%r) must not be less than lower_bound (%r)"
            % (upper_bound, lower_bound))

    # Number of bits needed to cover num_outcomes values; this does not
    # change between attempts, so compute it once.
    num_bits = 0
    while 1 << num_bits < num_outcomes:
        num_bits += 1

    while True:
        result = 0
        for _ in range(num_bits):
            result = (result << 1) | getrandbits(1)
        if result < num_outcomes:
            return result + lower_bound
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    Return a version-4 UUID string built from several entropy sources.

    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    Two 64-bit words from ``random.getrandbits`` are XORed with the two
    words of ``ctokens`` and, when ``HAVE_URANDOM`` is set, also with two
    words unpacked from ``fast_urandom16()``.
    """
    high = random.getrandbits(64)
    low = random.getrandbits(64)
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]
    byte_s = _struct_2_long_long.pack(high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def generateChallengeResponseV2(password, user, server_challenge, server_info, domain = '', client_challenge = None):
    """Compute the NTLMv2 challenge responses and the session key.

    Args:
        password: Account password; hashed with MD4 over its UTF-16LE form.
        user: User name; upper-cased before being mixed into the key.
        server_challenge: Challenge received from the server.
        server_info: Server info blob appended to the NT response input.
        domain: Domain name, empty by default.
        client_challenge: 8-character client challenge; eight random
            characters are generated when it is falsy.

    Returns:
        Tuple ``(nt_challenge_response, lm_challenge_response, session_key)``.
    """
    client_timestamp = '\0' * 8

    if not client_challenge:
        client_challenge = ''.join(
            chr(random.getrandbits(8)) for _ in range(0, 8))
    assert len(client_challenge) == 8

    md4 = MD4()
    md4.update(password.encode('UTF-16LE'))
    ntlm_hash = md4.digest()   # The NT password hash

    # The NTLMv2 password hash. In [MS-NLMP], this is the result of the
    # NTOWFv2 and LMOWFv2 functions.
    identity = (user.upper() + domain).encode('UTF-16LE')
    response_key = hmac.new(ntlm_hash, identity).digest()

    temp = client_timestamp + client_challenge + domain.encode('UTF-16LE') + server_info

    nt_challenge_response = hmac.new(response_key, server_challenge + temp).digest()
    lm_digest = hmac.new(response_key, server_challenge + client_challenge).digest()
    lm_challenge_response = lm_digest + client_challenge
    session_key = hmac.new(response_key, nt_challenge_response).digest()

    return nt_challenge_response, lm_challenge_response, session_key
################
# NTLMv1 Methods
################
def __init__(self, url, mode = "r"):
    """Prepare a file-like object for the remote ``url``.

    Args:
        url: Remote location; ``connect(url)`` supplies the remote, share
            and path stored on the instance.
        mode: File mode string, interpreted by ``self.__get_mode__()``.

    A randomly named temporary path is chosen inside the Kodi temp
    directory when ``xbmc`` is importable, otherwise inside the directory
    named by TEMP, TMP or TMPDIR, falling back to the system default
    temporary directory when none of those variables is set.
    """
    import random
    import tempfile
    try:
        import xbmc
    except ImportError:
        # Not running inside Kodi.
        xbmc = None

    self.url = url
    self.remote, self.share, self.path = connect(url)
    self.mode = mode
    self.binary = False
    self.canread = False
    self.canwrite = False
    self.closed = True
    self.size = 0
    self.pos = 0

    if xbmc:
        tmp_dir = xbmc.translatePath("special://temp/")
    else:
        # The three variables may all be unset; the stdlib default keeps
        # os.path.join from being handed None.
        tmp_dir = (os.getenv("TEMP") or os.getenv("TMP")
                   or os.getenv("TMPDIR") or tempfile.gettempdir())
    self.tmp_path = os.path.join(tmp_dir, "%08x" % (random.getrandbits(32)))

    self.tmp_file = None
    self.__get_mode__()
def dialog_select(self, heading, list):
    """Send a selection dialog as an RSS document and wait for the answer.

    Args:
        heading: Text for the channel ``<title>``.
        list: Option labels, one ``<item>`` per label.  (The parameter name
            shadows the builtin; it is kept for keyword-argument callers.)

    Returns:
        The integer stored by the controller under this dialog's random id.
        Each item's link ends in that item's position, so the value
        presumably is the chosen position - confirm against the controller.

    NOTE(review): ``heading`` and the labels are inserted into the XML
    unescaped, and the wait below is a busy loop.
    """
    ID = "%032x" %(random.getrandbits(128))
    link_base = ('http://' + self.controller.host + '/data/'
                 + threading.current_thread().name + '/' + ID + '/')

    parts = [
        '<?xml version="1.0" encoding="UTF-8" ?>\n',
        '<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/">\n',
        '<channel>\n',
        '<link>/rss</link>\n',
        '<title>'+heading+'</title>\n',
    ]
    # enumerate() gives every option its own position; looking the label
    # up with index() maps duplicate labels onto the first occurrence.
    for position, option in enumerate(list):
        parts.append('<item>\n')
        parts.append('<title>'+option +'</title>\n')
        parts.append('<image>%s</image>\n')
        parts.append('<link>' + link_base + str(position) + '</link>\n')
        parts.append('</item>\n\n')
    parts.append('</channel>\n')
    parts.append('</rss>\n')

    self.controller.send_data(''.join(parts))
    self.handler.server.shutdown_request(self.handler.request)

    while not self.controller.get_data(ID):
        continue
    return int(self.controller.get_data(ID))
def test_bool_1_chan_1_samp(self, x_series_device, seed):
    """Round-trip ten random booleans on one line through ``task`` itself.

    Also checks that a read with ``number_of_samples_per_channel=1``
    returns a list of length one.
    """
    # Seed the generator so line selection and test values are repeatable.
    random.seed(seed)

    line_name = random.choice(x_series_device.do_lines).name

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            line_name, line_grouping=LineGrouping.CHAN_PER_LINE)

        # Draw all test values before entering the write/read loop.
        values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]

        values_read = []
        for expected in values_to_test:
            task.write(expected)
            time.sleep(0.001)
            values_read.append(task.read())

        assert values_read == values_to_test

        # Verify setting number_of_samples_per_channel (even to 1)
        # returns a list.
        single_sample = task.read(number_of_samples_per_channel=1)
        assert isinstance(single_sample, list)
        assert len(single_sample) == 1
def test_bool_n_chan_1_samp(self, x_series_device, seed):
    """Write one random boolean per line to several lines and read back.

    Between two and all of the device's digital lines are used.  Also
    checks that a read with ``number_of_samples_per_channel=1`` returns a
    list of lists.
    """
    # Seed the generator so line selection and test values are repeatable.
    random.seed(seed)

    number_of_channels = random.randint(2, len(x_series_device.do_lines))
    do_lines = random.sample(x_series_device.do_lines, number_of_channels)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            flatten_channel_string([line.name for line in do_lines]),
            line_grouping=LineGrouping.CHAN_PER_LINE)

        # One random value per channel.
        values_to_test = [
            bool(random.getrandbits(1)) for _ in range(number_of_channels)]

        task.write(values_to_test)
        time.sleep(0.001)
        values_read = task.read()

        assert values_read == values_to_test

        # Verify setting number_of_samples_per_channel (even to 1)
        # returns a list of lists.
        single_sample = task.read(number_of_samples_per_channel=1)
        assert isinstance(single_sample, list)
        assert isinstance(single_sample[0], list)
def test_uint_1_chan_1_samp(self, x_series_device, seed):
    """Round-trip ten random integers on one DO port through ``task``.

    Also checks that a read with ``number_of_samples_per_channel=1``
    returns a list of length one.
    """
    # Seed the generator so port selection and test values are repeatable.
    random.seed(seed)

    do_port = random.choice(x_series_device.do_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Draw all test values before entering the write/read loop.
        values_to_test = [
            int(random.getrandbits(do_port.do_port_width))
            for _ in range(10)]

        values_read = []
        for expected in values_to_test:
            task.write(expected)
            time.sleep(0.001)
            values_read.append(task.read())

        assert values_read == values_to_test

        # Verify setting number_of_samples_per_channel (even to 1)
        # returns a list.
        single_sample = task.read(number_of_samples_per_channel=1)
        assert isinstance(single_sample, list)
        assert len(single_sample) == 1
def test_many_sample_port_byte(self, x_series_device, seed):
    """Write many byte samples to a port and check what is read back.

    Between 2 and 20 random values are written to a port of at most 8
    bits in a single call; every sample read afterwards is expected to
    equal the last value written.
    """
    # Seed the generator so the sample count, port and values are repeatable.
    random.seed(seed)

    number_of_samples = random.randint(2, 20)
    byte_wide_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 8]
    do_port = random.choice(byte_wide_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        random_samples = [
            int(random.getrandbits(do_port.do_port_width))
            for _ in range(number_of_samples)]
        values_to_test = numpy.array(random_samples, dtype=numpy.uint8)

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        task.start()

        writer.write_many_sample_port_byte(values_to_test)
        time.sleep(0.001)

        # Since we're writing to and reading from ONLY the digital
        # output lines, we can't use sample clocks to correlate the
        # read and write sampling times. Thus, we essentially read
        # the last value written multiple times.
        values_read = numpy.zeros(number_of_samples, dtype=numpy.uint8)
        reader.read_many_sample_port_byte(
            values_read, number_of_samples_per_channel=number_of_samples)

        expected_values = [values_to_test[-1]] * number_of_samples
        numpy.testing.assert_array_equal(values_read, expected_values)