我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用multiprocessing.Value()。
def add_export(self,export_range,export_dir): logger.debug("Adding new video export process.") should_terminate = mp.Value(c_bool,False) frames_to_export = mp.Value(c_int,0) current_frame = mp.Value(c_int,0) rec_dir = self.g_pool.rec_dir user_dir = self.g_pool.user_dir start_frame= export_range.start end_frame= export_range.stop+1 #end_frame is exclusive frames_to_export.value = end_frame-start_frame # Here we make clones of every plugin that supports it. # So it runs in the current config when we lauch the exporter. plugins = self.g_pool.plugins.get_initializers() out_file_path=verify_out_file_path(self.rec_name,export_dir) process = Export_Process(target=export, args=(should_terminate,frames_to_export,current_frame, rec_dir,user_dir,self.g_pool.min_data_confidence,start_frame,end_frame,plugins,out_file_path)) self.new_export = process
def start(self): self.setup_sockets() import StaticUPnP_Settings permissions = Namespace(**StaticUPnP_Settings.permissions) print(permissions) if permissions.drop_permissions: self.drop_privileges(permissions.user, permissions.group) self.running = Value(ctypes.c_int, 1) self.queue = Queue() self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running)) self.reciever_thread.start() self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,)) self.schedule_thread.start() self.response_thread = Process(target=self.response_handler, args=(self.queue, self.running)) self.response_thread.start()
def share(self): shared = super().share() if hasattr(self, 'lastYs'): # share lastYs to communicate between batch_act and observe shared['lastYs'] = self.lastYs if self.opt.get('numthreads', 1) > 1: if type(self.index) is not multiprocessing.sharedctypes.Synchronized: # for multithreading need to move index into threadsafe memory self.index = Value('l', -1) if hasattr(self, 'sorted_data'): shared['sorted_data'] = self.sorted_data shared['batches'] = self.batches else: shared['data_loader'] = self.data_loader shared['index'] = self.index return shared
def __init__(self, n_runs, eval_interval, outdir, max_episode_len=None, explorer=None, step_offset=0, logger=None): self.start_time = time.time() self.n_runs = n_runs self.eval_interval = eval_interval self.outdir = outdir self.max_episode_len = max_episode_len self.explorer = explorer self.step_offset = step_offset self.logger = logger or logging.getLogger(__name__) # Values below are shared among processes self.prev_eval_t = mp.Value( 'l', self.step_offset - self.step_offset % self.eval_interval) self._max_score = mp.Value('f', np.finfo(np.float32).min) self.wrote_header = mp.Value('b', False) # Create scores.txt with open(os.path.join(self.outdir, 'scores.txt'), 'a'): pass
def __init__(self, runner_class, path): logger.info('Runner UI init') urwid.set_encoding("UTF-8") self.runner_class = runner_class self.path = path self.store = Store(self) self.main_loop = None self.w_main = None self._first_failed_focused = False # process comm self.child_pipe = None self.pipe_size = multiprocessing.Value('i', 0) self.pipe_semaphore = multiprocessing.Event() self.receive_buffer = '' self.runner_process = None self.init_main_screen()
def _parse_http_proxy(envVarNames): """ Parses the value of the first existing environment variable named in `envVarNames` into a host and port tuple where port is None if it's not present in the environment variable. """ p = re.compile(r'(?:https?://)?([^:]+):?(\d+)?/?$') for name in envVarNames: value = get_env(name) if value: m = p.match(value) if m: return m.group(1), m.group(2) else: abort("Value of " + name + " is not valid: " + value) return (None, None)
def write(client, data, dst, write_type, timer): """Write the {src} file in the local filesystem to the {dst} file in Alluxio. Args: client (:class:`alluxio.Client`): Alluxio client. data (str): The file content of the source. dst (str): The file to be written to Alluxio. write_type (:class:`alluxio.wire.WriteType`): Write type for creating the file. timer (:class:`multiprocessing.Value`): Timer for summing up the total time for writing the files. Returns: float: writing time """ start_time = time.time() with client.open(dst, 'w', recursive=True, write_type=write_type) as alluxio_file: alluxio_file.write(data) elapsed_time = time.time() - start_time with timer.get_lock(): timer.value += elapsed_time return elapsed_time
def main(args): with open(args.src, 'r') as f: data = f.read() timer = Value('d', 0) processes = [] for process_id in range(args.nprocess): p = Process(target=run_write, args=(args, data, process_id, timer)) processes.append(p) start_time = time.time() for p in processes: p.start() for p in processes: p.join() average_time_per_process = timer.value / len(processes) print_stats(args, average_time_per_process)
def main(args): with open(args.expected, 'r') as f: expected = f.read() timer = Value('d', 0) processes = [] for process_id in range(args.nprocess): p = Process(target=run_read, args=(args, expected, process_id, timer)) processes.append(p) start_time = time.time() for p in processes: p.start() for p in processes: p.join() average_time_per_process = timer.value / len(processes) print_stats(args, average_time_per_process)
def test_kill_process_tree(self): """Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway.""" child_process_killed = multiprocessing.Value('i', 0) process_done = multiprocessing.Semaphore(0) child_pid = multiprocessing.Value('i', 0) setup_done = multiprocessing.Semaphore(0) args = [child_process_killed, child_pid, process_done, setup_done] child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args) try: child.start() self.assertTrue(process_done.acquire(timeout=5.0)) self.assertEqual(1, child_process_killed.value) finally: try: os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here except OSError: pass
def __init__(self, platform_index, device_index, ip, port): Process.__init__(self) Logger.__init__(self) # self.logger_level ^= Logger.MSG_VERBOSE self.daemon = True self.exit_evt = Event() self.running = Value('i', 0) self.platform_index = platform_index self.device_index = device_index self.ip = ip self.port = port self.uuid = uuid.uuid1().hex self.ocl_ga = None ## Terminate worker process, this should be only called when OpenCLGAClient # is shutting down. The exti_evt will be set to break the wait in the # process's run.
def __init__(self, name, outbox, max_task): ''' @name: ???, ???????????, @outbox: ???????? url ???? @max_task: ????????? (????????? coroutine ??) ''' multiprocessing.Process.__init__(self) self.name = name self.inbox = multiprocessing.Queue() # ??????????? url self.outbox = outbox self.max_task = max_task self.doing = multiprocessing.Value('i', 0) self._doing = set() self.result = set() # ?????? url self.loop = None
def get_balance(self, failover_time=None): balance = 0 if self.is_registered(): # get confirmed balance balance = float(self.get_balance_confirmed()) # get unconfirmed balance come of bot balance += float(crypto.get_user_spendable_balance(self.address)) if failover_time is not None and type(failover_time) is type(multiprocessing.Value): # if we call function without failover_time, we consider we are in safe mode if int(time.time()) > int(failover_time.value) + 86400: # not in safe mode so add unconfirmed balance balance += float(self.get_balance_unconfirmed()) return balance
def simThread(queues_arr, pipes_arr, holdValue_v, objectArray_arr = None, mainLock = None): def termThread(queues_arr, pipes_arr, holdValue_v, objectArray_arr = None, mainLock = None): commandPipe = pipes_arr[0] controlQueue_q = queues_arr[0] pullString_q = multiprocessing.Queue() pushString_q = multiprocessing.Queue() termThreadHold_v = multiprocessing.Value() guiHold_v = multiprocessing.Value() guiHold_v.value = False termThreadHold_v.value = False subProcess = multiprocessing.Process(target = terminal, args = (0, pullString_q, pushString_q, guiHold_v, termThreadHold_v)) subProcess.start() checkSequence_bool = True while checkSequence_bool: termThreadEventHandler(termThreadHold_v, pullString_q, commandPipe, holdValue_v) termThreadControlHandler(termThreadHold_v, controlQueue_q, pushString_q, guiHold_v)
def test_ipv4_async(): global FLAG FLAG = Value('i', 0) nma = nmap.PortScannerAsync() def callback_result(host, scan_result): global FLAG FLAG.value = 1 nma.scan(hosts='127.0.0.1', arguments='-p 22 -Pn', callback=callback_result) while nma.still_scanning(): nma.wait(2) assert_equals(FLAG.value, 1)
def test_ipv6_async(): global FLAG FLAG = Value('i', 0) nma = nmap.PortScannerAsync() def callback_result(host, scan_result): global FLAG FLAG.value = 1 nma.scan(hosts='::1', arguments='-6 -p 22 -Pn', callback=callback_result) while nma.still_scanning(): nma.wait(2) assert_equals(FLAG.value, 1)
def __init__(self, workdir, n_threads=0): self.n_threads = cpu_count() if n_threads == 0 else n_threads self.process_list = [] self.workdir = workdir self.work_queue = Queue() self.update_queues = [] self.testcase_report = Queue() self.coverage = {} self.testcases = [] self.executed_testcases = Value('i', 0) self.mutator = mutator() if not os.path.exists(workdir): os.makedirs(workdir) os.chdir(workdir)
def test_waitfor(self): # based on test in test/lock_tests.py cond = self.Condition() state = self.Value('i', -1) p = self.Process(target=self._test_waitfor_f, args=(cond, state)) p.daemon = True p.start() with cond: result = cond.wait_for(lambda : state.value==0) self.assertTrue(result) self.assertEqual(state.value, 0) for i in range(4): time.sleep(0.01) with cond: state.value += 1 cond.notify() p.join(5) self.assertFalse(p.is_alive()) self.assertEqual(p.exitcode, 0)
def test_waitfor_timeout(self): # based on test in test/lock_tests.py cond = self.Condition() state = self.Value('i', 0) success = self.Value('i', False) sem = self.Semaphore(0) p = self.Process(target=self._test_waitfor_timeout_f, args=(cond, state, success, sem)) p.daemon = True p.start() self.assertTrue(sem.acquire(timeout=10)) # Only increment 3 times, so state == 4 is never reached. for i in range(3): time.sleep(0.01) with cond: state.value += 1 cond.notify() p.join(5) self.assertTrue(success.value)
def test_value(self, raw=False): if raw: values = [self.RawValue(code, value) for code, value, _ in self.codes_values] else: values = [self.Value(code, value) for code, value, _ in self.codes_values] for sv, cv in zip(values, self.codes_values): self.assertEqual(sv.value, cv[1]) proc = self.Process(target=self._test, args=(values,)) proc.daemon = True proc.start() proc.join() for sv, cv in zip(values, self.codes_values): self.assertEqual(sv.value, cv[2])
def test_getobj_getlock(self): val1 = self.Value('i', 5) lock1 = val1.get_lock() obj1 = val1.get_obj() val2 = self.Value('i', 5, lock=None) lock2 = val2.get_lock() obj2 = val2.get_obj() lock = self.Lock() val3 = self.Value('i', 5, lock=lock) lock3 = val3.get_lock() obj3 = val3.get_obj() self.assertEqual(lock, lock3) arr4 = self.Value('i', 5, lock=False) self.assertFalse(hasattr(arr4, 'get_lock')) self.assertFalse(hasattr(arr4, 'get_obj')) self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') arr5 = self.RawValue('i', 5) self.assertFalse(hasattr(arr5, 'get_lock')) self.assertFalse(hasattr(arr5, 'get_obj'))
def test_sharedctypes(self, lock=False): x = Value('i', 7, lock=lock) y = Value(c_double, 1.0/3.0, lock=lock) foo = Value(_Foo, 3, 2, lock=lock) arr = self.Array('d', list(range(10)), lock=lock) string = self.Array('c', 20, lock=lock) string.value = latin('hello') p = self.Process(target=self._double, args=(x, y, foo, arr, string)) p.daemon = True p.start() p.join() self.assertEqual(x.value, 14) self.assertAlmostEqual(y.value, 2.0/3.0) self.assertEqual(foo.x, 6) self.assertAlmostEqual(foo.y, 4.0) for i in range(10): self.assertAlmostEqual(arr[i], i*2) self.assertEqual(string.value, latin('hellohello'))
def __init__(self, path, lock, in_path=None): """ Setup all values to be shared (between processes) values. """ self.lock = lock self.path = path if os.path.isfile(path): self.loadData() else: self.in_path = in_path self.clones = Manager().list() self.counter = Value("i", 0) self.nodes_total = Value("i", 0) self.first_counter = Value("i", 0) self.query_time_total = Value("d", 0) self.projects_counter = Value("i", 0) self.first_query_time_total = Value("d", 0)
def test_ipv6_async(): global FLAG_ipv6 FLAG_ipv6 = Value('i', 0) nma_ipv6 = nmap.PortScannerAsync() def callback_result(host, scan_result): global FLAG_ipv6 FLAG_ipv6.value = 1 nma_ipv6.scan(hosts='::1', arguments='-6 -p 22 -Pn', callback=callback_result) while nma_ipv6.still_scanning(): nma_ipv6.wait(2) assert_equals(FLAG_ipv6.value, 1)
def work(sample, simulate, accept, queue, n_eval: Value, n_particles: Value): random.seed() np.random.seed() while n_particles.value > 0: with n_eval.get_lock(): particle_id = n_eval.value n_eval.value += 1 new_param = sample() new_sim = simulate(new_param) if accept(new_sim): with n_particles.get_lock(): n_particles.value -= 1 queue.put((particle_id, new_sim)) queue.put(DONE)
def __init__(self, messages, modulators, num_repeats=-1): """ :type messages: list of Message :type modulators: list of Modulator """ self.messages = messages self.modulators = modulators self.num_repeats = num_repeats # -1 or 0 = infinite self.ring_buffer = RingBuffer(int(self.BUFFER_SIZE_MB*10**6)//8) self.current_message_index = Value("L", 0) self.abort = Value("i", 0) self.process = Process(target=self.modulate_continuously, args=(self.num_repeats, )) self.process.daemon = True
def __init__(self, id, prediction_q, training_q, episode_log_q): super(Agent, self).__init__(name="Agent_{}".format(id)) self.id = id self.prediction_q = prediction_q self.training_q = training_q self.episode_log_q = episode_log_q gym_env = gym.make(FLAGS.game) gym_env.seed(FLAGS.seed) self.env = AtariEnvironment(gym_env=gym_env, resized_width=FLAGS.resized_width, resized_height=FLAGS.resized_height, agent_history_length=FLAGS.agent_history_length) self.nb_actions = len(self.env.gym_actions) self.wait_q = Queue(maxsize=1) self.stop = Value('i', 0)
def __init__(self, account, password, notifier, ocr_service, debug_single_step=False): self.__account = account self.__password = password self.__notifier = notifier self.__ocr_service = ocr_service self.__manager = Manager() self.__job_list = self.__manager.list() self.__job_list_lock = Lock() self.__map = self.__manager.dict() self.__entrust_map = self.__manager.dict() self.__process = None self.__keep_working = Value('i', 1) if debug_single_step: self.__debug_single_step = Value('i', 1) else: self.__debug_single_step = Value('i', 0) self.__debug_single_step_go = Value('i', 0) self.__debug_single_step_lock = Lock()
def __init__(self, id, prediction_q, training_q, episode_log_q): super(ProcessAgent, self).__init__() self.id = id self.prediction_q = prediction_q self.training_q = training_q self.episode_log_q = episode_log_q self.env = Environment() self.num_actions = self.env.get_num_actions() self.actions = np.arange(self.num_actions) self.discount_factor = Config.DISCOUNT # one frame at a time self.wait_q = Queue(maxsize=1) self.exit_flag = Value('i', 0)
def start(self, config: dict): """Start the arbiter worker process""" with self.lock: if self.arbiter and not self.arbiter_stop_flag.value: msg = 'Failed to start. Arbiter process already running.' raise ServiceException(msg) if self.arbiter_process and self.arbiter_process.is_alive(): LOG.info('Arbiter process is still alive. Terminating...') self.arbiter_process.terminate() self.arbiter_stop_flag = Value('b', False) self.config = Configuration().update(config) self.arbiter = Arbiter(self.config) self.arbiter_process = Process(target=self._loop, args=(self.arbiter, self.arbiter_stop_flag)) self.arbiter_process.daemon = True self.arbiter_process.start() self.start_time = time.time() LOG.info('Arbiter process was started') return True
def __init__(self, num_processes=1, tasks_per_requests=1, bitmap_size=(64 << 10)): self.to_update_queue = multiprocessing.Queue() self.to_master_queue = multiprocessing.Queue() self.to_master_from_mapserver_queue = multiprocessing.Queue() self.to_master_from_slave_queue = multiprocessing.Queue() self.to_mapserver_queue = multiprocessing.Queue() self.to_slave_queues = [] for i in range(num_processes): self.to_slave_queues.append(multiprocessing.Queue()) self.slave_locks_A = [] self.slave_locks_B = [] for i in range(num_processes): self.slave_locks_A.append(multiprocessing.Lock()) self.slave_locks_B.append(multiprocessing.Lock()) self.slave_locks_B[i].acquire() self.reload_semaphore = multiprocessing.Semaphore(multiprocessing.cpu_count()/2) self.num_processes = num_processes self.tasks_per_requests = tasks_per_requests self.stage_abortion_notifier = multiprocessing.Value('b', False) self.slave_termination = multiprocessing.Value('b', False, lock=False) self.sampling_failed_notifier = multiprocessing.Value('b', False) self.effector_mode = multiprocessing.Value('b', False) self.files = ["/dev/shm/kafl_fuzzer_master_", "/dev/shm/kafl_fuzzer_mapserver_", "/dev/shm/kafl_fuzzer_bitmap_"] self.sizes = [(65 << 10), (65 << 10), bitmap_size] self.tmp_shm = [{}, {}, {}]
def __init__(self): self._playlist = [] self._counter = Value('i', 0)
def __init__(self, poseNet, config, di, comrefNet=None): """ Initialize data :param poseNet: network for pose estimation :param config: configuration :param di: depth importer :param comrefNet: refinement network from center of mass detection :return: None """ # handpose CNN self.importer = di self.poseNet = poseNet self.comrefNet = comrefNet # configuration self.config = config self.initialconfig = config # synchronization between threads self.queue = Queue() self.stop = Value('b', False) # for calculating FPS self.lastshow = time.time() # hand left/right self.hand = self.HAND_LEFT # initial state self.state = self.STATE_IDLE # hand size estimation self.handsizes = [] self.numinitframes = 50 # hand tracking or detection self.tracking = False self.lastcom = (0, 0, 0) # Force network to compile output in the beginning self.poseNet.computeOutput(numpy.zeros(self.poseNet.cfgParams.inputDim, dtype='float32')) if self.comrefNet is not None: self.comrefNet.computeOutput([numpy.zeros(sz, dtype='float32') for sz in self.comrefNet.cfgParams.inputDim])
def start(self): import StaticUPnP_Settings permissions = Namespace(**StaticUPnP_Settings.permissions) print(permissions) if permissions.drop_permissions: self.drop_privileges(permissions.user, permissions.group) self.setup_sockets() self.running = Value(ctypes.c_int, 1) self.queue = Queue() self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running)) self.reciever_thread.start() self.runner_thread = Process(target=self.run, args=(self.queue, self.running)) self.runner_thread.start()
def init_marker_cacher(self): from marker_detector_cacher import fill_cache visited_list = [False if x == False else True for x in self.cache] video_file_path = self.g_pool.capture.source_path timestamps = self.g_pool.capture.timestamps self.cache_queue = mp.Queue() self.cacher_seek_idx = mp.Value('i',0) self.cacher_run = mp.Value(c_bool,True) self.cacher = mp.Process(target=fill_cache, args=(visited_list,video_file_path,timestamps,self.cache_queue,self.cacher_seek_idx,self.cacher_run,self.min_marker_perimeter_cacher)) self.cacher.start()
def __init__(self, opt, world): super().__init__(opt) self.inner_world = world self.numthreads = opt['numthreads'] self.sync = { # syncronization primitives # semaphores for counting queued examples 'queued_sem': Semaphore(0), # counts num exs to be processed 'threads_sem': Semaphore(0), # counts threads 'reset_sem': Semaphore(0), # allows threads to reset # flags for communicating with threads 'reset_flag': Value('b', False), # threads should reset 'term_flag': Value('b', False), # threads should terminate # counters 'epoch_done_ctr': Value('i', 0), # number of done threads 'total_parleys': Value('l', 0), # number of parleys in threads } # don't let threads create more threads! self.threads = [] for i in range(self.numthreads): self.threads.append(HogwildProcess(i, opt, world, self.sync)) for t in self.threads: t.start() for _ in self.threads: self.sync['threads_sem'].acquire()
def main(): env = Env() play_pipe, predict_pipe = Pipe() train_pipe1, train_pipe2 = Pipe() is_training = Value("b", True) manager = DQNManager(env.state_n, env.action_n, train_pipe1, predict_pipe, is_training) controller = AIControl(env, train_pipe2, play_pipe, is_training) manager.start() controller.control_start() manager.join()
def __init__(self, hosts): self._task_ctx = None self._item_ctx = None # ansible doesn't have callback to tell us # there is no host matched, so we do that # check by ourself using this list. self._hosts = copy.deepcopy(hosts) # ansible run jobs after `fork()` workers, # shm will make those control vars synced. self._reaper_queue = multiprocessing.Queue() self._playbook_mode = multiprocessing.Value('I', 0)
def __init__(self, # type: ignore # (mypy doesn't like multiprocessing lib) options: seproxer.mitmproxy_extensions.options, server: mitmproxy.proxy.server, results_queue: multiprocessing.Queue, push_event: multiprocessing.Event, active_flows_state: multiprocessing.Value, ) -> None: """ :param options: The extended mitmproxy options, used to configure our addons :param server: The mitmproxy server that the proxy will be interfacing with :param results_queue: The mitmproxy flows will be pushed into this queue :param push_event: When this event is set, the stored flows will be pushed into the `results_queue` :param active_flows_state: A shared state that determines if there are any active flows, that is, if any requests have pending responses """ super().__init__(options, server) # This addon will allow us to modify headers, this is particularly useful for appending # authentication cookies since selenium_extensions cannot modify HTTP ONLY cookies self.addons.add(mitmproxy.addons.setheaders.SetHeaders()) # This add-on hooks into javascript window.onerror and all the console logging # methods to log message into our defined "window.__seproxer_logs" object self.addons.add(mitmproxy_extensions.addons.JSConsoleErrorInjection()) # This addon will be responsible for storing our requests / responses in memory # and will allow us to push the results through out results_queue self._memory_stream_addon = mitmproxy_extensions.addons.MemoryStream() self.addons.add(self._memory_stream_addon) self.results_queue = results_queue self.push_event = push_event self.active_flows_state = active_flows_state
def __init__(self, mitmproxy_options: mitmproxy_extensions.options.MitmproxyExtendedOptions) -> None: self.mitmproxy_options = mitmproxy_options # setup proxy server from options proxy_config = mitmproxy.proxy.config.ProxyConfig(mitmproxy_options) self._proxy_server = mitmproxy.proxy.server.ProxyServer(proxy_config) self._results_queue = multiprocessing.Queue() self._producer_push_event = multiprocessing.Event() # type: ignore self._has_active_flows_state = multiprocessing.Value(ctypes.c_bool, False) self._proxy_proc = None # type: t.Optional[ProxyProc]
def __init__(self, q_function, optimizer, t_max, gamma, i_target, explorer, phi=lambda x: x, average_q_decay=0.999, logger=getLogger(__name__), batch_states=batch_states): self.shared_q_function = q_function self.target_q_function = copy.deepcopy(q_function) self.q_function = copy.deepcopy(self.shared_q_function) async.assert_params_not_shared(self.shared_q_function, self.q_function) self.optimizer = optimizer self.t_max = t_max self.gamma = gamma self.explorer = explorer self.i_target = i_target self.phi = phi self.logger = logger self.average_q_decay = average_q_decay self.batch_states = batch_states self.t_global = mp.Value('l', 0) self.t = 0 self.t_start = 0 self.past_action_values = {} self.past_states = {} self.past_rewards = {} self.average_q = 0
def test_run_async(self): counter = mp.Value('l', 0) def run_func(process_idx): for _ in range(1000): with counter.get_lock(): counter.value += 1 async.run_async(4, run_func) self.assertEqual(counter.value, 4000)
def initSharedMemoryState(self): self._builtBox = multiprocessing.Value('b', 1 if self.built else 0)
def pquery(self, x_list, k=1, eps=0, p=2, distance_upper_bound=np.inf): x = np.array(x_list) nx, mx = x.shape shmem_x = mp.Array(ctypes.c_double, nx * mx) shmem_d = mp.Array(ctypes.c_double, nx * k) shmem_i = mp.Array(ctypes.c_double, nx * k) _x = shmem_as_nparray(shmem_x).reshape((nx, mx)) _d = shmem_as_nparray(shmem_d).reshape((nx, k)) _i = shmem_as_nparray(shmem_i) if k != 1: _i = _i.reshape((nx, k)) _x[:, :] = x nprocs = num_cpus() scheduler = Scheduler(nx, nprocs) ierr = mp.Value(ctypes.c_int, 0) query_args = (scheduler, self.shmem_data, self.n, self.m, self.leafsize, shmem_x, nx, shmem_d, shmem_i, k, eps, p, distance_upper_bound, ierr ) pool = [mp.Process(target=_pquery, args=query_args) for n in range(nprocs)] for p in pool: p.start() for p in pool: p.join() if ierr.value != 0: raise RuntimeError('%d errors in worker processes' % (ierr.value)) return _d.copy(), _i.astype(int).copy()
def test_server_singleproc(restore_signal): started = mp.Value('i', 0) terminated = mp.Value('i', 0) def interrupt(): os.kill(0, signal.SIGINT) @aiotools.actxmgr async def myserver(loop, proc_idx, args): nonlocal started, terminated assert proc_idx == 0 assert len(args) == 0 await asyncio.sleep(0) with started.get_lock(): started.value += 1 loop.call_later(0.2, interrupt) yield await asyncio.sleep(0) with terminated.get_lock(): terminated.value += 1 aiotools.start_server(myserver) assert started.value == 1 assert terminated.value == 1