Python multiprocessing 模块,set_start_method() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用multiprocessing.set_start_method()

项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def __init__(self, worker_class, concurrent_workers):
        """Initialise supervisor state and select the ``spawn`` start method.

        Args:
            worker_class: callable invoked once with no arguments; its
                result is stored on ``self.task``.
            concurrent_workers: stored unchanged on
                ``self.concurrent_workers``.
        """
        self.logger = logger.logger.Logger(logger_name='Supervisor')

        self.worker_class = worker_class
        self.concurrent_workers = concurrent_workers
        self.task = self.worker_class()
        self.workers_processes = []

        # The event is handed to the instance already in the "set" state.
        work_flag = threading.Event()
        work_flag.set()
        self.should_work_event = work_flag

        # force=True: apply 'spawn' even if a start method was already chosen.
        multiprocessing.set_start_method('spawn', force=True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_set_get(self):
        """Switch the global start method to each candidate and read it back.

        Candidates that ``set_start_method`` rejects with ``ValueError`` are
        skipped; at least one candidate must be accepted. The start method
        that was active on entry is restored in the ``finally`` clause.
        """
        multiprocessing.set_forkserver_preload(PRELOAD)
        previous = multiprocessing.get_start_method()
        accepted = 0
        try:
            for name in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(name, force=True)
                except ValueError:
                    continue

                self.assertEqual(multiprocessing.get_start_method(), name)
                context = multiprocessing.get_context()
                self.assertEqual(context.get_start_method(), name)

                # Both the context class and its Process class are expected
                # to be named after the start method.
                context_type_name = type(context).__name__.lower()
                self.assertTrue(context_type_name.startswith(name))
                process_type_name = context.Process.__name__.lower()
                self.assertTrue(process_type_name.startswith(name))

                self.check_context(multiprocessing)
                accepted += 1
        finally:
            multiprocessing.set_start_method(previous, force=True)
        self.assertGreaterEqual(accepted, 1)
项目:mobai    作者:eguven    | 项目源码 | 文件源码
def match_game(self):
        """Endlessly pair queued players into games and launch a runner per game.

        Generator-style coroutine: it yields the awaitables returned by the
        queue/games collections and by ``gen.sleep``. Each round reads up to
        10 queue entries ordered by ``_id``, shuffles them, and while at
        least two remain creates a game document, starts a daemon process
        running ``Runner.start_game`` for it, and deletes the two queue
        entries. It never returns.
        """
        multiprocessing.set_start_method('spawn')
        # TODO restart runners for running games?
        logger.info('Matchmaker started')
        while True:
            # The sleep is created before the matching work and only yielded
            # at the end of the round.
            pause = gen.sleep(5)
            started_at = ioloop.IOLoop.current().time()

            cursor = gamequeue.find().sort([('_id', 1)]).limit(10)
            players = yield cursor.to_list(length=10)

            while len(players) >= 2:
                random.shuffle(players)
                p0, p1 = players[0], players[1]
                players = players[2:]

                token0 = create_token()
                token1 = create_token()
                p0['token'] = token0
                p1['token'] = token1

                # Strip the queue ids off the player documents before they
                # are embedded in the game document.
                queue_ids = [p0.pop('_id'), p1.pop('_id')]
                game = {'player0': p0, 'player1': p1, 'turn': 0, 'status': 'new'}
                insert_result = yield games.insert_one(game)
                game_idstr = str(insert_result.inserted_id)

                runner_name = 'runner-%s' % game_idstr
                logger.info('Launching Process "%s"', runner_name)
                runner_process = multiprocessing.Process(
                    target=Runner.start_game,
                    args=(game_idstr,),
                    name=runner_name,
                    daemon=True,
                )
                runner_process.start()
                # TODO keep track of spawned runner processes
                yield gamequeue.delete_many({'_id': {'$in': queue_ids}})

            finished_at = ioloop.IOLoop.current().time()
            logger.debug('MatchMaker ran for %.3fms', 1000 * (finished_at - started_at))
            yield pause
项目:research-dist    作者:DrawML    | 项目源码 | 文件源码
def start(self, slave_addr, task):
        """Remember *task* and launch ``_worker_main`` in a new child process.

        Args:
            slave_addr: forwarded unchanged to ``_worker_main``.
            task: stored on ``self._task`` and forwarded to ``_worker_main``.

        The child process is started with the arguments
        ``(self.id, slave_addr, task)``. No reference to the process object
        is kept, and it is not joined here.
        """
        # Local import, as in the original; the default start method is used.
        from multiprocessing import Process

        self._task = task

        print("[Worker {0}] Create".format(self.id))
        Process(target=_worker_main, args=(self.id, slave_addr, task)).start()
        print("***")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_set_get(self):
        """Switch the global start method to each candidate and read it back.

        Candidates that ``set_start_method`` rejects with ``ValueError`` are
        skipped; at least one candidate must be accepted. The start method
        that was active on entry is restored in the ``finally`` clause.
        """
        multiprocessing.set_forkserver_preload(PRELOAD)
        previous = multiprocessing.get_start_method()
        accepted = 0
        try:
            for name in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(name, force=True)
                except ValueError:
                    continue

                self.assertEqual(multiprocessing.get_start_method(), name)
                context = multiprocessing.get_context()
                self.assertEqual(context.get_start_method(), name)

                # Both the context class and its Process class are expected
                # to be named after the start method.
                context_type_name = type(context).__name__.lower()
                self.assertTrue(context_type_name.startswith(name))
                process_type_name = context.Process.__name__.lower()
                self.assertTrue(process_type_name.startswith(name))

                self.check_context(multiprocessing)
                accepted += 1
        finally:
            multiprocessing.set_start_method(previous, force=True)
        self.assertGreaterEqual(accepted, 1)
项目:pyrealtime    作者:ewhitmire    | 项目源码 | 文件源码
def __init__(self):
            """Create empty state plus a stop event and a prompt queue.

            The event and the queue are both built from the 'spawn'
            multiprocessing context; the global start method is not changed.
            """
            spawn_ctx = multiprocessing.get_context('spawn')

            self.layers = {}
            self.stop_event = spawn_ctx.Event()
            self.input_prompts = spawn_ctx.Queue()
            self.show_monitor = False
项目:reversi_ai    作者:andysalerno    | 项目源码 | 文件源码
def run():
    """Ask for a difficulty, start the GUI client process, then run the game.

    The value returned by the dialog's ``exec_()`` is mapped to a
    ``sim_time`` (0 -> 1, 1 -> 3, 2 -> 5, 3 -> 8). Any other value leaves
    ``sim_time`` as ``None``.
    """
    # ask user for difficulty
    q_app = QtWidgets.QApplication([])
    q_widget = QtWidgets.QWidget()
    dialog = QtWidgets.QMessageBox(q_widget)
    for label in ('Easy', 'Medium', 'Hard', 'Impossible'):
        dialog.addButton(label, QtWidgets.QMessageBox.ActionRole)
    dialog.setText('Choose difficulty:')
    choice = dialog.exec_()

    # Keys are the possible exec_() results, presumably matching the order
    # in which the buttons were added above.
    sim_time_by_choice = {0: 1, 1: 3, 2: 5, 3: 8}
    sim_time = sim_time_by_choice.get(choice)

    mp.set_start_method('spawn')
    gui_process = mp.Process(target=start_client.main)
    gui_process.start()

    run_game.main(
        BlackAgent='human',
        WhiteAgent='monte_carlo',
        sim_time=sim_time,
        gui=True,
    )
项目:MishMash    作者:nicfit    | 项目源码 | 文件源码
def main(args):
    """Run the command selected on the command line and return an exit code.

    Args:
        args: parsed arguments object. The attributes read here are
            ``command``, ``app``, ``log_levels``, ``log_files``, ``db_url``,
            ``config`` and ``command_func``; ``applyLoggingOpts`` is called
            on it and ``db_url`` is reset to ``None`` after being copied
            into the config.

    Returns:
        1 when no command was given (help is printed); otherwise the value
        returned by ``args.command_func`` (0 if it is falsy); 0 on
        ``KeyboardInterrupt``/``PromptExit``; 1 on the database errors
        listed below; 2 on any other ``Exception``.
    """
    import multiprocessing

    try:
        multiprocessing.set_start_method("fork")
    except RuntimeError as ex:
        # Best effort: a failure to select "fork" is only logged.
        # `warning` replaces the deprecated `warn` alias (assumes `log`
        # follows the stdlib logging.Logger API).
        log.warning("multiprocessing.set_start_method: " + str(ex))

    if not args.command:
        # No command was given.
        args.app.arg_parser.print_help()
        return 1

    # In the case fileConfig undid the command line, which has precedence.
    args.applyLoggingOpts(args.log_levels, args.log_files)

    if args.db_url:
        args.config.set(MAIN_SECT, SA_KEY, args.db_url)
        # Don't want commands and such to use this, so reset.
        args.db_url = None
    elif "MISHMASH_DBURL" in os.environ:
        log.verbose("Using environment MISHMASH_DBURL over configuration: {}"
                    .format(os.environ["MISHMASH_DBURL"]))
        args.config.set(MAIN_SECT, SA_KEY, os.environ["MISHMASH_DBURL"])

    try:
        # Run command
        retval = args.command_func(args, args.config) or 0
    except (KeyboardInterrupt, PromptExit):
        # PromptExit raised when CTRL+D during prompt, or prompts disabled
        retval = 0
    except (sql_exceptions.ArgumentError,
            sql_exceptions.OperationalError):
        _pErr("Database error")
        retval = 1
    except Exception as ex:
        log.exception(ex)
        _pErr("General error")
        retval = 2

    return retval
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_context(self):
        """Check each obtainable concrete context against its start method.

        Start methods for which ``get_context`` raises ``ValueError`` are
        skipped. For the others the context must report the requested
        method, return itself from ``get_context()``, and raise
        ``ValueError`` from ``set_start_method`` for both 'spawn' and None.
        """
        for name in ('fork', 'spawn', 'forkserver'):
            try:
                context = multiprocessing.get_context(name)
            except ValueError:
                continue

            self.assertEqual(context.get_start_method(), name)
            self.assertIs(context.get_context(), context)
            for rejected in ('spawn', None):
                with self.assertRaises(ValueError):
                    context.set_start_method(rejected)
            self.check_context(context)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_semaphore_tracker(self):
        """Check the tracker output after a child holding two semaphores is killed.

        A child interpreter selects the 'spawn' start method, creates two
        locks, writes their semaphore names to an inherited pipe and then
        sleeps. The parent unlinks the first semaphore itself, terminates
        the child, waits, and then expects that unlinking the second one
        fails with ``OSError`` and that the child's stderr contains the
        "leaked semaphores" message plus an error line naming the first
        semaphore.
        """
        import subprocess
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        r, w = os.pipe()
        # The write end is passed to the child; its fd number is formatted
        # into the script twice, once per os.write call.
        p = subprocess.Popen([sys.executable,
                             '-c', cmd % (w, w)],
                             pass_fds=[w],
                             stderr=subprocess.PIPE)
        os.close(w)
        with open(r, 'rb', closefd=True) as f:
            name1 = f.readline().rstrip().decode('ascii')
            name2 = f.readline().rstrip().decode('ascii')
        # Remove the first semaphore by hand before the child goes away.
        _multiprocessing.sem_unlink(name1)
        p.terminate()
        p.wait()
        # Fixed delay before checking; presumably gives the tracker time to
        # clean up after the terminated child.
        time.sleep(2.0)
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = p.stderr.read().decode('utf-8')
        p.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        # Raw string: '\[' is a regex escape, not a Python string escape.
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)

