Python nibabel module: load() code examples
The following code examples, extracted from open-source Python projects, illustrate how to use nibabel.load().
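Before the project examples, here is a minimal sketch of the basic pattern they all share (the file path is illustrative; note that the get_data() call used throughout these examples is deprecated in newer nibabel releases in favour of get_fdata()):
import nibabel as nib

img = nib.load('example.nii.gz')  # illustrative path
data = img.get_data()             # voxel array; newer nibabel prefers img.get_fdata()
print(img.shape)                  # volume dimensions
print(img.affine)                 # 4x4 voxel-to-world transform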
def generate_patch_probs(path, patch_locations, patch_size, im_size):
    x, y, z = patch_locations
    seg = nib.load(glob.glob(os.path.join(path, '*_seg.nii.gz'))[0]).get_data().astype(np.float32)
    p = []
    for i in range(len(x)):
        for j in range(len(y)):
            for k in range(len(z)):
                patch = seg[int(x[i] - patch_size / 2) : int(x[i] + patch_size / 2),
                            int(y[j] - patch_size / 2) : int(y[j] + patch_size / 2),
                            int(z[k] - patch_size / 2) : int(z[k] + patch_size / 2)]
                patch = (patch > 0).astype(np.float32)
                percent = np.sum(patch) / (patch_size * patch_size * patch_size)
                p.append((1 - np.abs(percent - 0.5)) * percent)
    p = np.asarray(p, dtype=np.float32)
    p[p == 0] = np.amin(p[np.nonzero(p)])
    p = p / np.sum(p)
    return p
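The returned vector weights patch centres by how close their foreground fraction is to 50%. A sketch of how such a probability vector might drive patch sampling; the call site and batch_size are assumptions, not part of the original project:
probs = generate_patch_probs(path, patch_locations, patch_size, im_size)
# Hypothetical usage: draw patch-centre indices in proportion to their probability.
chosen = np.random.choice(len(probs), size=batch_size, p=probs)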
def train_xgboost():
    df = pd.read_csv('survival_data.csv', index_col=0, encoding='UTF-7')
    p = np.array([np.mean(np.load('training/%s_flair.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train])
    q = np.array([np.mean(np.load('training/%s_t1.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train])
    r = np.array([np.mean(np.load('training/%s_t1ce.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train])
    s = np.array([np.mean(np.load('training/%s_t2.nii.gz.npy' % str(id)), axis=0) for id in folder_names_train])
    y = np.array([])
    t = 0
    z = np.array([])
    for ind in range(len(folder_names_train)):
        try:
            temp = df.get_value(str(folder_names_train[ind]), 'Survival')
            y = np.append(y, temp)
            temp = df.get_value(str(folder_names_train[ind]), 'Age')
            z = np.append(z, np.array([temp]))
        except Exception as e:
            t += 1
            print(t, str(e), "Label not found, deleting entry")
            y = np.append(y, 0)
    z = np.array([[v] for v in z])
    t = np.concatenate((p, q), axis=1)
    u = np.concatenate((r, s), axis=1)
    x = np.concatenate((t, u), axis=1)
    x = np.concatenate((x, z), axis=1)
    # clf = linear_model.LogisticRegression(C=1e5)
    # clf = RandomForestRegressor()
    clf = xgb.XGBRegressor()
    clf.fit(x, y)
    return clf
def load_nifti(filename, with_affine=False):
    """
    Load an image from a NIfTI file.

    Parameters
    ----------
    filename : str
        filename of the NIfTI file
    with_affine : bool
        if True, also return the affine matrix

    Returns
    -------
    data : np.ndarray
        image data
    """
    img = nib.load(filename)
    data = img.get_data()
    data = np.copy(data, order="C")
    if with_affine:
        return data, img.affine
    return data
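A minimal usage sketch, assuming a NIfTI file on disk (the path is illustrative):
data, affine = load_nifti('subject01_t1.nii.gz', with_affine=True)  # illustrative path
print(data.shape, affine.shape)  # volume shape and a (4, 4) affine, depending on the file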
def load_channels(self, normalize=False):
    modalities = []
    modalities.append(nib.load(self.FLAIR_FILE))
    modalities.append(nib.load(self.T1_FILE))
    channels = np.zeros(modalities[0].shape + (2,), dtype=np.float32)
    for index_mod, mod in enumerate(modalities):
        if self.data_augmentation:
            channels[:, :, :, index_mod] = flip_plane(np.asarray(mod.dataobj))
        else:
            channels[:, :, :, index_mod] = np.asarray(mod.dataobj)
        if normalize:
            channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=self.load_ROI_mask())
    return channels
def load_ROI_mask(self):
    proxy = nib.load(self.FLAIR_FILE)
    image_array = np.asarray(proxy.dataobj)
    mask = np.ones_like(image_array)
    mask[np.where(image_array < 90)] = 0
    # img = nib.Nifti1Image(mask, proxy.affine)
    # nib.save(img, join(modalities_path, 'mask.nii.gz'))
    struct_element_size = (20, 20, 20)
    mask_augmented = np.pad(mask, [(21, 21), (21, 21), (21, 21)], 'constant', constant_values=(0, 0))
    mask_augmented = binary_closing(mask_augmented, structure=np.ones(struct_element_size, dtype=bool)).astype(np.int)
    return mask_augmented[21:-21, 21:-21, 21:-21].astype('bool')
def get_subject_shape(self):
    if self.booleanLabels:
        proxy = nib.load(self.GT_FILE)
    elif self.booleanT1:
        proxy = nib.load(self.T1_FILE)
    elif self.booleanT1c:
        proxy = nib.load(self.T1c_FILE)
    elif self.booleanT2:
        proxy = nib.load(self.T2_FILE)
    elif self.booleanFLAIR:
        proxy = nib.load(self.FLAIR_FILE)
    else:
        raise ValueError("[src.helpers.Subject] No modalities are given for subject: " + self.id)
    return proxy.shape
def load_channels(self):
    flair = nib.load(self.FLAIR_FILE)
    t1 = nib.load(self.T1_FILE)
    t1c = nib.load(self.T1c_FILE)
    t2 = nib.load(self.T2_FILE)
    to_int = lambda b: 1 if b else 0
    num_input_modalities = to_int(self.booleanFLAIR) + to_int(self.booleanT1) + to_int(self.booleanT1c) + to_int(self.booleanT2)
    channels = np.zeros((num_input_modalities,) + flair.shape, dtype=np.float32)
    channels[0] = np.asarray(flair.dataobj) if self.booleanFLAIR is True else None
    channels[to_int(self.booleanFLAIR)] = np.asarray(t1.dataobj) if self.booleanT1 is True else None
    channels[to_int(self.booleanFLAIR) + to_int(self.booleanT1)] = np.asarray(t1c.dataobj) if self.booleanT1c is True else None
    channels[to_int(self.booleanFLAIR) + to_int(self.booleanT1) + to_int(self.booleanT1c)] = np.asarray(t2.dataobj) if self.booleanT2 is True else None
    return channels
def load_channels(self, normalize=False):
    modalities = []
    modalities.append(nib.load(self.FLAIR_FILE))
    modalities.append(nib.load(self.T1_FILE))
    modalities.append(nib.load(self.T1c_FILE))
    modalities.append(nib.load(self.T2_FILE))
    channels = np.zeros(modalities[0].shape + (4,), dtype=np.float32)
    if normalize:
        mask = self.load_ROI_mask()
    for index_mod, mod in enumerate(modalities):
        if self.data_augmentation_flag:
            channels[:, :, :, index_mod] = flip_plane(np.asarray(mod.dataobj), plane=self.data_augmentation)
        else:
            channels[:, :, :, index_mod] = np.asarray(mod.dataobj)
        if normalize:
            channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=mask)
    return channels
def load_channels(self, normalize=False):
    modalities = []
    modalities.append(nib.load(self.T2_FILE))
    modalities.append(nib.load(self.T1_FILE))
    channels = np.zeros(modalities[0].shape + (2,), dtype=np.float32)
    for index_mod, mod in enumerate(modalities):
        if self.data_augmentation:
            channels[:, :, :, index_mod] = flip_plane(np.asarray(mod.dataobj), plane=self.data_augmentation)
        else:
            channels[:, :, :, index_mod] = np.asarray(mod.dataobj)
        if normalize:
            channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=self.load_ROI_mask())
    return channels
def test_individual_stability_matrix():
    """
    Tests the individual_stability_matrix method on three Gaussian blobs.
    """
    import utils
    import numpy as np
    import scipy as sp
    desired = np.load(home + '/git_repo/PyBASC/tests/ism_test.npy')
    blobs = generate_blobs()
    ism = utils.individual_stability_matrix(blobs, 20, 3)
    # how to use a test here?
    # np.corrcoef(ism.flatten(), desired.flatten())
    # np.testing.assert_equal(ism, desired)
    # corr = np.array(sp.spatial.distance.cdist(ism, desired, metric='correlation'))
    assert False
def test_ndarray_to_vol():
    import basc
    import nibabel as nb
    subject_file = home + '/git_repo/PyBASC/sample_data/sub1/Func_Quarter_Res.nii.gz'
    subject_file = home + '/git_repo/PyBASC/sample_data/test.nii.gz'
    data = nb.load(subject_file).get_data().astype('float32')
    roi_mask_file = home + '/git_repo/PyBASC/masks/LC_Quarter_Res.nii.gz'
    print('Data Loaded')
    roi_mask_file_nb = nb.load(roi_mask_file)
    roi_mask_nparray = nb.load(roi_mask_file).get_data().astype('float32').astype('bool')
    roi1data = data[roi_mask_nparray]
    data_array = roi1data
    sample_file = subject_file
    filename = home + '/git_repo/PyBASC/sample_data/ndarray_to_vol_test.nii.gz'
    basc.ndarray_to_vol(data_array, roi_mask_file, roi_mask_file, filename)
def analyse_data(input_dir):
    shapes = []
    relative_volumes = []
    for folder in get_sub_folders(input_dir):
        print(folder)
        for sub_folder in get_sub_folders(os.path.join(input_dir, folder)):
            image_type = get_image_type_from_folder_name(sub_folder)
            # only analyse the ground-truth ('.OT') volumes; skip the raw data (too heavy)
            if image_type != '.OT':
                continue
            path = os.path.join(input_dir, folder, sub_folder)
            filename = next(filename for filename in os.listdir(path) if get_extension(filename) == '.nii')
            path = os.path.join(path, filename)
            im = nib.load(path)
            image = im.get_data()
            shape = image.shape
            shapes.append(shape)
            relative_volumes.append(100 * np.sum(image) / np.cumprod(shape)[-1])
    return shapes, relative_volumes
# train
def export_data(prediction_dir, nii_image_dir, tfrecords_dir, export_dir, transformation=None):
    for file_path in os.listdir(prediction_dir):
        name, prediction, probability = read_prediction_file(os.path.join(prediction_dir, file_path))
        if transformation:
            image, ground_truth = get_original_image(os.path.join(tfrecords_dir, name + '.tfrecord'), False)
            prediction = transformation.transform_image(prediction, probability, image)
        # build the cv_predictions .nii image
        img = nib.Nifti1Image(prediction, np.eye(4))
        img.set_data_dtype(dtype=np.uint8)
        path = os.path.join(nii_image_dir, name)
        adc_name = next(l for l in os.listdir(path) if 'MR_ADC' in l)
        export_image = nib.load(os.path.join(nii_image_dir, name, adc_name, adc_name + '.nii'))
        i = export_image.get_data()
        i[:] = img.get_data()
        # set the name to specification and export
        _id = next(l for l in os.listdir(path) if 'MR_MTT' in l).split('.')[-1]
        export_path = os.path.join(export_dir, 'SMIR.' + name + '.' + _id + '.nii')
        nib.save(export_image, os.path.join(export_path))
def load(self, path):
    """ Load the image data and associated metadata.

    Parameters
    ----------
    path: str
        the path to the image to be loaded.

    Returns
    -------
    image: Image
        the loaded image.
    """
    _image = nibabel.load(path)
    return Image(spacing=_image.header.get_zooms(),
                 data_type="scalar",
                 metadata={"path": path},
                 data=_image.get_data())
def _get_investigation_template(bids_directory, mri_par_names):
    this_path = os.path.realpath(
        __file__[:-1] if __file__.endswith('.pyc') else __file__)
    template_path = opj(
        *(psplit(this_path)[:-1] + ("i_investigation_template.txt", )))
    investigation_template = open(template_path).read()
    title = psplit(bids_directory)[-1]
    if exists(opj(bids_directory, "dataset_description.json")):
        with open(opj(bids_directory, "dataset_description.json"), "r") \
                as description_dict_fp:
            description_dict = json.load(description_dict_fp)
        if "Name" in description_dict:
            title = description_dict["Name"]
    investigation_template = investigation_template.replace(
        "[TODO: TITLE]", title)
    investigation_template = investigation_template.replace(
        "[TODO: MRI_PAR_NAMES]", ";".join(mri_par_names))
    return investigation_template
def get_all_nifti_data(directory, map_names=None, deferred=True):
    """Get the data of all the nifti volumes in the given directory.

    If map_names is given, we only load the given map names. Else, we load all .nii and
    .nii.gz files in the given directory.

    Args:
        directory (str): the directory from which we want to read a number of maps
        map_names (list of str): the names of the maps we want to use. If given, we only use and return these maps.
        deferred (boolean): if True we return a deferred loading dictionary instead of a dictionary with the values
            loaded as arrays.

    Returns:
        dict: A dictionary with the volumes. The keys of the dictionary are the filenames
            without the extension of the .nii(.gz) files in the given directory.
    """
    proxies = load_all_niftis(directory, map_names=map_names)
    if deferred:
        return DeferredActionDict(lambda _, item: item.get_data(), proxies)
    else:
        return {k: v.get_data() for k, v in proxies.items()}
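A usage sketch, under the assumption that the directory contains maps named FA and MD (the path and names are illustrative):
maps = get_all_nifti_data('/data/maps', map_names=['FA', 'MD'], deferred=False)
fa = maps['FA']  # a plain ndarray, since deferred=False forces immediate loading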
def get_cut_size(data_path):
    list_D_s, list_D_e, list_H_s, list_H_e, list_W_s, list_W_e = [], [], [], [], [], []
    for i in range(1, 10 + 1):
        subject_name = 'subject-%d-' % i
        f = os.path.join(data_path, subject_name + 'T1.hdr')
        img = nib.load(f)
        inputs_tmp = img.get_data()
        D_s, D_e, H_s, H_e, W_s, W_e = cut_edge(inputs_tmp, margin)  # set how many zero margins to keep
        list_D_s.append(D_s)
        list_D_e.append(D_e)
        list_H_s.append(H_s)
        list_H_e.append(H_e)
        list_W_s.append(W_s)
        list_W_e.append(W_e)
    min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e = \
        min(list_D_s), max(list_D_e), min(list_H_s), max(list_H_e), min(list_W_s), max(list_W_e)
    print("new size: ", min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e)
    return min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e
def check_atlas(atlas):
    """Validation of the atlas input."""
    # when it is the name of a pre-defined atlas
    if isinstance(atlas, str):
        if not pexists(atlas):  # just a name
            atlas = atlas.lower()
            if atlas not in parcellate.atlas_list:
                raise ValueError('Invalid choice of atlas. Accepted : {}'.format(parcellate.atlas_list))
        elif os.path.isdir(atlas):  # cortical atlas in Freesurfer organization
            if not parcellate.check_atlas_annot_exist(atlas):
                raise ValueError('Given atlas folder does not contain Freesurfer label annot files. '
                                 'Needed : given_atlas_dir/label/?h.aparc.annot')
        elif pexists(atlas):  # may be a volumetric atlas?
            try:
                atlas = nibabel.load(atlas)
            except:
                traceback.print_exc()
                raise ValueError('Unable to read the provided image volume. '
                                 'Must be a nifti 2d volume, readable by nibabel.')
        else:
            raise ValueError('Unable to decipher or use the given atlas.')
    else:
        raise NotImplementedError('Atlas must be a string, providing a name or path to a Freesurfer folder '
                                  'or a 3D nifti volume.')
    return atlas
def _add_provenance(in_file, settings, air_msk, rot_msk):
    from mriqc import __version__ as version
    from niworkflows.nipype.utils.filemanip import hash_infile
    import nibabel as nb
    import numpy as np
    air_msk_size = nb.load(air_msk).get_data().astype(np.uint8).sum()
    rot_msk_size = nb.load(rot_msk).get_data().astype(np.uint8).sum()
    out_prov = {
        'md5sum': hash_infile(in_file),
        'version': version,
        'software': 'mriqc',
        'warnings': {
            'small_air_mask': bool(air_msk_size < 5e5),
            'large_rot_frame': bool(rot_msk_size > 500),
        }
    }
    if settings:
        out_prov['settings'] = settings
    return out_prov
def _binarize(in_file, threshold=0.5, out_file=None):
    import os.path as op
    import numpy as np
    import nibabel as nb
    if out_file is None:
        fname, ext = op.splitext(op.basename(in_file))
        if ext == '.gz':
            fname, ext2 = op.splitext(fname)
            ext = ext2 + ext
        out_file = op.abspath('{}_bin{}'.format(fname, ext))
    nii = nb.load(in_file)
    data = nii.get_data()
    data[data <= threshold] = 0
    data[data > 0] = 1
    hdr = nii.header.copy()
    hdr.set_data_dtype(np.uint8)
    nb.Nifti1Image(data.astype(np.uint8), nii.affine, hdr).to_filename(out_file)
    return out_file
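A usage sketch (the input path is illustrative); the output lands in the current working directory with a _bin suffix:
out = _binarize('brain_mask.nii.gz', threshold=0.5)  # illustrative path
# out points to brain_mask_bin.nii.gz, a uint8 volume of zeros and ones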
def image_gradient(in_file, snr, out_file=None):
    """Computes the magnitude gradient of an image using numpy"""
    import os.path as op
    import numpy as np
    import nibabel as nb
    from scipy.ndimage import gaussian_gradient_magnitude as gradient
    if out_file is None:
        fname, ext = op.splitext(op.basename(in_file))
        if ext == '.gz':
            fname, ext2 = op.splitext(fname)
            ext = ext2 + ext
        out_file = op.abspath('{}_grad{}'.format(fname, ext))
    imnii = nb.load(in_file)
    data = imnii.get_data().astype(np.float32)  # pylint: disable=no-member
    datamax = np.percentile(data.reshape(-1), 99.5)
    data *= 100 / datamax
    grad = gradient(data, 3.0)
    gradmax = np.percentile(grad.reshape(-1), 99.5)
    grad *= 100.
    grad /= gradmax
    nb.Nifti1Image(grad, imnii.get_affine(), imnii.get_header()).to_filename(out_file)
    return out_file
def thresh_image(in_file, thres=0.5, out_file=None):
    """Thresholds an image"""
    import os.path as op
    import nibabel as nb
    if out_file is None:
        fname, ext = op.splitext(op.basename(in_file))
        if ext == '.gz':
            fname, ext2 = op.splitext(fname)
            ext = ext2 + ext
        out_file = op.abspath('{}_thresh{}'.format(fname, ext))
    im = nb.load(in_file)
    data = im.get_data()
    data[data < thres] = 0
    data[data > 0] = 1
    nb.Nifti1Image(
        data, im.get_affine(), im.get_header()).to_filename(out_file)
    return out_file
def __init__(self, func, mask, seg=None, tr=None,
             title=None, figsize=DINA4_LANDSCAPE):
    func_nii = nb.load(func)
    self.func_data = func_nii.get_data()
    self.mask_data = nb.load(mask).get_data()
    self.ntsteps = self.func_data.shape[-1]
    self.tr = tr
    if tr is None:
        self.tr = func_nii.get_header().get_zooms()[-1]
    if seg is None:
        self.seg_data = 2 * self.mask_data
    else:
        self.seg_data = nb.load(seg).get_data()
    self.fig = plt.figure(figsize=figsize)
    if title is not None:
        self.fig.suptitle(title, fontsize=20)
    self.confounds = []
    self.spikes = []
def _get_limits(nifti_file, only_plot_noise=False):
    from builtins import bytes, str  # pylint: disable=W0622
    if isinstance(nifti_file, (str, bytes)):
        nii = nb.as_closest_canonical(nb.load(nifti_file))
        data = nii.get_data()
    else:
        data = nifti_file
    data_mask = np.logical_not(np.isnan(data))
    if only_plot_noise:
        data_mask = np.logical_and(data_mask, data != 0)
        vmin = np.percentile(data[data_mask], 0)
        vmax = np.percentile(data[data_mask], 61)
    else:
        vmin = np.percentile(data[data_mask], 0.5)
        vmax = np.percentile(data[data_mask], 99.5)
    return vmin, vmax
def symmetrise_axial(self, filename_in, filename_out=None, axis='x', plane_intercept=10,
                     side_to_copy='below', keep_in_data_dimensions=True):
    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
    im_segm = nib.load(pfi_in)
    data_labels = im_segm.get_data()
    data_symmetrised = symmetrise_data(data_labels,
                                       axis=axis,
                                       plane_intercept=plane_intercept,
                                       side_to_copy=side_to_copy,
                                       keep_in_data_dimensions=keep_in_data_dimensions)
    im_symmetrised = set_new_data(im_segm, data_symmetrised)
    nib.save(im_symmetrised, pfi_out)
    print('Symmetrised axis {0}, plane_intercept {1}, image of {2} saved in {3}.'.format(
        axis, plane_intercept, pfi_in, pfi_out))
    return pfi_out
def modify_affine(self, filename_in, affine_in, filename_out, q_form=True, s_form=True,
                  multiplication_side='left'):
    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
    if isinstance(affine_in, str):
        if affine_in.endswith('.txt'):
            aff = np.loadtxt(connect_path_tail_head(self.pfo_in, affine_in))
        else:
            aff = np.load(connect_path_tail_head(self.pfo_in, affine_in))
    elif isinstance(affine_in, np.ndarray):
        aff = affine_in
    else:
        raise IOError('parameter affine_in can be a path to an affine matrix .txt or .npy, or the numpy array '
                      'corresponding to the affine transformation.')
    im = nib.load(pfi_in)
    new_im = modify_affine_transformation(im, aff, q_form=q_form, s_form=s_form,
                                          multiplication_side=multiplication_side)
    nib.save(new_im, pfi_out)
def apply_small_rotation(self, filename_in, filename_out, angle=np.pi / 6, principal_axis='pitch'):
    if isinstance(angle, list):
        assert isinstance(principal_axis, list)
        assert len(principal_axis) == len(angle)
        new_aff = np.identity(4)
        for pa, an in zip(principal_axis, angle):
            aff = get_small_orthogonal_rotation(theta=an, principal_axis=pa)
            new_aff = new_aff.dot(aff)
    else:
        new_aff = get_small_orthogonal_rotation(theta=angle, principal_axis=principal_axis)
    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
    im = nib.load(pfi_in)
    new_im = modify_affine_transformation(im_input=im, new_aff=new_aff, q_form=True, s_form=True,
                                          multiplication_side='right')
    nib.save(new_im, pfi_out)
def modify_translational_part(self, filename_in, filename_out, new_translation):
    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
    im = nib.load(pfi_in)
    if isinstance(new_translation, str):
        if new_translation.endswith('.txt'):
            tr = np.loadtxt(connect_path_tail_head(self.pfo_in, new_translation))
        else:
            tr = np.load(connect_path_tail_head(self.pfo_in, new_translation))
    elif isinstance(new_translation, np.ndarray):
        tr = new_translation
    elif isinstance(new_translation, list):
        tr = np.array(new_translation)
    else:
        raise IOError('parameter new_translation can be a path to an affine matrix .txt or .npy, or the numpy '
                      'array corresponding to the new intended translational part.')
    new_im = replace_translational_part(im, tr)
    nib.save(new_im, pfi_out)
def values_below_labels(self, segmentation_filename, anatomy_filename, labels=None):
    """
    :param segmentation_filename: filename of the segmentation image
    :param anatomy_filename: filename of the anatomical image
    :param labels: optional selection of labels to query
    :return: pandas Series with label names and corresponding vectors of label values
    """
    pfi_anat = connect_path_tail_head(self.pfo_in, anatomy_filename)
    pfi_segm = connect_path_tail_head(self.pfo_in, segmentation_filename)
    assert os.path.exists(pfi_anat)
    im_anat = nib.load(pfi_anat)
    assert os.path.exists(pfi_segm)
    im_segm = nib.load(pfi_segm)
    labels_list, labels_names = labels_query(labels, segmentation_array=im_segm.get_data())
    labels_values = get_values_below_labels(im_segm, im_anat, labels_list)
    return pa.Series(labels_values, index=labels_names)
def global_dist(self, segm_1_filename, segm_2_filename, where_to_save=None,
                global_metrics=(global_outline_error, global_dice_score)):
    pfi_segm1 = connect_path_tail_head(self.pfo_in, segm_1_filename)
    pfi_segm2 = connect_path_tail_head(self.pfo_in, segm_2_filename)
    assert os.path.exists(pfi_segm1), pfi_segm1
    assert os.path.exists(pfi_segm2), pfi_segm2
    if self.verbose > 0:
        print("\nGlobal distances between segmentations: \n -> {0} \n -> {1} "
              "\nComputations started!".format(pfi_segm1, pfi_segm2))
    im_segm1 = nib.load(pfi_segm1)
    im_segm2 = nib.load(pfi_segm2)
    se_global_distances = pa.Series(np.array([d(im_segm1, im_segm2) for d in global_metrics]),
                                    index=[d.__name__ for d in global_metrics])
    if where_to_save is not None:
        where_to_save = connect_path_tail_head(self.pfo_out, where_to_save)
        se_global_distances.to_pickle(where_to_save)
    return se_global_distances
def relabel(self, pfi_input, pfi_output=None, list_old_labels=(), list_new_labels=()):
    """
    Filename-based wrapper of :func:`labels_manager.tools.manipulations.relabeller.relabeller`
    """
    pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in,
                                         self.pfo_out)
    im_labels = nib.load(pfi_in)
    data_labels = im_labels.get_data()
    data_relabelled = relabeller(data_labels, list_old_labels=list_old_labels,
                                 list_new_labels=list_new_labels)
    im_relabelled = set_new_data(im_labels, data_relabelled)
    nib.save(im_relabelled, pfi_out)
    print('Relabelled image {0} saved in {1}.'.format(pfi_in, pfi_out))
    return pfi_out
def toJSON(stats, seg_file, structure_map):
    """Combine stats files into a single JSON file"""
    import json
    import os
    import nibabel as nb
    import numpy as np
    img = nb.load(seg_file)
    data = img.get_data()
    voxel2vol = np.prod(img.header.get_zooms())
    idx = np.unique(data)
    reverse_map = {k: v for v, k in structure_map}
    out_dict = dict(zip([reverse_map[val] for val in idx], np.bincount(data.flatten())[idx]))
    for key in out_dict.keys():
        out_dict[key] = [out_dict[key], voxel2vol * out_dict[key]]
    mapper = dict([(0, 'csf'), (1, 'gray'), (2, 'white')])
    out_dict.update(**{mapper[idx]: val for idx, val in enumerate(stats)})
    out_file = 'segstats.json'
    with open(out_file, 'wt') as fp:
        json.dump(out_dict, fp, sort_keys=True, indent=4, separators=(',', ': '))
    return os.path.abspath(out_file)
def load_inference(base_path='./data/Test/Test_Subject', nii_index=0):
    """Load nii data whose name is, for example, 'Test_Subject01.nii'.

    Arguments:
        nii_index: counts from 0.
    """
    filename = base_path + str(nii_index + 1).zfill(2) + '.nii'
    xs = nib.load(filename).get_data()
    # Crop black region to reduce nii volumes.
    dummy_ys = np.zeros_like(xs)
    xs, *_ = _banish_darkness(xs, dummy_ys)
    # Normalize images.
    local_max = np.max(xs, axis=(1, 2), keepdims=True)
    local_min = np.min(xs, axis=(1, 2), keepdims=True)
    xs = (xs - local_min) / (local_max - local_min)
    return xs[None, ..., None]
def select_training_voxels(input_masks, threshold=2, datatype=np.float32):
    """
    Select voxels for training based on an intensity threshold.

    Inputs:
        - input_masks: list containing all subject image paths for a single modality
        - threshold: minimum threshold to apply (after normalizing images to 0 mean and 1 std)

    Output:
        - rois: list where each element contains the subject binary mask for selected voxels [len(x), len(y), len(z)]
    """
    # load images and normalize their intensities
    images = [load_nii(image_name).get_data() for image_name in input_masks]
    images_norm = [(im.astype(dtype=datatype) - im[np.nonzero(im)].mean()) / im[np.nonzero(im)].std()
                   for im in images]
    # select voxels with intensity higher than the threshold
    rois = [image > threshold for image in images_norm]
    return rois
def select_voxels_from_previous_model(model, train_x_data, options):
    """
    Select training voxels from image segmentation masks
    """
    # get scan names and the number of modalities used
    scans = train_x_data.keys()
    modalities = train_x_data[scans[0]].keys()
    # select voxels for training; discard CSF and darker WM in FLAIR
    # flair_scans = [train_x_data[s]['FLAIR'] for s in scans]
    # selected_voxels = select_training_voxels(flair_scans, options['min_th'])
    # evaluate training scans using the learned model and extract voxels with probability higher than 0.5
    seg_mask = [test_scan(model, dict(train_x_data.items()[s:s + 1]), options, save_nifti=False) > 0.5
                for s in range(len(scans))]
    # check candidate segmentations:
    # if no voxels have been selected, return candidate voxels on FLAIR modality > 2
    flair_scans = [train_x_data[s]['FLAIR'] for s in scans]
    images = [load_nii(name).get_data() for name in flair_scans]
    images_norm = [(im.astype(dtype=np.float32) - im[np.nonzero(im)].mean()) / im[np.nonzero(im)].std()
                   for im in images]
    seg_mask = [im > 2 if np.sum(seg) == 0 else seg for im, seg in zip(images_norm, seg_mask)]
    return seg_mask
def default_file_reader(x):
    def pil_loader(path):
        return Image.open(path).convert('RGB')

    def npy_loader(path):
        return np.load(path)

    def nifti_loader(path):
        return nibabel.load(path).get_data()

    if isinstance(x, str):
        if x.endswith('.npy'):
            x = npy_loader(x)
        elif x.endswith('.nii.gz'):
            x = nifti_loader(x)
        else:
            try:
                x = pil_loader(x)
            except:
                raise ValueError('File format is not supported')
    # else:
    #     raise ValueError('x should be a string, but got %s' % type(x))
    return x
def create_3D_distance_matrix(vox_ijk, epi_fname):
    """Compute distance between voxels in the volume.

    Parameters
    ----------
    vox_ijk : n x 3 array
        Indices of voxels included in the ROI.
    epi_fname : file path
        Path to image defining the volume space.

    Returns
    -------
    dmat : array
        Dense square distance matrix.
    """
    aff = nib.load(epi_fname).affine
    vox_ras = nib.affines.apply_affine(aff, vox_ijk)
    dmat = squareform(pdist(vox_ras))
    return dmat
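A hedged usage sketch; the voxel indices and file name are illustrative. The affine maps (i, j, k) indices to scanner millimetre coordinates, so dmat holds physical rather than grid distances:
vox_ijk = np.array([[10, 12, 8], [11, 12, 8], [30, 40, 20]])  # illustrative ROI voxels
dmat = create_3D_distance_matrix(vox_ijk, 'epi.nii.gz')       # illustrative path
# dmat[i, j] is the distance in millimetres between voxels i and j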
def extract_data(exp, subj, roi_info):
    """Extract timeseries data from each ROI."""
    ts_data = {roi: [] for roi in roi_info}
    n_runs = dict(dots=12, sticks=12, rest=8)[exp]
    anal_dir = PROJECT["analysis_dir"]
    ts_temp = op.join(anal_dir, exp, subj, "reg", "epi", "unsmoothed",
                      "run_{}", "res4d_xfm.nii.gz")
    # Extract ROI data from each run, loading images only once
    for run in range(1, n_runs + 1):
        run_data = nib.load(ts_temp.format(run)).get_data()
        for roi, info in roi_info.iteritems():
            roi_ts = extract_from_volume(run_data, info["vox_ijk"])
            ts_data[roi].append(roi_ts)
    # Combine across runs
    ts_data = {roi: np.hstack(ts_data[roi]).T for roi in roi_info}
    for roi in roi_info:
        assert ts_data[roi].shape[1] == len(roi_info[roi]["vox_ijk"])
    return ts_data
def create_histo(folder_path, min_val, max_val, num_bins=None):
    if num_bins is None:
        num_bins = int(max_val - min_val)
    print(num_bins)
    histogram = {}
    for f in os.listdir(folder_path):
        im = nib.load(os.path.join(folder_path, f))
        data = im.get_data()
        hist, bins = np.histogram(data, num_bins, range=(min_val, max_val))
        count = 0
        for el in bins[0:-1]:
            try:
                histogram[el] += hist[count]
            except KeyError:
                histogram[el] = hist[count]
            count += 1
    return histogram.items()
def reg_dti_pngs(dti, loc, atlas, outdir):
    """
    outdir: directory where the output png file is saved
    fname: name of output file WITHOUT FULL PATH. Path provided in outdir.
    """
    atlas_data = nb.load(atlas).get_data()
    dti_data = nb.load(dti).get_data()
    b0_data = dti_data[:, :, :, loc]
    cmap1 = LinearSegmentedColormap.from_list('mycmap1', ['black', 'magenta'])
    cmap2 = LinearSegmentedColormap.from_list('mycmap2', ['black', 'green'])
    fig = plot_overlays(atlas_data, b0_data, (cmap1, cmap2))
    # name and save the file
    fname = os.path.split(dti)[1].split(".")[0] + '.png'
    plt.savefig(outdir + '/' + fname, format='png')
def tensor2fa(tensors, tensor_name, dti, derivdir, qcdir):
    '''
    outdir: location of output directory.
    fname: name of output fa map file. default is none (name created based on
    input file)
    '''
    dti_data = nb.load(dti)
    affine = dti_data.get_affine()
    dti_data = dti_data.get_data()
    # create the FA map
    FA = fractional_anisotropy(tensors.evals)
    FA[np.isnan(FA)] = 0
    # generate the RGB FA map
    FA = np.clip(FA, 0, 1)
    RGB = color_fa(FA, tensors.evecs)
    fname = os.path.split(tensor_name)[1].split(".")[0] + '_fa_rgb.nii.gz'
    fa = nb.Nifti1Image(np.array(255 * RGB, 'uint8'), affine)
    nb.save(fa, derivdir + fname)
    fa_pngs(fa, fname, qcdir)
def resample(self, base, ingested, template):
    """
    Resamples the image such that images which have already been aligned
    in real coordinates also overlap in the image/voxel space.

    **Positional Arguments**
        base:
            - Image to be aligned
        ingested:
            - Name of image after alignment
        template:
            - Image that is the target of the alignment
    """
    # Load images
    template_im = nb.load(template)
    base_im = nb.load(base)
    # Align images
    target_im = nl.resample_img(base_im,
                                target_affine=template_im.get_affine(),
                                target_shape=template_im.get_data().shape,
                                interpolation="nearest")
    # Save the new image
    nb.save(target_im, ingested)
def _3d_in_file(in_file):
    ''' if self.inputs.in_file is 3d, return it.
    if 4d, pick an arbitrary volume and return that.
    if in_file is a list of files, return an arbitrary file from
    the list, and an arbitrary volume from that file
    '''
    in_file = filemanip.filename_to_list(in_file)[0]
    try:
        in_file = nb.load(in_file)
    except AttributeError:
        in_file = in_file
    if in_file.get_data().ndim == 3:
        return in_file
    return nlimage.index_img(in_file, 0)
def test_ROIsPlot(oasis_dir):
    """ the BET report capable test """
    import nibabel as nb
    import numpy as np
    labels = nb.load(os.path.join(oasis_dir, 'T_template0_glm_4labelsJointFusion.nii.gz'))
    data = labels.get_data()
    out_files = []
    ldata = np.zeros_like(data)
    for i, l in enumerate([1, 3, 4, 2]):
        ldata[data == l] = 1
        out_files.append(os.path.abspath('label%d.nii.gz' % i))
        lfile = nb.Nifti1Image(ldata, labels.affine, labels.header)
        lfile.to_filename(out_files[-1])
    roi_rpt = ROIsPlot(
        generate_report=True,
        in_file=os.path.join(oasis_dir, 'T_template0.nii.gz'),
        in_mask=out_files[-1],
        in_rois=out_files[:-1],
        colors=['g', 'y']
    )
    _smoke_test_report(roi_rpt, 'testROIsPlot.svg')
def _copyxform(ref_image, out_image, message=None):
    # Read in the reference and output images
    resampled = nb.load(out_image)
    orig = nb.load(ref_image)
    if not np.allclose(orig.affine, resampled.affine):
        LOG.debug(
            'Affines of input and reference images do not match, '
            'FMRIPREP will set the reference image headers. '
            'Please check that the x-form matrices of the input dataset '
            'are correct and manually verify the alignment of results.')
    # Copy xform info
    qform, qform_code = orig.header.get_qform(coded=True)
    sform, sform_code = orig.header.get_sform(coded=True)
    header = resampled.header.copy()
    header.set_qform(qform, int(qform_code))
    header.set_sform(sform, int(sform_code))
    header['descrip'] = 'xform matrices modified by %s.' % (message or '(unknown)')
    newimg = resampled.__class__(resampled.get_data(), orig.affine, header)
    newimg.to_filename(out_image)
def getSlices(self, paths):
    image, truth = paths
    image = nib.load(image).get_data()
    truth = nib.load(truth).get_data()
    slicesWithValues = [unique(s) for s in where(truth > 0)]
    sliceAxis = argmin([len(s) for s in slicesWithValues])
    slicesWithValues = slicesWithValues[sliceAxis]
    slc = repeat(-1, 3)
    slc[sliceAxis] = slicesWithValues[0]
    if self.padding is not None:
        image, truth = [padImage(im, self.padding) for im in (image[slc][0], truth[slc][0])]
    else:
        image, truth = (image[slc][0], truth[slc][0])
    return (image, truth)
def preprocess(inputfile, outputfile, order=0, df=None, input_key=None, output_key=None):
    img = nib.load(inputfile)
    data = img.get_data()
    affine = img.affine
    zoom = img.header.get_zooms()[:3]
    data, affine = reslice(data, affine, zoom, (1., 1., 1.), order)
    data = np.squeeze(data)
    data = np.pad(data, [(0, 256 - len_) for len_ in data.shape], "constant")
    if order == 0:
        if df is not None:
            tmp = np.zeros_like(data)
            for target, source in zip(df[output_key], df[input_key]):
                tmp[np.where(data == source)] = target
            data = tmp
        data = np.int32(data)
        assert data.ndim == 3, data.ndim
    else:
        data_sub = data - gaussian_filter(data, sigma=1)
        img = sitk.GetImageFromArray(np.copy(data_sub))
        img = sitk.AdaptiveHistogramEqualization(img)
        data_clahe = sitk.GetArrayFromImage(img)[:, :, :, None]
        data = np.concatenate((data_clahe, data[:, :, :, None]), 3)
        data = (data - np.mean(data, (0, 1, 2))) / np.std(data, (0, 1, 2))
        assert data.ndim == 4, data.ndim
        assert np.allclose(np.mean(data, (0, 1, 2)), 0.), np.mean(data, (0, 1, 2))
        assert np.allclose(np.std(data, (0, 1, 2)), 1.), np.std(data, (0, 1, 2))
        data = np.float32(data)
    img = nib.Nifti1Image(data, affine)
    nib.save(img, outputfile)
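A hedged usage sketch of the two branches (the file names are illustrative): order=0 treats the input as a label volume and keeps it integer, while any other order adds an adaptive-histogram-equalized channel and standardizes intensities:
# Label volume: nearest-neighbour reslicing to 1 mm isotropic, padded to 256^3.
preprocess('subject-1-label.hdr', 'label_1mm.nii.gz', order=0)
# Intensity volume: interpolated reslicing plus CLAHE channel and normalization.
preprocess('subject-1-T1.hdr', 't1_1mm.nii.gz', order=1)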