Python psutil 模块,ZombieProcess() 实例源码

我们从 Python 开源项目中提取了以下 26 个代码示例,用于说明如何使用 psutil.ZombieProcess()。

项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def pids_active(pids_computer):
    """
    Collect basic metrics for every PID that can still be inspected.

    Each PID in ``pids_computer`` is looked up with ``psutil.Process``. PIDs
    whose process is a zombie, no longer exists, or denies access are skipped.

    :param pids_computer: iterable of process IDs to inspect.
    :return: dict keyed by process name; each value is a dict with the keys
        ``pid``, ``status``, ``percent_cpu_used`` and ``percent_memory_used``.
        Processes sharing a name overwrite each other, so the last one
        inspected wins.
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
            # Resolve the name inside the ``try`` as well: the process can
            # exit between the calls above and this one, and the lookup
            # raises the same psutil exceptions.
            name = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess):
            continue
        pid_valid[name] = data
    return pid_valid
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def pids_active(pids_computer):
    """
    Collect basic metrics for every PID that can still be inspected.

    Each PID in ``pids_computer`` is looked up with ``psutil.Process``. PIDs
    whose process is a zombie, no longer exists, or denies access are skipped.

    :param pids_computer: iterable of process IDs to inspect.
    :return: dict keyed by process name; each value is a dict with the keys
        ``pid``, ``status``, ``percent_cpu_used`` and ``percent_memory_used``.
        Processes sharing a name overwrite each other, so the last one
        inspected wins.
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
            # Resolve the name inside the ``try`` as well: the process can
            # exit between the calls above and this one, and the lookup
            # raises the same psutil exceptions.
            name = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess):
            continue
        pid_valid[name] = data
    return pid_valid
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_exe_mocked(self):
        """Check ``Process.exe()`` when ``os.readlink`` fails with ENOENT."""
        missing_link = OSError(errno.ENOENT, "")
        with mock.patch('psutil._pslinux.os.readlink',
                        side_effect=missing_link) as readlink_mock:
            # ENOENT may also be raised while the /proc/pid/exe path
            # actually exists, which happens for system processes with
            # low pids (about 0-20). psutil is then expected to return
            # an empty string.
            exe_path = psutil.Process().exe()
            assert readlink_mock.called
            self.assertEqual(exe_path, "")

            # When /proc/pid itself is gone as well, the failure is
            # expected to be reported as a zombie process instead.
            with mock.patch('psutil._pslinux.os.path.lexists',
                            return_value=False):
                exe = psutil.Process().exe
                with self.assertRaises(psutil.ZombieProcess):
                    exe()
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_zombie_process__repr__(self, func=repr):
        """Verify the ``repr()`` text of ``psutil.ZombieProcess``.

        ``func`` is accepted but not used by this body; every check calls
        ``repr`` directly.
        """
        prefix = "psutil.ZombieProcess process still exists but it's a zombie "
        # (positional args, keyword args, expected repr)
        cases = (
            ((321,), {}, prefix + "(pid=321)"),
            ((321,), {'name': 'foo'}, prefix + "(pid=321, name='foo')"),
            ((321,), {'name': 'foo', 'ppid': 1},
             prefix + "(pid=321, name='foo', ppid=1)"),
            ((321,), {'msg': 'foo'}, "psutil.ZombieProcess foo"),
        )
        for args, kwargs, expected in cases:
            self.assertEqual(
                repr(psutil.ZombieProcess(*args, **kwargs)), expected)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_ad_on_process_creation(self):
        """Check which ``create_time`` failures ``Process()`` tolerates.

        AccessDenied and ZombieProcess raised by ``create_time`` must not
        prevent instantiating ``Process``; any other exception (ValueError
        here) must propagate.
        """
        tolerated = (psutil.AccessDenied, psutil.ZombieProcess(1))
        for side_effect in tolerated:
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=side_effect) as patched:
                psutil.Process()
                assert patched.called

        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as patched:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert patched.called