#
# Mixins
#
项目:gymexperiments    作者:tambetm    | 项目源码 | 文件源码
def run(args):
    """Build the model, start the runner processes, then run the trainer.

    Reads ``args.environment``, ``args.num_runners`` and
    ``args.queue_length``; ``args`` is also handed to ``create_model``,
    to every runner process and to ``trainer``.
    """
    # create dummy environment to be able to create model
    env = gym.make(args.environment)
    assert isinstance(env.observation_space, Box)
    assert isinstance(env.action_space, Discrete)
    print("Observation space:", env.observation_space)
    print("Action space:", env.action_space)

    # create main model
    model = create_model(env, args)
    model.summary()
    env.close()

    # force runner processes to use cpu
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    # for better compatibility with Theano and Tensorflow
    multiprocessing.set_start_method('spawn')

    # create shared buffer for sharing weights: the pickled weights are
    # copied into a char array of exactly the pickled size
    weights_blob = pickle.dumps(model.get_weights(), pickle.HIGHEST_PROTOCOL)
    shared_buffer = Array('c', len(weights_blob))
    shared_buffer.raw = weights_blob

    # create one fifo per runner and start each runner process right after
    # its fifo has been created
    fifos = []
    for _ in range(args.num_runners):
        runner_fifo = Queue(args.queue_length)
        fifos.append(runner_fifo)
        Process(target=runner, args=(shared_buffer, runner_fifo, args)).start()

    # start trainer in main thread
    trainer(model, fifos, shared_buffer, args)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_context(self):
        """Check each obtainable concrete context against its start method.

        Start methods for which ``get_context`` raises ``ValueError`` are
        skipped. For the others the context must report the requested
        method, return itself from ``get_context()``, and raise
        ``ValueError`` from ``set_start_method`` for both 'spawn' and None.
        """
        for name in ('fork', 'spawn', 'forkserver'):
            try:
                context = multiprocessing.get_context(name)
            except ValueError:
                continue

            self.assertEqual(context.get_start_method(), name)
            self.assertIs(context.get_context(), context)
            for rejected in ('spawn', None):
                with self.assertRaises(ValueError):
                    context.set_start_method(rejected)
            self.check_context(context)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_semaphore_tracker(self):
        """Check the tracker output after a child holding two semaphores is killed.

        A child interpreter selects the 'spawn' start method, creates two
        locks, writes their semaphore names to an inherited pipe and then
        sleeps. The parent unlinks the first semaphore itself, terminates
        the child, waits, and then expects that unlinking the second one
        fails with ``OSError`` and that the child's stderr contains the
        "leaked semaphores" message plus an error line naming the first
        semaphore.
        """
        import subprocess
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        r, w = os.pipe()
        # The write end is passed to the child; its fd number is formatted
        # into the script twice, once per os.write call.
        p = subprocess.Popen([sys.executable,
                             '-c', cmd % (w, w)],
                             pass_fds=[w],
                             stderr=subprocess.PIPE)
        os.close(w)
        with open(r, 'rb', closefd=True) as f:
            name1 = f.readline().rstrip().decode('ascii')
            name2 = f.readline().rstrip().decode('ascii')
        # Remove the first semaphore by hand before the child goes away.
        _multiprocessing.sem_unlink(name1)
        p.terminate()
        p.wait()
        # Fixed delay before checking; presumably gives the tracker time to
        # clean up after the terminated child.
        time.sleep(2.0)
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = p.stderr.read().decode('utf-8')
        p.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        # Raw string: '\[' is a regex escape, not a Python string escape.
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)

