我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.disk_partitions()。
def test_disk_partitions_and_usage(self):
    # Cross-check psutil.disk_usage() and psutil.disk_partitions()
    # against the figures printed by "df".
    max_drift = 10 * 1024 * 1024  # 10 MB tolerance

    def df(path):
        # -P: one line per filesystem, -B 1: sizes expressed in bytes.
        output = sh('df -P -B 1 "%s"' % path).strip()
        # Row 0 is the header, row 1 describes the filesystem.
        columns = output.split('\n')[1].split()
        dev, total, used, free = columns[:4]
        if dev == 'none':
            dev = ''
        return dev, int(total), int(used), int(free)

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        _, df_total, df_used, df_free = df(part.mountpoint)
        self.assertEqual(usage.total, df_total)
        # Free and used space may drift between the two measurements,
        # so they are only required to agree within the tolerance.
        for ps_value, df_value in ((usage.free, df_free),
                                   (usage.used, df_used)):
            if abs(ps_value - df_value) > max_drift:
                self.fail("psutil=%s, df=%s" % (ps_value, df_value))
def test_procfs_path(self):
    # Point psutil at an empty directory and check that the APIs listed
    # below raise instead of returning data.
    empty_dir = tempfile.mkdtemp()
    try:
        psutil.PROCFS_PATH = empty_dir
        # (callable, keyword arguments) pairs, checked in this order.
        # psutil.pids is not checked (that assertion is disabled).
        io_error_cases = (
            (psutil.virtual_memory, {}),
            (psutil.cpu_times, {}),
            (psutil.cpu_times, {'percpu': True}),
            (psutil.boot_time, {}),
            (psutil.net_connections, {}),
            (psutil.net_io_counters, {}),
            (psutil.net_if_stats, {}),
            (psutil.disk_io_counters, {}),
            (psutil.disk_partitions, {}),
        )
        for func, kwargs in io_error_cases:
            self.assertRaises(IOError, func, **kwargs)
        self.assertRaises(psutil.NoSuchProcess, psutil.Process)
    finally:
        # Always restore the real procfs location and remove the
        # temporary directory.
        psutil.PROCFS_PATH = "/proc"
        os.rmdir(empty_dir)
def btrfs_get_mountpoints_of_subvol(subvol='', **kwargs):
    """
    Determine the list of mountpoints for a given subvol (of the form @/foo/bar).

    Every partition reported by psutil is inspected; a partition matches when
    one of its comma-separated mount options is exactly 'subvol=/<subvol>'.

    :param subvol: subvolume path, without the leading '/'
    :param kwargs: accepted but not used by this function
    :return: a list of mountpoint(s), or an empty list
    """
    if not subvol:
        return []

    # Remember the leading '/'.
    wanted_opt = "subvol=/{}".format(subvol)
    # Compare whole options instead of searching the raw opts string, so that
    # '@/foo' does not also match a partition mounted with 'subvol=/@/foobar'.
    return [part.mountpoint
            for part in psutil.disk_partitions()
            if wanted_opt in part.opts.split(',')]
# pylint: disable=unused-argument
def get_mountpoint_opts(mountpoint='', **kwargs):
    """
    Determine the mount options set for a given mountpoint.

    Options of the form 'key=val' are converted into single-entry
    dictionaries, so the result may look something like:
    [ 'rw', 'relatime', ..., { 'subvolid': '259' }, ... ]

    :param mountpoint: mountpoint to look up among psutil's partitions
    :param kwargs: accepted but not used by this function
    :return: a list of mount opts, or None (after logging an error) when the
             mountpoint is not found
    """
    opts = None
    for part in psutil.disk_partitions():
        # No early exit: if several entries share the mountpoint, the last
        # one listed by psutil wins.
        if part.mountpoint == mountpoint:
            opts = part.opts.split(',')

    if opts:
        parsed = []
        for opt in opts:
            if '=' in opt:
                # Split on the first '=' only, so a value that itself
                # contains '=' does not break the key/value unpacking.
                key, value = opt.split('=', 1)
                parsed.append({key: value})
            else:
                parsed.append(opt)
        opts = parsed
    else:
        opts = None

    if not opts:
        log.error("Failed to determine mount opts for '{}'.".format(mountpoint))
    return opts