项目:fermentrack    作者:thorrak    | 项目源码 | 文件源码
def update(self):
        """
        Update the list of BrewPi processes by receiving them from the system with psutil.

        A process matches when its name contains 'python' and one of its
        command-line arguments contains 'brewpi.py'. Every match is passed to
        ``self.parseProcess`` and truthy results are collected.

        Returns: the new ``self.list`` (BrewPiProcess objects, per
        ``self.parseProcess``)
        """
        bpList = []
        for p in psutil.process_iter():
            # some OS's (OS X) do not allow processes to read info from other
            # processes, and a process may exit or become a zombie while we
            # iterate. Skip only the offending process instead of discarding
            # every match found so far.
            try:
                if 'python' not in p.name():
                    continue
                is_brewpi = any('brewpi.py' in s for s in p.cmdline())
            except (psutil.AccessDenied, psutil.ZombieProcess,
                    psutil.NoSuchProcess):
                continue
            if not is_brewpi:
                continue
            bp = self.parseProcess(p)
            if bp:
                bpList.append(bp)
        self.list = bpList
        return self.list
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_exe_mocked(self):
        """Check ``Process.exe()`` when ``os.readlink`` fails with ENOENT."""
        missing_link = OSError(errno.ENOENT, "")
        with mock.patch('psutil._pslinux.os.readlink',
                        side_effect=missing_link) as readlink_mock:
            # ENOENT may also be raised while the /proc/pid/exe path
            # actually exists, which happens for system processes with
            # low pids (about 0-20). psutil is then expected to return
            # an empty string.
            exe_path = psutil.Process().exe()
            assert readlink_mock.called
            self.assertEqual(exe_path, "")

            # When /proc/pid itself is gone as well, the failure is
            # expected to be reported as a zombie process instead.
            with mock.patch('psutil._pslinux.os.path.lexists',
                            return_value=False):
                exe = psutil.Process().exe
                with self.assertRaises(psutil.ZombieProcess):
                    exe()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_zombie_process__repr__(self, func=repr):
        """Verify the ``repr()`` text of ``psutil.ZombieProcess``.

        ``func`` is accepted but not used by this body; every check calls
        ``repr`` directly.
        """
        prefix = "psutil.ZombieProcess process still exists but it's a zombie "
        # (positional args, keyword args, expected repr)
        cases = (
            ((321,), {}, prefix + "(pid=321)"),
            ((321,), {'name': 'foo'}, prefix + "(pid=321, name='foo')"),
            ((321,), {'name': 'foo', 'ppid': 1},
             prefix + "(pid=321, name='foo', ppid=1)"),
            ((321,), {'msg': 'foo'}, "psutil.ZombieProcess foo"),
        )
        for args, kwargs, expected in cases:
            self.assertEqual(
                repr(psutil.ZombieProcess(*args, **kwargs)), expected)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_ad_on_process_creation(self):
        """Check which ``create_time`` failures ``Process()`` tolerates.

        AccessDenied and ZombieProcess raised by ``create_time`` must not
        prevent instantiating ``Process``; any other exception (ValueError
        here) must propagate.
        """
        tolerated = (psutil.AccessDenied, psutil.ZombieProcess(1))
        for side_effect in tolerated:
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=side_effect) as patched:
                psutil.Process()
                assert patched.called

        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as patched:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert patched.called
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_exe_mocked(self):
        """Check ``Process.exe()`` when ``os.readlink`` fails with ENOENT."""
        missing_link = OSError(errno.ENOENT, "")
        with mock.patch('psutil._pslinux.os.readlink',
                        side_effect=missing_link) as readlink_mock:
            # ENOENT may also be raised while the /proc/pid/exe path
            # actually exists, which happens for system processes with
            # low pids (about 0-20). psutil is then expected to return
            # an empty string.
            exe_path = psutil.Process().exe()
            assert readlink_mock.called
            self.assertEqual(exe_path, "")

            # When /proc/pid itself is gone as well, the failure is
            # expected to be reported as a zombie process instead.
            with mock.patch('psutil._pslinux.os.path.lexists',
                            return_value=False):
                exe = psutil.Process().exe
                with self.assertRaises(psutil.ZombieProcess):
                    exe()
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_zombie_process__repr__(self, func=repr):
        """Verify the ``repr()`` text of ``psutil.ZombieProcess``.

        ``func`` is accepted but not used by this body; every check calls
        ``repr`` directly.
        """
        prefix = "psutil.ZombieProcess process still exists but it's a zombie "
        # (positional args, keyword args, expected repr)
        cases = (
            ((321,), {}, prefix + "(pid=321)"),
            ((321,), {'name': 'foo'}, prefix + "(pid=321, name='foo')"),
            ((321,), {'name': 'foo', 'ppid': 1},
             prefix + "(pid=321, name='foo', ppid=1)"),
            ((321,), {'msg': 'foo'}, "psutil.ZombieProcess foo"),
        )
        for args, kwargs, expected in cases:
            self.assertEqual(
                repr(psutil.ZombieProcess(*args, **kwargs)), expected)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_ad_on_process_creation(self):
        """Check which ``create_time`` failures ``Process()`` tolerates.

        AccessDenied and ZombieProcess raised by ``create_time`` must not
        prevent instantiating ``Process``; any other exception (ValueError
        here) must propagate.
        """
        tolerated = (psutil.AccessDenied, psutil.ZombieProcess(1))
        for side_effect in tolerated:
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=side_effect) as patched:
                psutil.Process()
                assert patched.called

        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as patched:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert patched.called