#
# Mixins
#
项目:Discord-ASM-Bot    作者:Emzi0767    | 项目源码 | 文件源码
def main():
    """Load ``config.json``, schedule one launch task per shard, and run them.

    A fresh event loop is created and installed as the current loop. For
    every shard index a ``launch_process`` task is scheduled on that loop,
    with a 10 second pause between consecutive shards (none after the last
    one). The loop then runs until all of its tasks complete or a
    ``KeyboardInterrupt`` arrives.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    asmbot.log("ASM version {} booting".format(asmbot.__version__), tag="ASM LDR")

    asmbot.log("Loading config", tag="PAM LDR")
    with open("config.json", "r") as f:
        tkd = json.load(f)

    asmbot.log("Launching ASM", tag="ASM LDR")

    mp.set_start_method("spawn")
    shard_count = int(tkd["shard_count"])
    executor = ThreadPoolExecutor(max_workers=shard_count)
    # NOTE(review): nothing in this function appends to this list, so the
    # join loop in the finally clause currently has nothing to join.
    processes = []

    for i in range(shard_count):
        args = {
            "token": tkd["token"],
            "shard_id": i,
            "shard_count": shard_count,
            "script": tkd["script"],
            "guild_blacklist": tkd["guild_blacklist"],
            "guild_exempt_list": tkd["guild_exempt_list"]
        }

        loop.create_task(launch_process(executor, asmbotlauncher.initialize_asmbot, **args))

        # Stagger shard start-up; no pause is needed after the last shard.
        if i != shard_count - 1:
            loop.run_until_complete(asyncio.sleep(10))

    asmbot.log("Running", tag="ASM LDR")

    # asyncio.Task.all_tasks was removed in Python 3.9; prefer the
    # module-level asyncio.all_tasks and fall back on older interpreters.
    all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks

    try:
        loop.run_until_complete(asyncio.gather(*all_tasks(loop)))

    except KeyboardInterrupt:
        pass

    finally:
        asmbot.log("Shutting down", tag="ASM LDR")
        for process in processes:
            process.join()

    asmbot.log("Shutdown finalized", tag="ASM LDR")