def get_file_systems(self):
    """
    Describe every partition reported by psutil as a dictionary.

    Each mount option is mapped to its localized label; an option whose
    lookup returns the option name unchanged gets the
    'mount.options.unknown' label instead.

    :return: list of dicts with the keys 'device', 'mount_point',
             'fs_type', 'options' and 'usage'
    """
    translate = prism.helpers.locale_
    file_systems = psutil.disk_partitions()
    for index, partition in enumerate(file_systems):
        labels = {}
        for flag in partition.opts.split(','):
            label = translate('system', 'mount.options.' + flag)
            if label == flag:
                # The lookup echoed the option back: no label is known.
                label = translate('system', 'mount.options.unknown')
            labels[flag] = label
        # Replace the psutil entry in place with its dictionary form.
        file_systems[index] = {
            'device': partition.device,
            'mount_point': partition.mountpoint,
            'fs_type': partition.fstype,
            'options': labels,
            'usage': psutil.disk_usage(partition.mountpoint),
        }
    return file_systems
def disksinfo(self):
    """
    Collect partition and usage details for the partitions psutil reports
    with all=False.

    :return: list of dicts (device, mountpoint, fstype, opts, total, used,
             free, percent), sorted by device
    """
    def describe(partition):
        # Merge the partition description with its usage figures.
        usage = psutil.disk_usage(partition.mountpoint)
        return {
            'device': partition.device,
            'mountpoint': partition.mountpoint,
            'fstype': partition.fstype,
            'opts': partition.opts,
            'total': usage.total,
            'used': usage.used,
            'free': usage.free,
            'percent': usage.percent,
        }

    devices = [describe(entry) for entry in psutil.disk_partitions(all=False)]
    devices.sort(key=lambda entry: entry['device'])
    return devices
def add_device(request, device_path, bucket=None):
    """
    Run ceph.utils.osdize on a device and queue its move into a bucket.

    After osdize (and tune_dev, when the 'autotune' option is set), the
    mounted partitions are searched for one whose device contains
    device_path. If one is found, a 'move-osd-to-bucket' op is appended to
    request.ops; the OSD id is the text after the last '-' in the final path
    component of that partition's mountpoint.

    :param request: object whose 'ops' list receives the new operation
    :param device_path: path of the device to add
    :param bucket: bucket name stored in the queued operation
    :return: the same request object
    """
    # The device is the device_path argument; the function defines no other
    # name for it.
    ceph.utils.osdize(device_path, hookenv.config('osd-format'),
                      ceph_hooks.get_journal_devices(),
                      hookenv.config('osd-reformat'),
                      hookenv.config('ignore-device-errors'),
                      hookenv.config('osd-encrypt'),
                      hookenv.config('bluestore'))
    # Make it fast!
    if hookenv.config('autotune'):
        ceph.utils.tune_dev(device_path)
    # Build a real list: on Python 3 filter() returns a lazy object that is
    # always truthy and cannot be indexed.
    mounts = [disk for disk in psutil.disk_partitions()
              if device_path in disk.device]
    if mounts:
        osd = mounts[0]
        osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
        request.ops.append({
            'op': 'move-osd-to-bucket',
            'osd': "osd.{}".format(osd_id),
            'bucket': bucket})
    return request
def disk(self):
    """
    Disk usage for every partition in self.disk_partitions.

    total/used/free are passed through self.hr (presumably a
    bytes-to-human-readable formatter); percent is reported unchanged.

    Return dict keyed by mountpoint
    """
    report = {}
    for entry in self.disk_partitions:
        mountpoint = entry.mountpoint
        usage = self.partition(mountpoint)
        report[mountpoint] = {
            "total": self.hr(usage.total),
            "used": self.hr(usage.used),
            "free": self.hr(usage.free),
            "percent": usage.percent,
        }
    return report