项目:universe    作者:openai    | 项目源码 | 文件源码
def get_chrome_procs(self):
        """Return every process from ``psutil.process_iter()`` named 'chrome'.

        Processes whose name cannot be read are treated as non-matching.
        """
        def is_chrome(proc):
            try:
                return proc.name() == 'chrome'
            except (psutil.ZombieProcess, psutil.NoSuchProcess,
                    psutil.AccessDenied):
                # The process may have exited, become a zombie, or be
                # unreadable by the time its name is queried.
                return False
        return [p for p in psutil.process_iter() if is_chrome(p)]
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_process__repr__(self, func=repr):
        """Check the text ``func`` produces for a ``psutil.Process``.

        Covers the normal case and the cases where ``Process.name`` raises
        ZombieProcess, NoSuchProcess or AccessDenied.
        """
        proc = psutil.Process()
        text = func(proc)
        self.assertIn("psutil.Process", text)
        self.assertIn("pid=%s" % proc.pid, text)
        self.assertIn("name=", text)
        self.assertIn(proc.name(), text)

        # (exception raised by name(), extra word expected in the text)
        failures = (
            (psutil.ZombieProcess, "zombie"),
            (psutil.NoSuchProcess, "terminated"),
            (psutil.AccessDenied, None),
        )
        for exc_type, marker in failures:
            with mock.patch.object(psutil.Process, "name",
                                   side_effect=exc_type(os.getpid())):
                proc = psutil.Process()
                text = func(proc)
                self.assertIn("pid=%s" % proc.pid, text)
                if marker is not None:
                    self.assertIn(marker, text)
                self.assertNotIn("name=", text)
项目:fermentrack    作者:thorrak    | 项目源码 | 文件源码
def findConflicts(self, process):
        """
        Finds out if the process given as argument will conflict with other running instances of BrewPi
        Always returns a conflict if a firmware update is running

        Params:
        process: a BrewPiProcess object that will be compared with other running instances

        Returns:
        int: 1 means there are conflicts, 0 means no conflict (usable as a bool)
        """
        def scriptRunning(script):
            # True if any inspectable python process has `script` in its command line.
            for p in psutil.process_iter():
                try:
                    if 'python' in p.name() and any(script in s for s in p.cmdline()):
                        return True
                except (psutil.AccessDenied, psutil.ZombieProcess,
                        psutil.NoSuchProcess):
                    # some OS's (OS X) do not allow processes to read info
                    # from other processes, and processes may exit while we
                    # iterate. Skip just this process rather than abandoning
                    # the whole scan.
                    continue
            return False

        # A running firmware flash/update always counts as a conflict.
        if scriptRunning('flashDfu.py') or scriptRunning('updateFirmware.py'):
            return 1

        for p in self.list:
            if process.pid == p.pid:  # skip the process itself
                continue
            if process.conflict(p):
                return 1
        return 0
