Python random 模块,uniform() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用random.uniform()。
def __call__(self, img):
for attempt in range(10):
area = img.size[0] * img.size[1]
target_area = random.uniform(0.9, 1.) * area
aspect_ratio = random.uniform(7. / 8, 8. / 7)
w = int(round(math.sqrt(target_area * aspect_ratio)))
h = int(round(math.sqrt(target_area / aspect_ratio)))
if random.random() < 0.5:
w, h = h, w
if w <= img.size[0] and h <= img.size[1]:
x1 = random.randint(0, img.size[0] - w)
y1 = random.randint(0, img.size[1] - h)
img = img.crop((x1, y1, x1 + w, y1 + h))
assert (img.size == (w, h))
return img.resize((self.size, self.size), self.interpolation)
# Fallback
scale = Scale(self.size, interpolation=self.interpolation)
crop = CenterCrop(self.size)
return crop(scale(img))
def test_get_many():
source = IntFloatDataSource()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
query = {VALUE_KEY: value, COUNT_KEY: VALUES_COUNT}
result = source.get_many(int, query)
assert type(result) is GENERATOR_CLASS
for res in result:
assert type(res) is int
assert res == value
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
query = {VALUE_KEY: value, COUNT_KEY: VALUES_COUNT}
result = source.get_many(float, query)
assert type(result) is GENERATOR_CLASS
for res in result:
assert type(res) is float
assert res == value
def serviceA(context=None):
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
service = context.socket(zmq.DEALER)
#identify worker
service.setsockopt(zmq.IDENTITY,b'A')
service.connect("tcp://localhost:5560")
while True:
message = service.recv()
with myLock:
print "Service A got:"
print message
if message == "Service A":
#do some work
time.sleep(random.uniform(0,0.5))
service.send(b"Service A did your laundry")
elif message == "END":
break
else:
with myLock:
print "the server has the wrong identities!"
break
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# arguments must correspond to arguments for computaiton; multiple arguments
# (as in this case) can be given as tuples
args = [(i, random.uniform(2, 5)) for i in range(njobs)]
results = yield computation.run_results(compute, args)
# Tasks may not be executed in the order of given list of args, but
# results would be in the same order of given list of args
for result in results:
print(' result for %d from %s: %s' % result)
# wait for all jobs to be done and close computation
yield computation.close()
def client_proc(computation, task=None):
# schedule computation with the scheduler
if (yield computation.schedule()):
raise Exception('schedule failed')
i = 0
while True:
cmd = yield task.receive()
if cmd is None:
break
i += 1
c = C(i)
c.n = random.uniform(20, 50)
# unlike in dispycos_client*.py, here 'run_async' is used to run as
# many tasks as given on servers (i.e., possibly more than one
# task on a server at any time).
rtask = yield computation.run_async(compute, c, task)
if isinstance(rtask, pycos.Task):
print(' %s: rtask %s created' % (i, rtask))
else:
print(' %s: rtask failed: %s' % (i, rtask))
# unlike in dispycos_httpd1.py, here 'await_async' is not used, so any
# running async tasks are just terminated.
yield computation.close()
def client_proc(computation, task=None):
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# execute 10 jobs (tasks) and get their results. Note that number of jobs
# created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
args = [(algorithms[i % len(algorithms)], random.uniform(5, 10)) for i in range(15)]
results = yield computation.run_results(compute, args)
for i, result in enumerate(results):
if isinstance(result, tuple) and len(result) == 3:
print(' %ssum for %s: %s' % (result[1], result[0], result[2]))
else:
print(' rtask failed for %s: %s' % (args[i][0], str(result)))
yield computation.close()
def client_proc(computation, task=None):
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# execute 10 jobs (tasks) and get their results. Note that number of jobs
# created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
yield task.sleep(2)
algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
args = [(algorithms[i % len(algorithms)], random.uniform(1, 3)) for i in range(15)]
results = yield computation.run_results(compute, args)
for i, result in enumerate(results):
if isinstance(result, tuple) and len(result) == 3:
print(' %ssum for %s: %s' % (result[1], result[0], result[2]))
else:
print(' rtask failed for %s: %s' % (args[i][0], str(result)))
yield computation.close()
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# run jobs
for i in range(njobs):
# computation is supposed to be CPU bound so 'run' is used so at most
# one computations runs at a server at any time; for mostly idle
# computations, use 'run_async' to run more than one computation at a
# server at the same time.
rtask = yield computation.run(compute, random.uniform(5, 10))
if isinstance(rtask, pycos.Task):
print(' job %s processed by %s' % (i, rtask.location))
else:
print('rtask %s failed: %s' % (i, rtask))
# wait for all jobs to be done and close computation
yield computation.close()
def client_proc(job_id, data_file, rtask, task=None):
# send input file to rtask.location; this will be saved to dispycos process's
# working directory
if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
print('Could not send input data to %s' % rtask.location)
# terminate remote task
rtask.send(None)
raise StopIteration(-1)
# send info about input
obj = C(job_id, data_file, random.uniform(5, 8), task)
if (yield rtask.deliver(obj)) != 1:
print('Could not send input to %s' % rtask.location)
raise StopIteration(-1)
# rtask sends result to this task as message
result = yield task.receive()
if not result.result_file:
print('Processing %s failed' % obj.i)
raise StopIteration(-1)
# rtask saves results file at this client, which is saved in pycos's
# dest_path, not current working directory!
result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
# move file to cwd
target = os.path.join(os.getcwd(), os.path.basename(result_file))
os.rename(result_file, target)
print(' job %s output is in %s' % (obj.i, target))
def rti_test(task=None):
# if server is on remote network, automatic discovery won't work,
# so add it explicitly
# yield scheduler.peer('192.168.21.5')
# get reference to RTI at server
rti1 = yield pycos.RTI.locate('rti_1')
print('RTI is at %s' % rti1.location)
# 5 (remote) tasks are created with rti1
n = 5
# set monitor (monitor_proc task) for tasks created for this RTI
yield rti1.monitor(pycos.Task(monitor_proc, n))
for i in range(n):
rtask = yield rti1('test%s' % i, b=i)
pycos.logger.debug('RTI %s created' % rtask)
# If necessary, each rtask can also be set (different) 'monitor'
rtask.send('msg:%s' % i)
yield task.sleep(random.uniform(0, 1))
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# pair EC2 node with this client with:
yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347))
# if multiple nodes are used, 'broadcast' option can be used to pair with
# all nodes with just one statement as:
# yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347), broadcast=True)
# execute n jobs (tasks) and get their results. Note that number of
# jobs created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
args = [random.uniform(3, 10) for _ in range(njobs)]
results = yield computation.run_results(compute, args)
for result in results:
print('result: %s' % result)
yield computation.close()
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# arguments must correspond to arguments for computaiton; multiple arguments
# (as in this case) can be given as tuples
args = [(i, random.uniform(2, 5)) for i in range(njobs)]
results = yield computation.run_results(compute, args)
# Tasks may not be executed in the order of given list of args, but
# results would be in the same order of given list of args
for result in results:
print(' result for %d from %s: %s' % result)
# wait for all jobs to be done and close computation
yield computation.close()
def client_proc(computation, task=None):
# schedule computation with the scheduler
if (yield computation.schedule()):
raise Exception('schedule failed')
i = 0
while True:
cmd = yield task.receive()
if cmd is None:
break
i += 1
c = C(i)
c.n = random.uniform(20, 50)
# unlike in dispycos_client*.py, here 'run_async' is used to run as
# many tasks as given on servers (i.e., possibly more than one
# task on a server at any time).
rtask = yield computation.run_async(compute, c, task)
if isinstance(rtask, pycos.Task):
print(' %s: rtask %s created' % (i, rtask))
else:
print(' %s: rtask failed: %s' % (i, rtask))
# unlike in dispycos_httpd1.py, here 'await_async' is not used, so any
# running async tasks are just terminated.
yield computation.close()
def client_proc(task=None):
# create channel
channel = pycos.Channel('sum_prod')
# create tasks to compute sum and product of numbers sent
sum_task = pycos.Task(seqsum)
prod_task = pycos.Task(seqprod)
# subscribe tasks to channel so they receive messages
yield channel.subscribe(sum_task)
yield channel.subscribe(prod_task)
# send 4 numbers to channel
for _ in range(4):
r = random.uniform(0.5, 3)
channel.send(r)
print('sent %f' % r)
# send None to indicate end of data
channel.send(None)
yield channel.unsubscribe(sum_task)
yield channel.unsubscribe(prod_task)
def client_proc(computation, task=None):
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# execute 10 jobs (tasks) and get their results. Note that number of jobs
# created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
yield task.sleep(2)
algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
args = [(algorithms[i % len(algorithms)], random.uniform(1, 3)) for i in range(15)]
results = yield computation.run_results(compute, args)
for i, result in enumerate(results):
if isinstance(result, tuple) and len(result) == 3:
print(' %ssum for %s: %s' % (result[1], result[0], result[2]))
else:
print(' rtask failed for %s: %s' % (args[i][0], str(result)))
yield computation.close()
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# run jobs
for i in range(njobs):
# computation is supposed to be CPU bound so 'run' is used so at most
# one computations runs at a server at any time; for mostly idle
# computations, use 'run_async' to run more than one computation at a
# server at the same time.
rtask = yield computation.run(compute, random.uniform(5, 10))
if isinstance(rtask, pycos.Task):
print(' job %s processed by %s' % (i, rtask.location))
else:
print('rtask %s failed: %s' % (i, rtask))
# wait for all jobs to be done and close computation
yield computation.close()
def rti_test(task=None):
# if server is on remote network, automatic discovery won't work,
# so add it explicitly
# yield scheduler.peer('192.168.21.5')
# get reference to RTI at server
rti1 = yield pycos.RTI.locate('rti_1')
print('RTI is at %s' % rti1.location)
# 5 (remote) tasks are created with rti1
n = 5
# set monitor (monitor_proc task) for tasks created for this RTI
yield rti1.monitor(pycos.Task(monitor_proc, n))
for i in range(n):
rtask = yield rti1('test%s' % i, b=i)
pycos.logger.debug('RTI %s created' % rtask)
# If necessary, each rtask can also be set (different) 'monitor'
rtask.send('msg:%s' % i)
yield task.sleep(random.uniform(0, 1))
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# pair EC2 node with this client with:
yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347))
# if multiple nodes are used, 'broadcast' option can be used to pair with
# all nodes with just one statement as:
# yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347), broadcast=True)
# execute n jobs (tasks) and get their results. Note that number of
# jobs created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
args = [random.uniform(3, 10) for _ in range(njobs)]
results = yield computation.run_results(compute, args)
for result in results:
print('result: %s' % result)
yield computation.close()
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# arguments must correspond to arguments for computaiton; multiple arguments
# (as in this case) can be given as tuples
args = [(i, random.uniform(2, 5)) for i in range(njobs)]
results = yield computation.run_results(compute, args)
# Tasks may not be executed in the order of given list of args, but
# results would be in the same order of given list of args
for result in results:
print(' result for %d from %s: %s' % result)
# wait for all jobs to be done and close computation
yield computation.close()
def client_proc(computation, task=None):
# schedule computation with the scheduler
if (yield computation.schedule()):
raise Exception('schedule failed')
i = 0
while True:
cmd = yield task.receive()
if cmd is None:
break
i += 1
c = C(i)
c.n = random.uniform(20, 50)
# unlike in dispycos_client*.py, here 'run_async' is used to run as
# many tasks as given on servers (i.e., possibly more than one
# task on a server at any time).
rtask = yield computation.run_async(compute, c, task)
if isinstance(rtask, pycos.Task):
print(' %s: rtask %s created' % (i, rtask))
else:
print(' %s: rtask failed: %s' % (i, rtask))
# unlike in dispycos_httpd1.py, here 'await_async' is not used, so any
# running async tasks are just terminated.
yield computation.close()
def client_proc(computation, task=None):
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# execute 10 jobs (tasks) and get their results. Note that number of jobs
# created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
args = [(algorithms[i % len(algorithms)], random.uniform(5, 10)) for i in range(15)]
results = yield computation.run_results(compute, args)
for i, result in enumerate(results):
if isinstance(result, tuple) and len(result) == 3:
print(' %ssum for %s: %s' % (result[1], result[0], result[2]))
else:
print(' rtask failed for %s: %s' % (args[i][0], str(result)))
yield computation.close()
def client_proc(computation, task=None):
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# execute 10 jobs (tasks) and get their results. Note that number of jobs
# created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
yield task.sleep(2)
algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
args = [(algorithms[i % len(algorithms)], random.uniform(1, 3)) for i in range(15)]
results = yield computation.run_results(compute, args)
for i, result in enumerate(results):
if isinstance(result, tuple) and len(result) == 3:
print(' %ssum for %s: %s' % (result[1], result[0], result[2]))
else:
print(' rtask failed for %s: %s' % (args[i][0], str(result)))
yield computation.close()
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# run jobs
for i in range(njobs):
# computation is supposed to be CPU bound so 'run' is used so at most
# one computations runs at a server at any time; for mostly idle
# computations, use 'run_async' to run more than one computation at a
# server at the same time.
rtask = yield computation.run(compute, random.uniform(5, 10))
if isinstance(rtask, pycos.Task):
print(' job %s processed by %s' % (i, rtask.location))
else:
print('rtask %s failed: %s' % (i, rtask))
# wait for all jobs to be done and close computation
yield computation.close()
def client_proc(job_id, data_file, rtask, task=None):
# send input file to rtask.location; this will be saved to dispycos process's
# working directory
if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
print('Could not send input data to %s' % rtask.location)
# terminate remote task
rtask.send(None)
raise StopIteration(-1)
# send info about input
obj = C(job_id, data_file, random.uniform(5, 8), task)
if (yield rtask.deliver(obj)) != 1:
print('Could not send input to %s' % rtask.location)
raise StopIteration(-1)
# rtask sends result to this task as message
result = yield task.receive()
if not result.result_file:
print('Processing %s failed' % obj.i)
raise StopIteration(-1)
# rtask saves results file at this client, which is saved in pycos's
# dest_path, not current working directory!
result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
# move file to cwd
target = os.path.join(os.getcwd(), os.path.basename(result_file))
os.rename(result_file, target)
print(' job %s output is in %s' % (obj.i, target))
def rti_test(task=None):
# if server is on remote network, automatic discovery won't work,
# so add it explicitly
# yield scheduler.peer('192.168.21.5')
# get reference to RTI at server
rti1 = yield pycos.RTI.locate('rti_1')
print('RTI is at %s' % rti1.location)
# 5 (remote) tasks are created with rti1
n = 5
# set monitor (monitor_proc task) for tasks created for this RTI
yield rti1.monitor(pycos.Task(monitor_proc, n))
for i in range(n):
rtask = yield rti1('test%s' % i, b=i)
pycos.logger.debug('RTI %s created' % rtask)
# If necessary, each rtask can also be set (different) 'monitor'
rtask.send('msg:%s' % i)
yield task.sleep(random.uniform(0, 1))
def client_proc(computation, njobs, task=None):
# schedule computation with the scheduler; scheduler accepts one computation
# at a time, so if scheduler is shared, the computation is queued until it
# is done with already scheduled computations
if (yield computation.schedule()):
raise Exception('Could not schedule computation')
# pair EC2 node with this client with:
yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347))
# if multiple nodes are used, 'broadcast' option can be used to pair with
# all nodes with just one statement as:
# yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347), broadcast=True)
# execute n jobs (tasks) and get their results. Note that number of
# jobs created can be more than number of server processes available; the
# scheduler will use as many processes as necessary/available, running one
# job at a server process
args = [random.uniform(3, 10) for _ in range(njobs)]
results = yield computation.run_results(compute, args)
for result in results:
print('result: %s' % result)
yield computation.close()
def generate_example(seq_length, min_val, max_val):
"""
Creates a list of (a,b) tuples where a is random[min_val,max_val] and b is 1 in only
two tuples, 0 for the rest. The ground truth is the addition of a values for tuples with b=1.
:param seq_length: length of the sequence to be generated
:param min_val: minimum value for a
:param max_val: maximum value for a
:return x: list of (a,b) tuples
:return y: ground truth
"""
# Select b values: one in first X% of the sequence, the other in the second Y%
b1 = random.randint(0, int(seq_length * FIRST_MARKER / 100.) - 1)
b2 = random.randint(int(seq_length * SECOND_MARKER / 100.), seq_length - 1)
b = [0.] * seq_length
b[b1] = 1.
b[b2] = 1.
# Generate list of tuples
x = [(random.uniform(min_val, max_val), marker) for marker in b]
y = x[b1][0] + x[b2][0]
return x, y
def createMetaball(origin=(0, 0, 0), n=30, r0=4, r1=2.5):
metaball = bpy.data.metaballs.new('MetaBall')
obj = bpy.data.objects.new('MetaBallObject', metaball)
bpy.context.scene.objects.link(obj)
metaball.resolution = 0.2
metaball.render_resolution = 0.05
for i in range(n):
location = Vector(origin) + Vector(random.uniform(-r0, r0) for i in range(3))
element = metaball.elements.new()
element.co = location
element.radius = r1
return metaball
def _send_http_post(self, pause=10):
global stop_now
self.socks.send("POST / HTTP/1.1\r\n"
"Host: %s\r\n"
"User-Agent: %s\r\n"
"Connection: keep-alive\r\n"
"Keep-Alive: 900\r\n"
"Content-Length: 10000\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n\r\n" %
(self.host, random.choice(useragents)))
for i in range(0, 9999):
if stop_now:
self.running = False
break
p = random.choice(string.letters+string.digits)
print term.BOL+term.UP+term.CLEAR_EOL+"Posting: %s" % p+term.NORMAL
self.socks.send(p)
time.sleep(random.uniform(0.1, 3))
self.socks.close()
def __init__(self, auth_provider, device_info=None):
self.log = logging.getLogger(__name__)
self._auth_provider = auth_provider
# mystical unknown6 - resolved by PokemonGoDev
self._signal_agglom_gen = False
self._signature_lib = None
if RpcApi.START_TIME == 0:
RpcApi.START_TIME = get_time(ms=True)
if RpcApi.RPC_ID == 0:
RpcApi.RPC_ID = int(random.random() * 10 ** 18)
self.log.debug('Generated new random RPC Request id: %s', RpcApi.RPC_ID)
# data fields for unknown6
self.session_hash = os.urandom(32)
self.token2 = random.randint(1,59)
self.course = random.uniform(0, 360)
self.device_info = device_info
def genetic_algorithm(population, fitness_fn, ngen=1000, pmut=0.0):
"""[Fig. 4.7]"""
def reproduce(p1, p2):
c = random.randrange(len(p1))
return p1[:c] + p2[c:]
for i in range(ngen):
new_population = []
for i in len(population):
p1, p2 = random_weighted_selections(population, 2, fitness_fn)
child = reproduce(p1, p2)
if random.uniform(0,1) > pmut:
child.mutate()
new_population.append(child)
population = new_population
return argmax(population, fitness_fn)
def random_weighted_selection(seq, n, weight_fn):
"""Pick n elements of seq, weighted according to weight_fn.
That is, apply weight_fn to each element of seq, add up the total.
Then choose an element e with probability weight[e]/total.
Repeat n times, with replacement. """
totals = []; runningtotal = 0
for item in seq:
runningtotal += weight_fn(item)
totals.append(runningtotal)
selections = []
for s in range(n):
r = random.uniform(0, totals[-1])
for i in range(len(seq)):
if totals[i] > r:
selections.append(seq[i])
break
return selections
#_____________________________________________________________________________
# The remainder of this file implements examples for the search algorithms.
#______________________________________________________________________________
# Graphs and Graph Problems
def RandomGraph(nodes=range(10), min_links=2, width=400, height=300,
curvature=lambda: random.uniform(1.1, 1.5)):
"""Construct a random graph, with the specified nodes, and random links.
The nodes are laid out randomly on a (width x height) rectangle.
Then each node is connected to the min_links nearest neighbors.
Because inverse links are added, some nodes will have more connections.
The distance between nodes is the hypotenuse times curvature(),
where curvature() defaults to a random number between 1.1 and 1.5."""
g = UndirectedGraph()
g.locations = {}
## Build the cities
for node in nodes:
g.locations[node] = (random.randrange(width), random.randrange(height))
## Build roads from each city to at least min_links nearest neighbors.
for i in range(min_links):
for node in nodes:
if len(g.get(node)) < min_links:
here = g.locations[node]
def distance_to_node(n):
if n is node or g.get(node,n): return infinity
return distance(g.locations[n], here)
neighbor = argmin(nodes, distance_to_node)
d = distance(g.locations[neighbor], here) * curvature()
g.connect(node, neighbor, int(d))
return g
def rate(self, message_object, user):
'''
# totally not rigged or something
def isDevMentioned():
for u in message_object.mentions:
if u.name == "Theraga" or u.name == "Dynista":
return True
return False
if user == "theraga" or user == "Theraga" or user == "dynista" or user == "Dynista" or isDevMentioned():
await self.pm.client.send_message(message_object.channel, "I would rate **" + user + "** 100.00/100")
else:
'''
number = round(random.uniform(1, 100), 2)
print(message_object.mentions)
await self.pm.client.send_message(message_object.channel,
"I would rate " + "**" + user + "** " + str(number) + "/100")
def __call__(self, img):
for attempt in range(10):
area = img.size[0] * img.size[1]
target_area = random.uniform(0.9, 1.) * area
aspect_ratio = random.uniform(7. / 8, 8. / 7)
w = int(round(math.sqrt(target_area * aspect_ratio)))
h = int(round(math.sqrt(target_area / aspect_ratio)))
if random.random() < 0.5:
w, h = h, w
if w <= img.size[0] and h <= img.size[1]:
x1 = random.randint(0, img.size[0] - w)
y1 = random.randint(0, img.size[1] - h)
img = img.crop((x1, y1, x1 + w, y1 + h))
assert (img.size == (w, h))
return img.resize((self.size, self.size), self.interpolation)
# Fallback
scale = Scale(self.size, interpolation=self.interpolation)
crop = CenterCrop(self.size)
return crop(scale(img))
def __call__(self, img):
for attempt in range(10):
area = img.size[0] * img.size[1]
target_area = random.uniform(0.70, 0.98) * area
aspect_ratio = random.uniform(5. / 8, 8. / 5)
w = int(round(math.sqrt(target_area * aspect_ratio)))
h = int(round(math.sqrt(target_area / aspect_ratio)))
if random.random() < 0.5:
w, h = h, w
if w <= img.size[0] and h <= img.size[1]:
x1 = random.randint(0, img.size[0] - w)
y1 = random.randint(0, img.size[1] - h)
img = img.crop((x1, y1, x1 + w, y1 + h))
assert (img.size == (w, h))
return img.resize((self.size, self.size), self.interpolation)
# Fallback
scale = Scale(self.size, interpolation=self.interpolation)
crop = CenterCrop(self.size)
return crop(scale(img))
def __call__(self, img):
for attempt in range(10):
area = img.size[0] * img.size[1]
target_area = random.uniform(0.70, 0.98) * area
aspect_ratio = random.uniform(5. / 8, 8. / 5)
w = int(round(math.sqrt(target_area * aspect_ratio)))
h = int(round(math.sqrt(target_area / aspect_ratio)))
if random.random() < 0.5:
w, h = h, w
if w <= img.size[0] and h <= img.size[1]:
x1 = random.randint(0, img.size[0] - w)
y1 = random.randint(0, img.size[1] - h)
img = img.crop((x1, y1, x1 + w, y1 + h))
assert (img.size == (w, h))
return img.resize((self.size, self.size), self.interpolation)
# Fallback
scale = Scale(self.size, interpolation=self.interpolation)
crop = CenterCrop(self.size)
return crop(scale(img))
def __call__(self, img):
for attempt in range(10):
area = img.size[0] * img.size[1]
target_area = random.uniform(0.9, 1.) * area
aspect_ratio = random.uniform(7. / 8, 8. / 7)
w = int(round(math.sqrt(target_area * aspect_ratio)))
h = int(round(math.sqrt(target_area / aspect_ratio)))
if random.random() < 0.5:
w, h = h, w
if w <= img.size[0] and h <= img.size[1]:
x1 = random.randint(0, img.size[0] - w)
y1 = random.randint(0, img.size[1] - h)
img = img.crop((x1, y1, x1 + w, y1 + h))
assert (img.size == (w, h))
return img.resize((self.size, self.size), self.interpolation)
# Fallback
scale = Scale(self.size, interpolation=self.interpolation)
crop = CenterCrop(self.size)
return crop(scale(img))
def test_put():
sink = IntFloatDataSink()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
result = sink.put(int, value)
assert result is None
assert value in sink.items[int]
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
result = sink.put(float, value)
assert result is None
assert value in sink.items[float]
def test_wildcard_put():
sink = SimpleWildcardDataSink()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
result = sink.put(int, value)
assert result is None
assert value in sink.items[int]
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
result = sink.put(float, value)
assert result is None
assert value in sink.items[float]
#####################
# Put Many Function #
#####################
def test_put_many():
sink = IntFloatDataSink()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
items = (value for _ in range(VALUES_COUNT))
result = sink.put_many(int, items)
assert result is None
assert value in sink.items[int]
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
items = (value for _ in range(VALUES_COUNT))
result = sink.put_many(float, items)
assert result is None
assert value in sink.items[float]
def test_wildcard_put_many():
sink = SimpleWildcardDataSink()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
items = (value for _ in range(VALUES_COUNT))
result = sink.put_many(int, items)
assert result is None
assert value in sink.items[int]
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
items = (value for _ in range(VALUES_COUNT))
result = sink.put_many(float, items)
assert result is None
assert value in sink.items[float]
#####################
# CompositeDataSink #
#####################
def test_get():
source = IntFloatDataSource()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
query = {VALUE_KEY: value}
result = source.get(int, query)
assert type(result) is int
assert result == value
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
query = {VALUE_KEY: value}
result = source.get(float, query)
assert type(result) is float
assert result == value
def test_wildcard_get():
source = SimpleWildcardDataSource()
values = [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
query = {VALUE_KEY: value}
result = source.get(int, query)
assert type(result) is int
assert result == value
values = [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
for value in values:
query = {VALUE_KEY: value}
result = source.get(float, query)
assert type(result) is float
assert result == value
#####################
# Get Many Function #
#####################
def mutation(cls, indiv):
newInd = indiv.duplicate()
nbMute = int(random.random()*(len(newInd.__x)-1))+1
toMute = []
for i, x in enumerate(indiv.__x):
if len(toMute) < nbMute:
toMute.append(i)
else:
iMin = 0
for im, i2 in enumerate(toMute):
if abs(indiv.__x[i2]) < abs(indiv.__x[toMute[iMin]]):
iMin = im
if abs(x) > abs(indiv.__x[toMute[iMin]]):
toMute[iMin] = i
for i in toMute:
newInd.__x[i] = random.uniform(cls.__BOUNDS[0], cls.__BOUNDS[1])
return newInd
def __generateRandomVarValue(cls, iVar):
# 1- Get the definition domain of the variable
defDomain = cls.VARIABLES_RANGES[iVar]
if defDomain is None:
randFloat = random.uniform(-sys.maxint-1, sys.maxint)
else:
# 2- Check the open/closed bounds
includeFirst = defDomain[0] == '['
includeLast = defDomain[-1] == ']'
# 3- Get a random number in the domain
defDomain = eval('[' + defDomain[1:-1] + ']')
randFloat = random.random()*(defDomain[1]-defDomain[0]) + defDomain[0]
# 4- Check the bounds
while (randFloat == defDomain[0] and not includeFirst) or\
(randFloat == defDomain[1] and not includeLast):
randFloat = random.random()*(defDomain[1]-defDomain[0]) + defDomain[0]
# 5- Cast the variable type
return cls.VARIABLES_TYPE(randFloat)
# ----------------------
def recaptcha(self):
""" Check if we need to solve a captcha.
This will open in the default browser.
If we choose to not solve the captcha should it be required,
we will then be considered a 'lurker' and we will not be able to chat
and our name will be shown as a guest name in the room.
"""
t = str(random.uniform(0.9, 0.10))
_url = 'https://tinychat.com/cauth/captcha?{0}'.format(t)
_response = util.web.http_get(url=_url, json=True, proxy=self.proxy)
log.debug('recaptcha response: %s' % _response)
if _response['json'] is not None:
if _response['json']['need_to_solve_captcha'] == 1:
link = 'https://tinychat.com/cauth/recaptcha?token={0}'.format(_response['json']['token'])
webbrowser.open(link, new=True)
print (link)
raw_input('Solve the captcha and click enter to continue.')
def mutation(self, mutation_type):
if mutation_type not in MUTATION_TYPES:
raise GenomeError("mutation type not supported")
threshold = random.uniform(self.mutation_lower_threshold, self.mutation_higher_threshold)
if threshold < self.mutation_threshold:
return
if mutation_type == "add_node":
self.mutate_add_node()
elif mutation_type == "remove_node":
self.mutate_remove_node()
elif mutation_type == "add_connection":
self.mutate_add_connection()
elif mutation_type == "remove_connection":
self.mutate_remove_connection()
else:
raise GenomeError("something wrong happened in mutation.")
def serviceB(context=None):
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
service = context.socket(zmq.DEALER)
#identify worker
service.setsockopt(zmq.IDENTITY,b'B')
service.connect("tcp://localhost:5560")
while True:
message = service.recv()
with myLock:
print "Service B got:"
print message
if message == "Service B":
#do some work
time.sleep(random.uniform(0,0.5))
service.send(b"Service B cleaned your room")
elif message == "END":
break
else:
with myLock:
print "the server has the wrong identities!"
break
def query(self, images):
if self.pool_size == 0:
return images
return_images = []
for image in images.data:
image = torch.unsqueeze(image, 0)
if self.num_imgs < self.pool_size:
self.num_imgs = self.num_imgs + 1
self.images.append(image)
return_images.append(image)
else:
p = random.uniform(0, 1)
if p > 0.5:
random_id = random.randint(0, self.pool_size-1)
tmp = self.images[random_id].clone()
self.images[random_id] = image
return_images.append(tmp)
else:
return_images.append(image)
return_images = Variable(torch.cat(return_images, 0))
return return_images