Python functools module: partial() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use functools.partial().
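As a quick primer before the examples: functools.partial(func, *args, **keywords) returns a new callable with those arguments pre-bound; arguments supplied at call time are appended after the frozen positional ones. A minimal illustration:

from functools import partial

def power(base, exponent):
    return base ** exponent

square = partial(power, exponent=2)  # pre-bind a keyword argument
cube = partial(power, exponent=3)

print(square(5))  # 25
print(cube(2))    # 8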
def add(self, categorize):
    """Add given method to categorize messages. When a message is received,
    each of the added methods (most recently added method first) is called
    with the message. The method should return a category (any hashable
    object) or None (in which case the next most recently added method is
    called with the same message). If all the methods return None for a given
    message, the message is queued with category=None, so that the 'receive'
    method here works just as Task.receive.
    """
    if inspect.isfunction(categorize):
        argspec = inspect.getargspec(categorize)
        if len(argspec.args) != 1:
            categorize = None
    elif type(categorize) != partial_func:
        categorize = None
    if categorize:
        self._categorize.insert(0, categorize)
    else:
        logger.warning('invalid categorize function ignored')
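A hypothetical usage sketch (channel stands in for whatever object exposes the add() method above; the message shape is an assumption). Note that a functools.partial also passes the type check, which is what the partial_func branch allows:

from functools import partial

def by_kind(msg):
    # return a hashable category, or None to defer to older categorizers
    return msg.get('kind') if isinstance(msg, dict) else None

def by_field(msg, field):
    return msg.get(field) if isinstance(msg, dict) else None

channel.add(by_kind)
channel.add(partial(by_field, field='topic'))  # one argument left after binding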
def _build_multipart(cls, data):
    """
    Build up the MIME payload for the POST data
    """
    boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
    sep_boundary = b'\n--' + boundary
    end_boundary = sep_boundary + b'--'
    end_items = end_boundary, b"\n",
    builder = functools.partial(
        cls._build_part,
        sep_boundary=sep_boundary,
    )
    part_groups = map(builder, data.items())
    parts = itertools.chain.from_iterable(part_groups)
    body_items = itertools.chain(parts, end_items)
    content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii')
    return b''.join(body_items), content_type
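In the builder above, map() can only apply a one-argument function, but cls._build_part also needs the boundary; partial freezes that constant so each (name, value) item maps directly. The same shape in isolation (build_part is a hypothetical stand-in for cls._build_part):

import functools
import itertools

def build_part(item, sep_boundary):
    # stand-in for the real helper: render one (name, value) pair
    name, value = item
    return [sep_boundary, b'\n', name, b': ', value, b'\n']

sep = b'\n--BOUNDARY'
builder = functools.partial(build_part, sep_boundary=sep)
part_groups = map(builder, {b'name': b'alice'}.items())
print(b''.join(itertools.chain.from_iterable(part_groups)))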
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    This will automatically generate an archive from the charm dir.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    """
    fh = tempfile.NamedTemporaryFile()
    CharmArchiveGenerator(charm_dir).make_archive(fh.name)
    with fh:
        func = partial(
            self.add_local_charm, fh, series, os.stat(fh.name).st_size)
        charm_url = await self._connector.loop.run_in_executor(None, func)
    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
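loop.run_in_executor() forwards only positional arguments to the target, so extra arguments, and especially keyword arguments, are conventionally pre-bound with partial, as above. A minimal sketch of the pattern, with a hypothetical blocking fetch():

import asyncio
import functools

def fetch(url, timeout=10):
    # placeholder for a blocking call, e.g. requests.get(url, timeout=timeout)
    return (url, timeout)

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor() does not accept **kwargs, so bind them up front
    func = functools.partial(fetch, 'https://example.com', timeout=5)
    result = await loop.run_in_executor(None, func)
    print(result)

asyncio.run(main())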
def __getattr__(self, name):
    """
    Wrap method calls in coroutines that use run_in_executor to make them
    async.
    """
    attr = getattr(self._cs, name)
    if not callable(attr):
        wrapper = partial(getattr, self._cs, name)
        setattr(self, name, wrapper)
    else:
        async def coro(*args, **kwargs):
            method = partial(attr, *args, **kwargs)
            for attempt in range(1, 4):
                try:
                    return await self.loop.run_in_executor(None, method)
                except theblues.errors.ServerError:
                    if attempt == 3:
                        raise
                    await asyncio.sleep(1, loop=self.loop)
        setattr(self, name, coro)
        wrapper = coro
    return wrapper
def _ireduce_linalg(arrays, func, **kwargs):
    """
    Yield the cumulative reduction of a linear algebra function.
    """
    arrays = iter(arrays)
    first = next(arrays)
    second = next(arrays)
    func = partial(func, **kwargs)
    accumulator = func(first, second)
    yield accumulator
    for array in arrays:
        # For some reason, np.dot(..., out=accumulator) did not produce
        # results that were equal to numpy.linalg.multi_dot
        func(accumulator, array, out=accumulator)
        yield accumulator
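A short usage sketch (assumes numpy; np.add is used instead of np.dot because writing into one of your own operands via out= is only reliable for elementwise ufuncs, which may be what the comment above is alluding to):

import numpy as np

# Cumulative elementwise sums; each yielded array is the running reduction.
arrays = [np.ones(3), np.ones(3), np.ones(3)]
for acc in _ireduce_linalg(arrays, np.add):
    print(acc)  # [2. 2. 2.] then [3. 3. 3.]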
def iload(files, load_func, **kwargs):
    """
    Create a stream of arrays from files, which are loaded lazily.

    Parameters
    ----------
    files : iterable of str or str
        Either an iterable of filenames or a glob-like pattern str.
    load_func : callable
        Function taking a filename as its first argument.
    kwargs
        Keyword arguments are passed to ``load_func``.

    Yields
    ------
    arr : `~numpy.ndarray`
        Loaded data.
    """
    if isinstance(files, str):
        files = iglob(files)
    files = iter(files)
    yield from map(partial(load_func, **kwargs), files)

# pmap does not support local functions
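A short usage sketch, assuming the iload defined above, numpy installed, and a hypothetical directory of .npy files:

import numpy as np

# Lazily load every matching file; mmap_mode='r' is forwarded to np.load
# through **kwargs, so arrays are memory-mapped rather than read eagerly.
for arr in iload('data/*.npy', np.load, mmap_mode='r'):
    print(arr.shape)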
def __init__(self, params):
    super(DecodeText, self).__init__(params)
    self._unk_mapping = None
    self._unk_replace_fn = None
    if self.params["unk_mapping"] is not None:
        self._unk_mapping = _get_unk_mapping(self.params["unk_mapping"])
    if self.params["unk_replace"]:
        self._unk_replace_fn = functools.partial(
            _unk_replace, mapping=self._unk_mapping)
    self._postproc_fn = None
    if self.params["postproc_fn"]:
        self._postproc_fn = locate(self.params["postproc_fn"])
        if self._postproc_fn is None:
            raise ValueError("postproc_fn not found: {}".format(
                self.params["postproc_fn"]))
def loads_with_persistent_ids(str, env):
    """
    Performs a pickle loads on the given string, substituting the given
    TradingEnvironment into any tokenized representations of a
    TradingEnvironment or AssetFinder.

    Parameters
    ----------
    str : String
        The string representation of the object to be unpickled.
    env : TradingEnvironment
        The TradingEnvironment to be inserted into the unpickled object.

    Returns
    -------
    obj
        An unpickled object formed from the parameter 'str'.
    """
    file = BytesIO(str)
    unpickler = pickle.Unpickler(file)
    unpickler.persistent_load = partial(_persistent_load, env=env)
    return unpickler.load()
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed. Can't display graph.")
    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)
    else:
        raise ValueError("Unknown display format: {!r}".format(format))
    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
def __init__(self, min_, max_, float_=False):
    super(QwordSpinBox, self).__init__()
    self._minimum = min_
    self._maximum = max_
    self.int_ = float if float_ else int
    rx = QRegExp(r'-?\d{0,20}(?:\.\d{0,20})?' if float_ else r'-?\d{0,20}')
    validator = QRegExpValidator(rx, self)
    self._lineEdit = QLineEdit(self)
    self._lineEdit.setText(str(self.int_(0)))
    self._lineEdit.setValidator(validator)
    self._lineEdit.textEdited.connect(partial(self.setValue, change=False))
    self.editingFinished.connect(
        lambda: self.setValue(self.value(), update=False) or True)
    self.setLineEdit(self._lineEdit)
def handle_event(self, event):
    if event.get('type') == EVENT_TYPE_MESSAGE:
        if self.message_pattern:
            match = self.message_pattern.match(event.get('text', ''))
            if match:
                kwargs = match.groupdict()
                args = () if kwargs else match.groups()
                args = (event, event.get('text')) + args
            else:
                return
        else:
            args = (event, event.get('text'))
            kwargs = {}
        context_data = {}
        handle_result = partial(self.handle_result, event)
        self.container.spawn_worker(
            self, args, kwargs,
            context_data=context_data,
            handle_result=handle_result)
def gl_init(self):
    self.gl_vertex_shader_factory = functools.lru_cache(maxsize=None)(
        functools.partial(gl.Shader, GL_VERTEX_SHADER))
    self.gl_fragment_shader_factory = functools.lru_cache(maxsize=None)(
        functools.partial(gl.Shader, GL_FRAGMENT_SHADER))
    self.gl_program_factory = functools.lru_cache(maxsize=None)(GLProgram)
    self.gl_texture_factory = functools.lru_cache(maxsize=None)(gx.texture.GLTexture)
    array_table = {gx.VA_PTNMTXIDX: GLMatrixIndexArray()}
    array_table.update((attribute, array.gl_convert())
                       for attribute, array in self.array_table.items())
    for shape in self.shapes:
        shape.gl_init(array_table)
    for material in self.materials:
        material.gl_init()
    for texture in self.textures:
        texture.gl_init(self.gl_texture_factory)
    self.gl_joints = [copy.copy(joint) for joint in self.joints]
    self.gl_joint_matrices = numpy.empty((len(self.joints), 3, 4), numpy.float32)
    self.gl_matrix_table = gl.TextureBuffer(
        GL_DYNAMIC_DRAW, GL_RGBA32F,
        (len(self.matrix_descriptors), 3, 4), numpy.float32)
    self.gl_update_matrix_table()
    self.gl_draw_objects = list(self.gl_generate_draw_objects(self.scene_graph))
    self.gl_draw_objects.sort(key=lambda draw_object: draw_object.material.unknown0)
def __getattr__(self, name):
    """\
    This is used to plug-in external serializers.

    When a "to_<name>" method is invoked, this method tries to find
    a ``segno.plugin.converter`` plugin with the provided ``<name>``.
    If such a plugin exists, a callable function is returned. The result
    of invoking the function depends on the plugin.
    """
    if name.startswith('to_'):
        from pkg_resources import iter_entry_points
        from functools import partial
        for ep in iter_entry_points(group='segno.plugin.converter',
                                    name=name[3:]):
            plugin = ep.load()
            return partial(plugin, self)
    raise AttributeError('{0} object has no attribute {1}'
                         .format(self.__class__, name))
def files_exist(self, file_paths):
    """
    Threaded exists for all file paths.

    file_paths: (list) file paths to test for existence

    Returns: { filepath: bool }
    """
    results = {}

    def exist_thunk(path, interface):
        results[path] = interface.exists(path)

    for path in file_paths:
        if len(self._threads):
            self.put(partial(exist_thunk, path))
        else:
            exist_thunk(path, self._interface)
    desc = 'Existence Testing' if self.progress else None
    self.wait(desc)
    return results
def list_files(self, prefix="", flat=False):
    """
    List the files in the layer with the given prefix.

    flat means only generate one level of a directory,
    while non-flat means generate all file paths with that
    prefix.

    Here's how flat=True handles different scenarios:
    1. partial directory name prefix = 'bigarr'
       - lists the '' directory and filters on key 'bigarr'
    2. full directory name prefix = 'bigarray'
       - Same as (1), but using key 'bigarray'
    3. full directory name + "/" prefix = 'bigarray/'
       - Lists the 'bigarray' directory
    4. partial file name prefix = 'bigarray/chunk_'
       - Lists the 'bigarray/' directory and filters on 'chunk_'

    Return: generated sequence of file paths relative to layer_path
    """
    for f in self._interface.list_files(prefix, flat):
        yield f
def wait_for_opacity(self, selector, opacity, **kwargs):
    '''
    Wait for an element to reach a specific opacity.

    Parameters
    ----------
    selector: str
        A CSS selector to search for. This can be any valid CSS selector.
    opacity: float
        The opacity to wait for.
    kwargs:
        Passed on to _wait_for
    '''
    def _wait_for_opacity(self, browser):
        return str(self.get_element(selector).value_of_css_property('opacity')) == str(opacity)

    self._wait_for(partial(_wait_for_opacity, self), **kwargs)
def decompile(self, data, ttFont):
    sstruct.unpack2(Glat_format_0, data, self)
    if self.version <= 1.9:
        decoder = partial(self.decompileAttributes12, fmt=Glat_format_1_entry)
    elif self.version <= 2.9:
        decoder = partial(self.decompileAttributes12, fmt=Glat_format_23_entry)
    elif self.version >= 3.0:
        (data, self.scheme) = grUtils.decompress(data)
        sstruct.unpack2(Glat_format_3, data, self)
        self.hasOctaboxes = (self.compression & 1) == 1
        decoder = self.decompileAttributes3
    gloc = ttFont['Gloc']
    self.attributes = {}
    count = 0
    for s, e in zip(gloc, gloc[1:]):
        self.attributes[ttFont.getGlyphName(count)] = decoder(data[s:e])
        count += 1
def compile(self, ttFont):
    data = sstruct.pack(Glat_format_0, self)
    if self.version <= 1.9:
        encoder = partial(self.compileAttributes12, fmt=Glat_format_1_entry)
    elif self.version <= 2.9:
        encoder = partial(self.compileAttributes12, fmt=Glat_format_23_entry)
    elif self.version >= 3.0:
        self.compression = (self.scheme << 27) + (1 if self.hasOctaboxes else 0)
        data = sstruct.pack(Glat_format_3, self)
        encoder = self.compileAttributes3
    glocs = []
    for n in range(len(self.attributes)):
        glocs.append(len(data))
        data += encoder(self.attributes[ttFont.getGlyphName(n)])
    glocs.append(len(data))
    ttFont['Gloc'].set(glocs)
    if self.version >= 3.0:
        data = grUtils.compress(self.scheme, data)
    return data
def do_alto_post(self, endpoint, data, callback):
    """ALTO post to the given endpoint with given data"""
    # Make HTTP POST to ALTO
    url = self._alto_url + endpoint
    try:
        alto_resp_future = self._loop.run_in_executor(None, functools.partial(
            requests.post, url, json=data))
        alto_resp = yield from alto_resp_future
    except OSError as exc:
        logging.info('Consumed OSError while connecting to ALTO server')
        return
    # Process peers
    ranked_peers = self._process_alto_response(alto_resp)
    # Return results to swarm
    callback(ranked_peers)
def doctable(ctx):
    df = pd.read_csv('./docs/flight-options.csv')
    # open an existing document
    doc = docx.Document('./docs/style-reference.docx')
    as_int = partial(format_decimal, format='#')
    as_usd = partial(format_currency, currency='USD')
    s = doc.sections[0]
    width = s.page_width - s.left_margin - s.right_margin
    doc.add_picture('./docs/diagrams_002.png', width=width)
    formatters = {
        'ticket_price': as_usd,
        'total_hours': as_int,
        'trip': as_int,
        'airline': partial(shorten_long_name, width=20),
        'selected': compose({0: 'No', 1: 'Yes'}.get, int),
    }
    add_table(df, doc, table_style='Plain Table 3', formatters=formatters)
    # save the doc
    doc.save('./docs/test.docx')
def fix_tickets(
        self, ticket_frame: pd.DataFrame, path_fixes) -> pd.DataFrame:
    ticket_frame.rename(
        columns={'Total changed lines': 'ChangedLines'}, inplace=True)
    ticket_frame = ticket_frame[
        ticket_frame.ChangedLines < 100000]
    ticket_frame = ticket_frame.assign(
        ChangedFiles=ticket_frame['Changed files'].apply(
            partial(self.fix_path_prefixes, path_fixes)))
    fixed_frame = ticket_frame.drop(
        'Changed files', axis=1).sort_values(
        by='CommitDate').reset_index(drop=True)
    fixed_frame.fillna(value={'Found': ''}, axis=0, inplace=True)
    return fixed_frame

# prj1 specific methods
def getter(self, proxy_into=None, no_idmap=False):
    schema = self.schema
    proxy_class = self.proxy_class
    index = self.index
    idmap = self.idmap if not no_idmap else None
    buf = self.buf
    if proxy_class is not None:
        proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
    else:
        proxy_class_new = None

    @cython.locals(pos=int)
    def getter(pos):
        return schema.unpack_from(buf, index[pos], idmap, proxy_class_new, proxy_into)

    return getter
def iter_fast(self):
    # getter inlined
    schema = self.schema
    proxy_class = self.proxy_class
    index = self.index
    idmap = self.idmap
    buf = self.buf
    if proxy_class is not None:
        proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
    else:
        proxy_class_new = None
    proxy_into = schema.Proxy()
    for i in xrange(len(self)):
        yield schema.unpack_from(buf, index[i], idmap, proxy_class_new, proxy_into)
def ext_pillar(minion_id, pillar, *args, **kwargs):
    import salt.utils
    stack = {}
    stack_config_files = list(args)
    traverse = {
        'pillar': partial(salt.utils.traverse_dict_and_list, pillar),
        'grains': partial(salt.utils.traverse_dict_and_list, __grains__),
        'opts': partial(salt.utils.traverse_dict_and_list, __opts__),
    }
    for matcher, matchs in kwargs.iteritems():
        t, matcher = matcher.split(':', 1)
        if t not in traverse:
            raise Exception('Unknown traverse option "{0}", '
                            'should be one of {1}'.format(t, traverse.keys()))
        cfgs = matchs.get(traverse[t](matcher, None), [])
        if not isinstance(cfgs, list):
            cfgs = [cfgs]
        stack_config_files += cfgs
    for cfg in stack_config_files:
        if not os.path.isfile(cfg):
            log.warning('Ignoring pillar stack cfg "{0}": '
                        'file does not exist'.format(cfg))
            continue
        stack = _process_stack_cfg(cfg, stack, minion_id, pillar)
    return stack
def test_single_connection(self):
    """
    Test a single connection with sequential requests.
    """
    conn = self.get_connection()
    query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
    event = Event()

    def cb(count, *args, **kwargs):
        count += 1
        if count >= 10:
            conn.close()
            event.set()
        else:
            conn.send_msg(
                QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
                request_id=0,
                cb=partial(cb, count))

    conn.send_msg(
        QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
        request_id=0,
        cb=partial(cb, 0))
    event.wait()
def test_single_connection_pipelined_requests(self):
    """
    Test a single connection with pipelined requests.
    """
    conn = self.get_connection()
    query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
    responses = [False] * 100
    event = Event()

    def cb(response_list, request_num, *args, **kwargs):
        response_list[request_num] = True
        if all(response_list):
            conn.close()
            event.set()

    for i in range(100):
        conn.send_msg(
            QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
            request_id=i,
            cb=partial(cb, responses, i))
    event.wait()
def _set_final_result(self, response):
    self._cancel_timer()
    if self._metrics is not None:
        self._metrics.request_timer.addValue(time.time() - self._start_time)
    with self._callback_lock:
        self._final_result = response
        # save off current callbacks inside lock for execution outside it
        # -- prevents case where _final_result is set, then a callback is
        # added and executed on the spot, then executed again as a
        # registered callback
        to_call = tuple(
            partial(fn, response, *args, **kwargs)
            for (fn, args, kwargs) in self._callbacks
        )
    self._event.set()
    # apply each callback
    for callback_partial in to_call:
        callback_partial()
def test_create_action_plan(self):
    _, goal = self.client.show_goal("dummy")
    _, audit_template = self.create_audit_template(goal['uuid'])
    _, audit = self.create_audit(audit_template['uuid'])
    self.assertTrue(test_utils.call_until_true(
        func=functools.partial(self.has_audit_finished, audit['uuid']),
        duration=30,
        sleep_for=.5
    ))
    _, action_plans = self.client.list_action_plans(
        audit_uuid=audit['uuid'])
    action_plan = action_plans['action_plans'][0]
    _, action_plan = self.client.show_action_plan(action_plan['uuid'])
    self.assertEqual(audit['uuid'], action_plan['audit_uuid'])
    self.assertEqual('RECOMMENDED', action_plan['state'])
def test_delete_action_plan(self):
    _, goal = self.client.show_goal("dummy")
    _, audit_template = self.create_audit_template(goal['uuid'])
    _, audit = self.create_audit(audit_template['uuid'])
    self.assertTrue(test_utils.call_until_true(
        func=functools.partial(self.has_audit_finished, audit['uuid']),
        duration=30,
        sleep_for=.5
    ))
    _, action_plans = self.client.list_action_plans(
        audit_uuid=audit['uuid'])
    action_plan = action_plans['action_plans'][0]
    _, action_plan = self.client.show_action_plan(action_plan['uuid'])
    self.client.delete_action_plan(action_plan['uuid'])
    self.assertRaises(exceptions.NotFound, self.client.show_action_plan,
                      action_plan['uuid'])
def create_action_plan(cls, audit_template_uuid, **audit_kwargs):
    """Wrapper utility for creating a test action plan

    :param audit_template_uuid: Audit template UUID to use
    :param audit_kwargs: Dict of audit properties to set
    :return: The action plan as dict
    """
    _, audit = cls.create_audit(audit_template_uuid, **audit_kwargs)
    audit_uuid = audit['uuid']
    assert test_utils.call_until_true(
        func=functools.partial(cls.has_audit_finished, audit_uuid),
        duration=30,
        sleep_for=.5
    )
    _, action_plans = cls.client.list_action_plans(audit_uuid=audit_uuid)
    if len(action_plans['action_plans']) == 0:
        return
    return action_plans['action_plans'][0]
def _augment_module_post(net: nn.Module, callback_dict: dict) -> (dict, list):
    backward_hook_remove_func_list = []
    vis_param_dict = dict()
    vis_param_dict['layer'] = None
    vis_param_dict['index'] = None
    vis_param_dict['method'] = GradType.NAIVE
    for x, y in net.named_modules():
        if not isinstance(y, nn.Sequential) and y is not net:
            # I should add hook to all layers, in case they will be needed.
            backward_hook_remove_func_list.append(
                y.register_backward_hook(
                    partial(_backward_hook, module_name=x,
                            callback_dict=callback_dict,
                            vis_param_dict=vis_param_dict)))

    def remove_handles():
        for x in backward_hook_remove_func_list:
            x.remove()

    return vis_param_dict, remove_handles
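PyTorch calls a backward hook with a fixed (module, grad_input, grad_output) signature, so extra context such as the module's name has to be pre-bound with partial, as above. A minimal runnable sketch of the same idea, using register_full_backward_hook (the modern replacement for register_backward_hook); the network and print format are illustrative:

import functools
import torch
import torch.nn as nn

def named_hook(module, grad_input, grad_output, module_name=None):
    # module_name is pre-bound with partial; the rest is the hook signature
    print('backward through', module_name)

net = nn.Sequential(nn.Linear(4, 4), nn.ReLU())
for name, layer in net.named_modules():
    if layer is not net:
        layer.register_full_backward_hook(
            functools.partial(named_hook, module_name=name))

net(torch.randn(1, 4)).sum().backward()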
def test_cascade(self):
    # Register 2 functions and make sure the last registered
    # function is executed first.
    ret = pyrun(textwrap.dedent(
        """
        import functools, os, imp
        mod = imp.load_source("mod", r"{}")

        def foo(s):
            with open(r"{}", "ab") as f:
                f.write(s)

        mod.register_exit_fun(functools.partial(foo, b'1'))
        mod.register_exit_fun(functools.partial(foo, b'2'))
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    self.assertEqual(ret, 0)
    with open(TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"21")
def __build_buttons(self):
    self.__reset_button = tkinter.Button(self)
    self.__reset_button['text'] = 'Reset'
    self.__reset_button['command'] = self.__reset
    self.__reset_button.grid(column=0, row=0,
                             columnspan=10, sticky=tkinter.EW)
    self.__buttons = []
    for y in range(self.__height):
        row = []
        for x in range(self.__width):
            button = tkinter.Button(self)
            button.grid(column=x, row=y+1)
            button['text'] = '?'
            command = functools.partial(self.__push, x, y)
            button['command'] = command
            row.append(button)
        self.__buttons.append(row)
def __build_buttons(self):
    self.__reset_button = tkinter.Button(self)
    self.__reset_button['text'] = 'Reset'
    self.__reset_button['command'] = self.__reset
    self.__reset_button.grid(column=0, row=1,
                             columnspan=10, sticky=tkinter.EW)
    self.__buttons = []
    for y in range(self.__height):
        row = []
        for x in range(self.__width):
            button = tkinter.Button(self, width=2, height=1, text='?')
            button.grid(column=x, row=y+2)
            command = functools.partial(self.__push, x, y)
            button['command'] = command
            row.append(button)
        self.__buttons.append(row)
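These button loops use functools.partial(self.__push, x, y) rather than lambda: self.__push(x, y) deliberately: a lambda captures the variables x and y, not their values at that iteration, so every button would fire with the final coordinates. partial freezes the values at creation time. A minimal demonstration:

import functools

callbacks_lambda = [lambda: print(i) for i in range(3)]
callbacks_partial = [functools.partial(print, i) for i in range(3)]

for cb in callbacks_lambda:
    cb()   # prints 2, 2, 2 -- all closures see the final value of i
for cb in callbacks_partial:
    cb()   # prints 0, 1, 2 -- each partial froze its own argument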
def data_parallel(f, input, params, stats, mode, device_ids, output_device=None):
    if output_device is None:
        output_device = device_ids[0]
    if len(device_ids) == 1:
        return f(input, params, stats, mode)

    def replicate(param_dict, g):
        replicas = [{} for d in device_ids]
        for k, v in param_dict.items():
            for i, u in enumerate(g(v)):
                replicas[i][k] = u
        return replicas

    params_replicas = replicate(params, lambda x: Broadcast(device_ids)(x))
    stats_replicas = replicate(stats, lambda x: comm.broadcast(x, device_ids))
    replicas = [partial(f, params=p, stats=s, mode=mode)
                for p, s in zip(params_replicas, stats_replicas)]
    inputs = scatter([input], device_ids)
    outputs = parallel_apply(replicas, inputs)
    return gather(outputs, output_device)
def __init__(self, *args, **kwargs):
    super(GirderSession, self).__init__(*args, **kwargs)
    self.wait_for_success = functools.partial(
        self.wait_for,
        predicate=lambda j: j['status'] == JobStatus.SUCCESS,
        on_timeout=lambda j:
            'Timed out waiting for job/%s to move into success state' % j['_id'])
    self.wait_for_error = functools.partial(
        self.wait_for,
        predicate=lambda j: j['status'] == JobStatus.ERROR,
        on_timeout=lambda j:
            'Timed out waiting for job/%s to move into error state' % j['_id'])
    self.wait_for_canceled = functools.partial(
        self.wait_for,
        predicate=lambda j: j['status'] == JobStatus.CANCELED,
        on_timeout=lambda j:
            'Timed out waiting for job/%s to move into canceled state' % j['_id'])
def executemany(self, query, args):
    """Must be used with 'yield' as 'n = yield cursor.executemany(stmt)'.
    """
    yield self._sem.acquire()
    self._thread_pool.async_task(self._exec_task,
                                 partial_func(self._cursor.executemany, query, args))

def callproc(self, proc, args=()):
    """Must be used with 'yield' as 'yield cursor.callproc(proc)'.
    """
    yield self._sem.acquire()
    self._thread_pool.async_task(self._exec_task,
                                 partial_func(self._cursor.callproc, proc, args))

def execute(self, query, args=None):
    """Must be used with 'yield' as 'n = yield cursor.execute(stmt)'.
    """
    yield self._sem.acquire()
    self._thread_pool.async_task(self._exec_task,
                                 partial_func(self._cursor.execute, query, args))
def lazycache(filename, module_globals):
    """Seed the cache for filename with module_globals.

    The module loader will be asked for the source only when getlines is
    called, not immediately.

    If there is an entry in the cache already, it is not altered.

    :return: True if a lazy load is registered in the cache,
        otherwise False. To register such a load a module loader with a
        get_source method must be found, the filename must be a cachable
        filename, and the filename must not be already cached.
    """
    if filename in cache:
        if len(cache[filename]) == 1:
            return True
        else:
            return False
    if not filename or (filename.startswith('<') and filename.endswith('>')):
        return False
    # Try for a __loader__, if available
    if module_globals and '__loader__' in module_globals:
        name = module_globals.get('__name__')
        loader = module_globals['__loader__']
        get_source = getattr(loader, 'get_source', None)
        if name and get_source:
            get_lines = functools.partial(get_source, name)
            cache[filename] = (get_lines,)
            return True
    return False
def parsers(self):
    """Metadata item name to parser function mapping."""
    parse_list = self._parse_list
    parse_list_semicolon = partial(self._parse_list, separator=';')
    parse_bool = self._parse_bool
    parse_dict = self._parse_dict
    return {
        'zip_safe': parse_bool,
        'use_2to3': parse_bool,
        'include_package_data': parse_bool,
        'package_dir': parse_dict,
        'use_2to3_fixers': parse_list,
        'use_2to3_exclude_fixers': parse_list,
        'convert_2to3_doctests': parse_list,
        'scripts': parse_list,
        'eager_resources': parse_list,
        'dependency_links': parse_list,
        'namespace_packages': parse_list,
        'install_requires': parse_list_semicolon,
        'setup_requires': parse_list_semicolon,
        'tests_require': parse_list_semicolon,
        'packages': self._parse_packages,
        'entry_points': self._parse_file,
        'py_modules': parse_list,
    }
def parse_section_extras_require(self, section_options):
    """Parses `extras_require` configuration file section.

    :param dict section_options:
    """
    parse_list = partial(self._parse_list, separator=';')
    self['extras_require'] = self._parse_section_to_dict(
        section_options, parse_list)
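The setuptools snippets above show perhaps the most common partial idiom: deriving a family of specialized parsers from one general function by pre-binding a keyword argument. The same idea stand-alone (split_list is a hypothetical helper):

from functools import partial

def split_list(value, separator=','):
    # general parser: split a config string on a separator and strip items
    return [item.strip() for item in value.split(separator) if item.strip()]

# specialized variants, each with the separator frozen
parse_commas = split_list
parse_semicolons = partial(split_list, separator=';')

print(parse_commas('a, b, c'))      # ['a', 'b', 'c']
print(parse_semicolons('a; b; c'))  # ['a', 'b', 'c']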