项目:atsy    作者:EricRahm    | 项目源码 | 文件源码
def print_stats(self, verbose=False):
        """
        Prints out stats for each matched process and a sum of the RSS of the
        parent process and the USS of its children.

        Processes are matched by applying ``self.path_filter`` to their
        executable path; ``self.parent_filter`` applied to the command line
        decides whether a match counts as a parent (RSS) or a child (USS).

        :param verbose: Set true to see the full command-line. This is useful
         when deciding on a parent filter.
        :raises ProcessNotFoundException: if no parent RSS was accumulated.
        """
        def wrapped_path_filter(x):
            try:
                return self.path_filter(x.exe())
            except (psutil.AccessDenied, psutil.NoSuchProcess,
                    psutil.ZombieProcess):
                return False

        parent_rss = 0
        children_uss = 0

        for p in filter(wrapped_path_filter, psutil.process_iter()):
            try:
                info = p.memory_full_info()
                cmdline = self.get_cmdline(p)
                exe = cmdline if verbose else p.exe()
            except (psutil.NoSuchProcess, psutil.ZombieProcess):
                # The process went away after it passed the path filter.
                continue
            rss = info.rss
            uss = info.uss

            # Single-argument print(...) calls behave the same under the
            # Python 2 print statement and the Python 3 print function.
            if self.parent_filter(cmdline):
                print("[%d] - %s\n  * RSS - %d\n    USS - %d" % (p.pid, exe, rss, uss))
                parent_rss += rss
            else:
                print("[%d] - %s\n    RSS - %d\n  * USS - %d" % (p.pid, exe, rss, uss))
                children_uss += uss

        if not parent_rss:
            if not children_uss:
                raise ProcessNotFoundException(
                    "No processes matched the path filter")
            else:
                raise ProcessNotFoundException(
                    "No process matched the parent filter")

        print("\nTotal: {:,} bytes\n".format(parent_rss + children_uss))
项目:django-mapproxy    作者:terranodo    | 项目源码 | 文件源码
def get_process_from_pid(pid):
    """Return a ``psutil.Process`` for ``pid``, or None.

    None is returned when ``is_int_str(pid)`` is falsy, or when psutil
    reports the process as missing or a zombie (a debug message is logged
    in that case).
    """
    if not is_int_str(pid):
        return None
    try:
        return psutil.Process(pid=int(pid))
    except (psutil.NoSuchProcess, psutil.ZombieProcess):
        log.debug('PROCESS HAS BEEN TERMINATED {}'.format(int(pid)))
    return None