def test_disk_partitions_mocked(self):
    # Test that ZFS partitions are returned.
    with open("/proc/filesystems", "r") as f:
        supported = f.read()

    if 'zfs' in supported:
        # The file mentions zfs: a ZFS partition must be listed.
        found = any(part.fstype == 'zfs'
                    for part in psutil.disk_partitions())
        if not found:
            self.fail("couldn't find any ZFS partition")
        return

    # No ZFS partitions on this system. Let's fake one.
    fake_file = io.StringIO(u("nodev\tzfs\n"))
    open_patch = mock.patch(
        'psutil._pslinux.open', return_value=fake_file, create=True)
    partitions_patch = mock.patch(
        'psutil._pslinux.cext.disk_partitions',
        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')])
    with open_patch as m1, partitions_patch as m2:
        ret = psutil.disk_partitions()
        assert m1.called
        assert m2.called
        assert ret
        self.assertEqual(ret[0].fstype, 'zfs')
def add_device(request, device_path, bucket=None):
    """
    Run ceph.osdize on a device and queue its move into a bucket.

    After osdize (and tune_dev, when the 'autotune' option is set), the
    mounted partitions are searched for one whose device contains
    device_path. If one is found, a 'move-osd-to-bucket' op is appended to
    request.ops; the OSD id is the text after the last '-' in the final path
    component of that partition's mountpoint.

    :param request: object whose 'ops' list receives the new operation
    :param device_path: path of the device to add
    :param bucket: bucket name stored in the queued operation
    :return: the same request object
    """
    # The device is the device_path argument; the function defines no other
    # name for it.
    ceph.osdize(device_path, config('osd-format'),
                get_journal_devices(),
                config('osd-reformat'),
                config('ignore-device-errors'),
                config('osd-encrypt'),
                config('bluestore'))
    # Make it fast!
    if config('autotune'):
        ceph.tune_dev(device_path)
    # Build a real list: on Python 3 filter() returns a lazy object that is
    # always truthy and cannot be indexed.
    mounts = [disk for disk in psutil.disk_partitions()
              if device_path in disk.device]
    if mounts:
        osd = mounts[0]
        osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
        request.ops.append({
            'op': 'move-osd-to-bucket',
            'osd': "osd.{}".format(osd_id),
            'bucket': bucket})
    return request
def handle_drive_intent(self, message):
    """
    Speak a usage summary for every partition psutil reports.

    Mountpoints under /snap/ are skipped. The 'drive.low' dialog is used
    when usage is at or above 90 percent, 'drive' otherwise.

    :param message: not used by this handler
    """
    for partition in psutil.disk_partitions():
        mountpoint = partition.mountpoint
        print("partition.mountpoint: %s" % mountpoint)
        if mountpoint.startswith("/snap/"):
            continue
        # e.g. total=21378641920, used=4809781248, free=15482871808,
        # percent=22.5
        usage = psutil.disk_usage(mountpoint)
        data = {
            "mountpoint": mountpoint,
            "total": sizeof_fmt(usage.total),
            "used": sizeof_fmt(usage.used),
            "free": sizeof_fmt(usage.free),
            "percent": usage.percent,
        }
        dialog = "drive.low" if usage.percent >= 90 else "drive"
        self.speak_dialog(dialog, data)
def get_disk_info(self):
    """
    List the partitions reported by psutil (all=True) with their size.

    Entries without a device, and entries whose mount options include
    'cdrom' or 'removable' (case-insensitive), are skipped.

    :return: list of dicts (name, device, mountpoint, fstype, size) sorted by
             device, where size is the total size shifted right by 10 bits
             (i.e. in KiB); an empty list if anything fails (the traceback
             is logged)
    """
    data = []
    try:
        for disk in psutil.disk_partitions(all=True):
            if not disk.device:
                continue
            # opts is a comma-separated string (e.g. 'rw,removable'), so
            # look at the individual flags rather than the whole string.
            flags = disk.opts.upper().split(',')
            if 'CDROM' in flags or 'REMOVABLE' in flags:
                continue
            item = {}
            item['name'] = disk.device
            item['device'] = disk.device
            item['mountpoint'] = disk.mountpoint
            item['fstype'] = disk.fstype
            item['size'] = psutil.disk_usage(disk.mountpoint).total >> 10
            data.append(item)
        data.sort(key=lambda x: x['device'])
    except Exception:
        # Best effort: report nothing rather than a partial list, but let
        # KeyboardInterrupt/SystemExit propagate.
        data = []
        self.logger.error(traceback.format_exc())
    return data
def get_disk_rate_info(self):
    """
    Collect size and usage figures per mountpoint.

    Partitions with an empty fstype are ignored. Totals and used sizes are
    divided by 1024 three times and formatted with two decimals; the
    percentage is stored as reported by psutil. On any Exception the
    traceback is logged and whatever was gathered so far is returned.

    :return: dict with 'disk_total', 'disk_used' and 'disk_percent', each
             mapping mountpoint -> value
    """
    returnData = {'disk_total': {}, 'disk_used': {}, 'disk_percent': {}}
    try:
        for entry in psutil.disk_partitions():
            if entry.fstype == "":
                continue
            mountpoint = entry.mountpoint
            usage = psutil.disk_usage(mountpoint)
            total_scaled = usage.total/1024/1024/1024.0
            returnData['disk_total'][mountpoint] = "%.2f" % total_scaled
            used_scaled = usage.used/1024/1024/1024.0
            returnData['disk_used'][mountpoint] = "%.2f" % used_scaled
            returnData['disk_percent'][mountpoint] = usage.percent
    except Exception:
        pybixlib.error(self.logHead + traceback.format_exc())
        self.errorInfoDone(traceback.format_exc())
    return returnData
def _get_metrics(self):
    """
    Fill self.metrics with inode totals summed over the partitions psutil
    reports with all=False.

    'inode_use_percent' is the integer 0 when no inodes were counted,
    otherwise a string with two decimals.

    :return: self.metrics
    """
    self.metrics['plugin_version'] = PLUGIN_VERSION
    self.metrics['heartbeat_required'] = HEARTBEAT

    inode_files = 0
    inode_free = 0
    for part in psutil.disk_partitions(all=False):
        stats = self.__get_inode_stats(part.mountpoint)
        inode_files += stats.f_files
        inode_free += stats.f_ffree

    inode_used = inode_files - inode_free
    self.metrics['inode_total'] = inode_files
    self.metrics['inode_used'] = inode_used
    self.metrics['inode_free'] = inode_free

    if inode_files > 0:
        inode_use_pct = "{:.2f}".format((inode_used * 100.0) / inode_files)
    else:
        inode_use_pct = 0
    self.metrics['inode_use_percent'] = inode_use_pct
    self.units['inode_use_percent'] = "%"
    self.metrics["units"] = self.units
    return self.metrics
def main():
    """
    Print a table with one row per partition psutil reports with all=False.

    Columns: device, total, used, free, use percentage, filesystem type and
    mountpoint. Sizes are formatted with bytes2human.
    """
    table = prettytable.PrettyTable(border=False, header=True,
                                    left_padding_width=2, padding_width=1)
    table.field_names = ["Device", "Total", "Used", "Free", "Use%", "Type",
                         "Mount"]
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        if 'docker' in part.mountpoint and 'aufs' in part.mountpoint:
            continue
        usage = psutil.disk_usage(part.mountpoint)
        table.add_row([part.device,
                       bytes2human(usage.total),
                       bytes2human(usage.used),
                       bytes2human(usage.free),
                       str(int(usage.percent)) + '%',
                       part.fstype,
                       part.mountpoint])
    # Left-align every column.
    for field in table.field_names:
        table.align[field] = "l"
    # Function-call form: valid on both Python 2 and Python 3.
    print(table)
def collect_diskinfo(self):
    """
    Gather usage figures for each distinct device psutil reports.

    A device is reported once, using the first partition entry seen for it.
    When a mountpoint starts with '/opt/docklet/local/volume', the last path
    component is used as a key into the global workercinfo and the figures
    are stored there under 'disk_use'. Failures while reading a partition
    are logged and that partition is left out.

    :return: list of dicts (device, mountpoint, total, used, free, percent)
    """
    global workercinfo
    collected = []
    seen_devices = {}
    for part in psutil.disk_partitions():
        # deal with each partition, once per device
        if part.device in seen_devices:
            continue
        seen_devices[part.device] = 1
        diskval = {'device': part.device, 'mountpoint': part.mountpoint}
        try:
            usage = psutil.disk_usage(part.mountpoint)
            diskval['total'] = usage.total
            diskval['used'] = usage.used
            diskval['free'] = usage.free
            diskval['percent'] = usage.percent
            if part.mountpoint.startswith('/opt/docklet/local/volume'):
                # the mountpoint indicates that this is the disk usage of
                # a container
                container = part.mountpoint.split('/')[-1]
                if container not in workercinfo:
                    workercinfo[container] = {}
                workercinfo[container]['disk_use'] = diskval
            collected.append(diskval)
        except Exception as err:
            logger.warning(traceback.format_exc())
            logger.warning(err)
    return collected
# collect operating system information
def get_all_mountpoint(all=False):
    """
    Return the mountpoints of the partitions reported by psutil.

    :param all: forwarded unchanged to psutil.disk_partitions(all=...)
    :return: list of mountpoints, in psutil's order
    """
    mountpoints = []
    for partition in psutil.disk_partitions(all=all):
        mountpoints.append(partition.mountpoint)
    return mountpoints
def get_disk_partitions():
    """
    Map each mountpoint to its partition entry converted by _to_dict.

    Partitions are listed with all=True. A converted entry presumably looks
    like:
        OrderedDict([
            ('device', '/dev/disk1'),
            ('mountpoint', '/'),
            ('fstype', 'hfs'),
            ('opts', 'rw,local,rootfs,dovolfs,journaled,multilabel')])

    :return: dict of mountpoint -> converted partition entry
    """
    return {entry.mountpoint: _to_dict(entry)
            for entry in psutil.disk_partitions(all=True)}
def collect_system_information(self):
    """
    Build a dictionary describing this machine.

    Keys: memory_total, resources_limit, cpu_name, cpu, nets, disks, gpus
    and boot_time. Errors while probing GPUs, a disk or the network
    interfaces are swallowed, leaving the affected section partially filled.
    Names of interfaces that are up and do not start with 'lo' are also
    appended to self.nets.

    :return: the dictionary described above
    """
    values = {}
    values['memory_total'] = psutil.virtual_memory().total

    import cpuinfo
    cpu_details = cpuinfo.get_cpu_info()

    values['resources_limit'] = self.resources_limit
    values['cpu_name'] = cpu_details['brand']
    values['cpu'] = [cpu_details['hz_advertised_raw'][0], cpu_details['count']]
    values['nets'] = {}
    values['disks'] = {}
    values['gpus'] = {}
    values['boot_time'] = psutil.boot_time()

    try:
        ordered_gpus = aetros.cuda_gpu.get_ordered_devices()
        for index, gpu in enumerate(ordered_gpus):
            gpu['available'] = index in self.enabled_gpus
            values['gpus'][index] = gpu
    except Exception:
        pass

    for partition in psutil.disk_partitions():
        try:
            # Index 1 of a partition entry is its mountpoint.
            label = self.get_disk_name(partition[1])
            values['disks'][label] = psutil.disk_usage(partition[1]).total
        except Exception:
            # suppress Operation not permitted
            pass

    try:
        for ifname, stats in psutil.net_if_stats().items():
            if stats.isup and not ifname.startswith('lo'):
                self.nets.append(ifname)
                # Fall back to 1000 when no speed is reported.
                values['nets'][ifname] = stats.speed or 1000
    except Exception:
        # suppress Operation not permitted
        pass

    return values
def disk_partitions(self):
    """
    System disk partitions, as reported by psutil.disk_partitions()

    Return list
    """
    return psutil.disk_partitions()
def test_disk_partitions(self):
    # Hand psutil.disk_partitions (uncalled) to this test case's
    # execute() helper.
    target = psutil.disk_partitions
    self.execute(target)
def test_disks(self):
    # test psutil.disk_usage() and psutil.disk_partitions()
    # against "df -k"
    def df(path):
        # Returns (device, total, used, free) with sizes in bytes; "df -k"
        # reports 1K blocks, hence the multiplication.
        out = sh('df -k "%s"' % path).strip()
        lines = out.split('\n')
        lines.pop(0)  # header row
        line = lines.pop(0)
        dev, total, used, free = line.split()[:4]
        if dev == 'none':
            dev = ''
        total = int(total) * 1024
        used = int(used) * 1024
        free = int(free) * 1024
        return dev, total, used, free

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        dev, total, used, free = df(part.mountpoint)
        self.assertEqual(part.device, dev)
        self.assertEqual(usage.total, total)
        # 10 MB tolerance
        # The values must be passed to % as a tuple; otherwise formatting
        # raises TypeError instead of reporting the mismatch.
        if abs(usage.free - free) > 10 * 1024 * 1024:
            self.fail("psutil=%s, df=%s" % (usage.free, free))
        if abs(usage.used - used) > 10 * 1024 * 1024:
            self.fail("psutil=%s, df=%s" % (usage.used, used))
def test_disks(self):
    # Compare each psutil partition with the WMI logical disk that has the
    # same device ID.
    tolerance = 10 * 1024 * 1024  # 10 MB
    ps_parts = psutil.disk_partitions(all=True)
    wmi_parts = wmi.WMI().Win32_LogicalDisk()
    for ps_part in ps_parts:
        for wmi_part in wmi_parts:
            if ps_part.device.replace('\\', '') != wmi_part.DeviceID:
                continue
            if not ps_part.mountpoint:
                # this is usually a CD-ROM with no disk inserted
                break
            try:
                usage = psutil.disk_usage(ps_part.mountpoint)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise
                # usually this is the floppy
                break
            self.assertEqual(usage.total, int(wmi_part.Size))
            wmi_free = int(wmi_part.FreeSpace)
            self.assertEqual(usage.free, wmi_free)
            if abs(usage.free - wmi_free) > tolerance:
                self.fail("psutil=%s, wmi=%s" % (usage.free, wmi_free))
            break
        else:
            # The inner loop finished without finding a matching WMI disk.
            self.fail("can't find partition %s" % repr(ps_part))
def test_disk_usage(self):
    # Compare psutil.disk_usage() with "df -k" run against each device.
    def df(device):
        # Second output row holds the figures; sizes are in 1K blocks.
        output = sh("df -k %s" % device).strip()
        columns = output.split('\n')[1].split()
        total = int(columns[1]) * 1024
        used = int(columns[2]) * 1024
        free = int(columns[3]) * 1024
        percent = float(columns[4].replace('%', ''))
        return (total, used, free, percent)

    tolerance = 4 * 1024 * 1024  # 4MB
    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        try:
            total, used, free, percent = df(part.device)
        except RuntimeError as err:
            # see:
            # https://travis-ci.org/giampaolo/psutil/jobs/138338464
            # https://travis-ci.org/giampaolo/psutil/jobs/138343361
            reason = str(err).lower()
            if "no such file or directory" not in reason and \
                    "raw devices not supported" not in reason:
                raise
            continue
        self.assertAlmostEqual(usage.total, total, delta=tolerance)
        self.assertAlmostEqual(usage.used, used, delta=tolerance)
        self.assertAlmostEqual(usage.free, free, delta=tolerance)
        self.assertAlmostEqual(usage.percent, percent, delta=1)
def test_disks(self):
    # Cross-check psutil.disk_usage() and psutil.disk_partitions()
    # against "df -k".
    max_drift = 10 * 1024 * 1024  # 10 MB tolerance

    def df(path):
        # "df -k" reports 1K blocks; convert the three sizes to bytes.
        output = sh('df -k "%s"' % path).strip()
        # Row 0 is the header, row 1 describes the filesystem.
        columns = output.split('\n')[1].split()
        dev, total, used, free = columns[:4]
        if dev == 'none':
            dev = ''
        return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        df_dev, df_total, df_used, df_free = df(part.mountpoint)
        self.assertEqual(part.device, df_dev)
        self.assertEqual(usage.total, df_total)
        for ps_value, df_value in ((usage.free, df_free),
                                   (usage.used, df_used)):
            if abs(ps_value - df_value) > max_drift:
                self.fail("psutil=%s, df=%s" % (ps_value, df_value))
def update_disks(self):
    """
    Rebuild self.disks from the partitions psutil reports with all=False.

    Each mountpoint maps to a dict with device, real_device, fstype, opts,
    total, used, free and percent. real_device equals device unless the
    device path is a symbolic link, in which case it is the resolved path.
    """
    self.disks.clear()
    for partition in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(partition.mountpoint)
        entry = dict(
            device=partition.device,
            real_device=partition.device,
            fstype=partition.fstype,
            opts=partition.opts,
            total=usage.total,
            used=usage.used,
            free=usage.free,
            percent=usage.percent,
        )
        self.disks[partition.mountpoint] = entry
        if os.path.islink(partition.device):
            entry['real_device'] = os.path.realpath(partition.device)
def partitions():
    """Return the result of psutil.disk_partitions() called with no arguments."""
    found = psutil.disk_partitions()
    return found
def getLocation():
    """
    Find where a removable disk is mounted.

    Block devices of type 'disk' whose 'removable' attribute is "1" are
    examined in turn; the first mounted partition belonging to one of them
    is printed as 'device => mountpoint' and its mountpoint is returned.

    :return: the mountpoint, or None when nothing matches
    """
    context = pyudev.Context()
    all_disks = context.list_devices(subsystem='block', DEVTYPE='disk')
    removable = [candidate for candidate in all_disks
                 if candidate.attributes.asstring('removable') == "1"]
    for disk in removable:
        children = context.list_devices(subsystem='block',
                                        DEVTYPE='partition', parent=disk)
        nodes = [child.device_node for child in children]
        for mounted in psutil.disk_partitions():
            if mounted.device not in nodes:
                continue
            mntpoint = mounted.mountpoint
            print(mounted.device + ' => ' + mntpoint)
            return mntpoint
def run(self, config):
    """
    Collect disk usage per partition, via psutil or the "df" command.

    psutil is tried first. The output of "df -Pl" replaces the psutil
    results when psutil produced nothing or when the 'force_df' option of
    the 'diskusage' config section is 'yes'. All failures are swallowed
    (best effort), except that KeyboardInterrupt/SystemExit propagate.

    :param config: object with a get(section, option) method
    :return: {'df-psutil': [...]} where each entry has 'info', 'total',
             'used', 'free' and 'percent'
    """
    disk = {}
    disk['df-psutil'] = []
    for part in psutil.disk_partitions(False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        try:
            usage = psutil.disk_usage(part.mountpoint)
            diskdata = {}
            diskdata['info'] = part
            for key in usage._fields:
                diskdata[key] = getattr(usage, key)
            disk['df-psutil'].append(diskdata)
        except Exception:
            # Unreadable partition: leave it out of the report.
            pass

    try:
        force_df = config.get('diskusage', 'force_df')
    except Exception:
        # Option (or section) not configured.
        force_df = 'no'

    if not disk['df-psutil'] or force_df == 'yes':
        try:
            disk['df-psutil'] = []
            # Close the pipe deterministically once the output is read.
            with os.popen("df -Pl") as pipe:
                df_output_lines = [s.split() for s in pipe.read().splitlines()]
            del df_output_lines[0]  # header row
            for row in df_output_lines:
                if row[0] == 'tmpfs':
                    continue
                # df reports 1K blocks; row[4] is the percentage with a
                # trailing '%', which is stripped.
                disk['df-psutil'].append({
                    'info': [row[0], row[5], '', ''],
                    'total': int(row[1])*1024,
                    'used': int(row[2])*1024,
                    'free': int(row[3])*1024,
                    'percent': row[4][:-1]})
        except Exception:
            pass

    return disk
def getAttrRO(self):
    """Return ServiceBase.getAttrRO() extended with 'disk_partitions'."""
    inherited = ServiceBase.getAttrRO()
    return inherited + ['disk_partitions']
def get_disk_partitions(self):
    """Return the result of psutil.disk_partitions() called with no arguments."""
    found = psutil.disk_partitions()
    return found
def monitor(frist_invoke=1):
    """
    Report disk usage per mountpoint for the partitions psutil lists with
    all=False.

    Sizes are divided by 1024*1024; the percentage is stored as reported.

    :param frist_invoke: not used by this function
    :return: {'disk': {mountpoint: {...}}}
    """
    scale = 1024*1024
    disks = {}
    for part in psutil.disk_partitions(all=False):
        # skip cd-rom drives with no disk in it; they may raise
        # ENOENT, pop-up a Windows GUI error for a non-ready
        # partition or just hang.
        if os.name == 'nt' and ('cdrom' in part.opts or part.fstype == ''):
            continue
        mount_point = part.mountpoint
        usage = psutil.disk_usage(mount_point)
        disks[mount_point] = {
            'disk.device': part.device,
            'disk.mount_point': mount_point,
            'disk.total': usage.total/scale,
            'disk.used': usage.used/scale,
            'disk.free': usage.free/scale,
            'disk.percent': usage.percent,
            'disk.type': part.fstype,
        }
    return {'disk': disks}
def check(self):
    """
    Aggregate disk usage over the partitions psutil lists with all=True.

    Partitions whose mount options include 'cdrom' or 'removable'
    (case-insensitive) are skipped. The result records the highest usage
    percentage, the largest used size and the smallest free size (each with
    its mountpoint), plus overall used/free/total and the overall used
    percentage. All sizes in the result are divided by 1024.

    :return: dict of 'disk.*' metrics; every value stays 0 / '' when no
             partition is counted
    """
    data = {
        'disk.max_used_percent': 0,
        'disk.max_used_percent_mount': '',
        'disk.max_used': 0,
        'disk.max_used_mount': '',
        'disk.min_free': 0,
        'disk.min_free_mount': '',
        'disk.used_percent': 0,
        'disk.used': 0,
        'disk.free': 0,
        'disk.total': 0,
    }
    # None means "no partition seen yet". Using 0 as that marker would let a
    # later partition overwrite a genuine minimum of 0 bytes free.
    min_free = None
    for partition in psutil.disk_partitions(all=True):
        # opts is a comma-separated string (e.g. 'rw,removable'), so look at
        # the individual flags rather than the whole string.
        flags = partition.opts.upper().split(',')
        if 'CDROM' in flags or 'REMOVABLE' in flags:
            continue
        mountpoint = partition.mountpoint
        usage = psutil.disk_usage(mountpoint)
        if usage.percent > data['disk.max_used_percent']:
            data['disk.max_used_percent'] = usage.percent
            data['disk.max_used_percent_mount'] = mountpoint
        if usage.used > data['disk.max_used']:
            data['disk.max_used'] = usage.used
            data['disk.max_used_mount'] = mountpoint
        if min_free is None or usage.free < min_free:
            min_free = usage.free
            data['disk.min_free_mount'] = mountpoint
        data['disk.used'] += usage.used
        data['disk.free'] += usage.free
        data['disk.total'] += usage.total
    if min_free is not None:
        data['disk.min_free'] = min_free
    data['disk.used_percent'] = data['disk.used']*100 / data['disk.total'] if data['disk.total'] else 0
    data['disk.max_used'] = data['disk.max_used'] / 1024
    data['disk.min_free'] = data['disk.min_free'] / 1024
    data['disk.used'] = data['disk.used'] / 1024
    data['disk.free'] = data['disk.free'] / 1024
    data['disk.total'] = data['disk.total'] / 1024
    return data
def test_disks(self):
    # test psutil.disk_usage() and psutil.disk_partitions()
    # against "df -k"
    def df(path):
        # Returns (device, total, used, free) with sizes in bytes; "df -k"
        # reports 1K blocks, hence the multiplication.
        out = sh('df -k "%s"' % path).strip()
        lines = out.split('\n')
        lines.pop(0)  # header row
        line = lines.pop(0)
        dev, total, used, free = line.split()[:4]
        if dev == 'none':
            dev = ''
        total = int(total) * 1024
        used = int(used) * 1024
        free = int(free) * 1024
        return dev, total, used, free

    for part in psutil.disk_partitions(all=False):
        usage = psutil.disk_usage(part.mountpoint)
        dev, total, used, free = df(part.mountpoint)
        self.assertEqual(part.device, dev)
        self.assertEqual(usage.total, total)
        # 10 MB tolerance
        # The values must be passed to % as a tuple; otherwise formatting
        # raises TypeError instead of reporting the mismatch.
        if abs(usage.free - free) > 10 * 1024 * 1024:
            self.fail("psutil=%s, df=%s" % (usage.free, free))
        if abs(usage.used - used) > 10 * 1024 * 1024:
            self.fail("psutil=%s, df=%s" % (usage.used, used))
# --- cpu
def test_disk_usage(self):
    # Compare psutil.disk_usage() with win32api.GetDiskFreeSpaceEx() for
    # every partition; index 0 of the win32 result is compared with free
    # space and index 1 with the total.
    one_mb = 1024 * 1024
    for disk in psutil.disk_partitions():
        sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
        psutil_value = psutil.disk_usage(disk.mountpoint)
        self.assertAlmostEqual(sys_value[0], psutil_value.free,
                               delta=one_mb)
        self.assertAlmostEqual(sys_value[1], psutil_value.total,
                               delta=one_mb)
        expected_used = psutil_value.total - psutil_value.free
        self.assertEqual(psutil_value.used, expected_used)
def test_disk_partitions(self):
    # The mountpoints psutil reports (all=True) must equal the logical
    # drive strings from win32api, ignoring drive A:.
    drive_strings = win32api.GetLogicalDriveStrings().split("\\\x00")
    sys_value = []
    for drive in drive_strings:
        if not drive or drive.startswith('A:'):
            continue
        # Splitting removed the trailing backslash; put it back.
        sys_value.append(drive + '\\')
    psutil_value = []
    for part in psutil.disk_partitions(all=True):
        psutil_value.append(part.mountpoint)
    self.assertEqual(sys_value, psutil_value)