项目:django-mapproxy    作者:terranodo    | 项目源码 | 文件源码
def get_is_process_running(pid):
    """Return the ``psutil.Process`` for ``pid`` if it could be created, else None.

    After creating the process object a zero-timeout ``wait`` is attempted
    and its result is logged at debug level. NoSuchProcess, ZombieProcess and
    TimeoutExpired are ignored.

    NOTE(review): the process object is returned even when ``wait(0)``
    completes without raising -- confirm this is what callers expect.
    """
    if not is_int_str(pid):
        return None

    process = None
    try:
        process = psutil.Process(pid=int(pid))
        exit_code = process.wait(0)
        log.debug('waited for process.. exitCode: {}'.format(exit_code))
    except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.TimeoutExpired):
        pass
    return process
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_process__repr__(self, func=repr):
        """Check the text ``func`` produces for a ``psutil.Process``.

        Covers the normal case and the cases where ``Process.name`` raises
        ZombieProcess, NoSuchProcess or AccessDenied.
        """
        proc = psutil.Process()
        text = func(proc)
        self.assertIn("psutil.Process", text)
        self.assertIn("pid=%s" % proc.pid, text)
        self.assertIn("name=", text)
        self.assertIn(proc.name(), text)

        # (exception raised by name(), extra word expected in the text)
        failures = (
            (psutil.ZombieProcess, "zombie"),
            (psutil.NoSuchProcess, "terminated"),
            (psutil.AccessDenied, None),
        )
        for exc_type, marker in failures:
            with mock.patch.object(psutil.Process, "name",
                                   side_effect=exc_type(os.getpid())):
                proc = psutil.Process()
                text = func(proc)
                self.assertIn("pid=%s" % proc.pid, text)
                if marker is not None:
                    self.assertIn(marker, text)
                self.assertNotIn("name=", text)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_process__repr__(self, func=repr):
        """Check the text ``func`` produces for a ``psutil.Process``.

        Covers the normal case and the cases where ``Process.name`` raises
        ZombieProcess, NoSuchProcess or AccessDenied.
        """
        proc = psutil.Process()
        text = func(proc)
        self.assertIn("psutil.Process", text)
        self.assertIn("pid=%s" % proc.pid, text)
        self.assertIn("name=", text)
        self.assertIn(proc.name(), text)

        # (exception raised by name(), extra word expected in the text)
        failures = (
            (psutil.ZombieProcess, "zombie"),
            (psutil.NoSuchProcess, "terminated"),
            (psutil.AccessDenied, None),
        )
        for exc_type, marker in failures:
            with mock.patch.object(psutil.Process, "name",
                                   side_effect=exc_type(os.getpid())):
                proc = psutil.Process()
                text = func(proc)
                self.assertIn("pid=%s" % proc.pid, text)
                if marker is not None:
                    self.assertIn(marker, text)
                self.assertNotIn("name=", text)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def main():
    """Build a parent-pid -> child-pids mapping and print it as a tree."""
    # Map every parent PID to the list of PIDs that report it as parent.
    children_of = collections.defaultdict(list)
    for proc in psutil.process_iter():
        try:
            parent_pid = proc.ppid()
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            continue
        children_of[parent_pid].append(proc.pid)

    # On systems supporting PID 0, PID 0's parent is usually 0; drop that
    # self-reference from its own child list.
    if 0 in children_of.get(0, ()):
        children_of[0].remove(0)
    print_tree(min(children_of), children_of)
项目:rerobot    作者:voqz    | 项目源码 | 文件源码
async def info(ctx, message):
    """
    Displays Rero information

    Sends one message to ``message.channel`` with the number of servers,
    channels and members, the uptime since ``ctx.startup_timestamp``, the
    memory/CPU usage of the current process and ``ctx.current_shard``.

    :param ctx: object providing ``servers``, ``get_all_channels()``,
        ``get_all_members()``, ``startup_timestamp``, ``current_shard`` and
        the ``send_message`` coroutine.
    :param message: message whose ``channel`` receives the reply.
    :return: None
    """
    # TODO: this command needs to be fixed later (the per-minute command and
    # message rate statistics are currently not reported).
    servs = len(ctx.servers)
    channels = len(list(ctx.get_all_channels()))
    members = len(list(ctx.get_all_members()))

    diff = datetime.datetime.now() - ctx.startup_timestamp
    days = diff.days
    m, s = divmod(diff.seconds, 60)
    h, m = divmod(m, 60)

    try:
        proc = psutil.Process(pid=os.getpid())
        rss = float(proc.memory_info().rss) / 1000000
        rss_per = float(proc.memory_percent())
        # NOTE: with a positive interval psutil samples CPU times twice and
        # blocks for that long (0.2 s here).
        cpu_per = float(proc.cpu_percent(interval=0.2))
        # Each value gets its own placeholder: the CPU figure must come from
        # cpu_per, not from the memory percentage.
        sys_str = "RAM: {0:.2f} MB ({1:.2f}%) | CPU Usage: {2:.2f}% (*calculated for `200 ms` interval*)" \
            .format(rss, rss_per, cpu_per)
    except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.AccessDenied):
        sys_str = ""

    ret_str = "`Connected to '{0}' Servers with '{1}' Channels and '{2}' Members`" \
              "\nCurrent uptime is {3} Days, {4} Hours, {5} Minutes, {6} Seconds" \
              "\n{7}\n**SHARD: {8} / 2**" \
        .format(str(servs), str(channels), str(members), str(days), str(h),
                str(m), str(s), sys_str, ctx.current_shard)

    await ctx.send_message(message.channel, ret_str)
项目:Lapzbot_Beta    作者:lap00zza    | 项目源码 | 文件源码
async def info(ctx, message):
    """
    Displays Lapzbot information

    Sends one message to ``message.channel`` with the number of servers,
    channels and members, the uptime since ``ctx.startup_timestamp``, the
    memory/CPU usage of the current process and ``ctx.current_shard``.

    :param ctx: object providing ``servers``, ``get_all_channels()``,
        ``get_all_members()``, ``startup_timestamp``, ``current_shard`` and
        the ``send_message`` coroutine.
    :param message: message whose ``channel`` receives the reply.
    :return: None
    """
    # TODO: this command needs to be fixed later (the per-minute command and
    # message rate statistics are currently not reported).
    servs = len(ctx.servers)
    channels = len(list(ctx.get_all_channels()))
    members = len(list(ctx.get_all_members()))

    diff = datetime.datetime.now() - ctx.startup_timestamp
    days = diff.days
    m, s = divmod(diff.seconds, 60)
    h, m = divmod(m, 60)

    try:
        proc = psutil.Process(pid=os.getpid())
        rss = float(proc.memory_info().rss) / 1000000
        rss_per = float(proc.memory_percent())
        # NOTE: with a positive interval psutil samples CPU times twice and
        # blocks for that long (0.2 s here).
        cpu_per = float(proc.cpu_percent(interval=0.2))
        # Each value gets its own placeholder: the CPU figure must come from
        # cpu_per, not from the memory percentage.
        sys_str = "RAM: {0:.2f} MB ({1:.2f}%) | CPU Usage: {2:.2f}% (*calculated for `200 ms` interval*)" \
            .format(rss, rss_per, cpu_per)
    except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.AccessDenied):
        sys_str = ""

    ret_str = "`Connected to '{0}' Servers with '{1}' Channels and '{2}' Members`" \
              "\nCurrent uptime is {3} Days, {4} Hours, {5} Minutes, {6} Seconds" \
              "\n{7}\n**SHARD: {8} / 2**" \
        .format(str(servs), str(channels), str(members), str(days), str(h),
                str(m), str(s), sys_str, ctx.current_shard)

    await ctx.send_message(message.channel, ret_str)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_as_dict(self):
        """Exercise ``Process.as_dict``: key selection, ``ad_value`` and errors."""
        p = psutil.Process()
        info = p.as_dict(attrs=['exe', 'name'])
        self.assertEqual(sorted(info), ['exe', 'name'])

        # For the lowest pid the connections may not be retrievable, in
        # which case the ad_value placeholder must be returned instead.
        p = psutil.Process(min(psutil.pids()))
        info = p.as_dict(attrs=['connections'], ad_value='foo')
        connections = info['connections']
        if not isinstance(connections, list):
            self.assertEqual(connections, 'foo')

        # AccessDenied is replaced by ad_value.
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=psutil.AccessDenied):
            result = p.as_dict(attrs=["nice"], ad_value=1)
            self.assertEqual(result, {"nice": 1})

        # NoSuchProcess is not swallowed; it reaches the caller.
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=psutil.NoSuchProcess(p.pid, "name")):
            with self.assertRaises(psutil.NoSuchProcess):
                p.as_dict(attrs=["nice"])

        # ZombieProcess is replaced by ad_value as well.
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=psutil.ZombieProcess(p.pid, "name")):
            result = p.as_dict(attrs=["nice"], ad_value="foo")
            self.assertEqual(result, {"nice": "foo"})

        # APIs raising NotImplementedError are skipped by default...
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=NotImplementedError):
            info = p.as_dict()
            self.assertNotIn('nice', list(info))
            # ...but the error surfaces when the attr is requested by name.
            with self.assertRaises(NotImplementedError):
                p.as_dict(attrs=["nice"])

        # Invalid arguments: a bare string, then unknown attribute names.
        with self.assertRaises(TypeError):
            p.as_dict('name')
        for bad_attrs in (['foo'], ['foo', 'bar']):
            with self.assertRaises(ValueError):
                p.as_dict(bad_attrs)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_as_dict(self):
        """Exercise ``Process.as_dict``: key selection, ``ad_value`` and errors."""
        p = psutil.Process()
        info = p.as_dict(attrs=['exe', 'name'])
        self.assertEqual(sorted(info), ['exe', 'name'])

        # For the lowest pid the connections may not be retrievable, in
        # which case the ad_value placeholder must be returned instead.
        p = psutil.Process(min(psutil.pids()))
        info = p.as_dict(attrs=['connections'], ad_value='foo')
        connections = info['connections']
        if not isinstance(connections, list):
            self.assertEqual(connections, 'foo')

        # AccessDenied is replaced by ad_value.
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=psutil.AccessDenied):
            result = p.as_dict(attrs=["nice"], ad_value=1)
            self.assertEqual(result, {"nice": 1})

        # NoSuchProcess is not swallowed; it reaches the caller.
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=psutil.NoSuchProcess(p.pid, "name")):
            with self.assertRaises(psutil.NoSuchProcess):
                p.as_dict(attrs=["nice"])

        # ZombieProcess is replaced by ad_value as well.
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=psutil.ZombieProcess(p.pid, "name")):
            result = p.as_dict(attrs=["nice"], ad_value="foo")
            self.assertEqual(result, {"nice": "foo"})

        # APIs raising NotImplementedError are skipped by default...
        with mock.patch('psutil.Process.nice', create=True,
                        side_effect=NotImplementedError):
            info = p.as_dict()
            self.assertNotIn('nice', list(info))
            # ...but the error surfaces when the attr is requested by name.
            with self.assertRaises(NotImplementedError):
                p.as_dict(attrs=["nice"])

        # Invalid arguments: a bare string, then unknown attribute names.
        with self.assertRaises(TypeError):
            p.as_dict('name')
        for bad_attrs in (['foo'], ['foo', 'bar']):
            with self.assertRaises(ValueError):
                p.as_dict(bad_attrs)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def poll(interval):
    """Calculate IO usage by comparing IO statistics before and
    after the interval.

    Every returned process object carries the attributes set here:
    ``_before``, ``_after``, ``_cmdline``, ``_username``, ``_read_per_sec``,
    ``_write_per_sec`` and ``_total``. Despite their names, the "per_sec"
    values are the raw byte deltas over ``interval``.

    :param interval: number of seconds to sleep between the two samples.
    :return: tuple ``(processes, disks_read_per_sec, disks_write_per_sec)``
        where ``processes`` is sorted by total IO, most intensive first.
    """
    # first get a list of all processes and disk io counters
    sampled = []
    for p in list(psutil.process_iter()):
        try:
            p._before = p.io_counters()
        except psutil.Error:
            continue
        sampled.append(p)
    disks_before = psutil.disk_io_counters()

    # sleep some time
    time.sleep(interval)

    # then retrieve the same info again; a process that vanished or became
    # unreadable in the meantime is dropped, exactly as in the first pass
    procs = []
    for p in sampled:
        try:
            p._after = p.io_counters()
            p._cmdline = ' '.join(p.cmdline())
            if not p._cmdline:
                p._cmdline = p.name()
            p._username = p.username()
        except psutil.Error:
            continue
        procs.append(p)
    disks_after = psutil.disk_io_counters()

    # finally calculate results by comparing data before and
    # after the interval
    for p in procs:
        p._read_per_sec = p._after.read_bytes - p._before.read_bytes
        p._write_per_sec = p._after.write_bytes - p._before.write_bytes
        p._total = p._read_per_sec + p._write_per_sec

    disks_read_per_sec = disks_after.read_bytes - disks_before.read_bytes
    disks_write_per_sec = disks_after.write_bytes - disks_before.write_bytes

    # sort processes by total disk IO so that the more intensive
    # ones get listed first
    processes = sorted(procs, key=lambda p: p._total, reverse=True)

    return (processes, disks_read_per_sec, disks_write_per_sec)