Python nibabel 模块,save() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nibabel.save()。
def load_ROI_mask(self):
proxy = nib.load(self.FLAIR_FILE)
image_array = np.asarray(proxy.dataobj)
mask = np.ones_like(image_array)
mask[np.where(image_array < 90)] = 0
# img = nib.Nifti1Image(mask, proxy.affine)
# nib.save(img, join(modalities_path,'mask.nii.gz'))
struct_element_size = (20, 20, 20)
mask_augmented = np.pad(mask, [(21, 21), (21, 21), (21, 21)], 'constant', constant_values=(0, 0))
mask_augmented = binary_closing(mask_augmented, structure=np.ones(struct_element_size, dtype=bool)).astype(
np.int)
return mask_augmented[21:-21, 21:-21, 21:-21].astype('bool')
def export_data(prediction_dir, nii_image_dir, tfrecords_dir, export_dir, transformation=None):
for file_path in os.listdir(prediction_dir):
name, prediction, probability = read_prediction_file(os.path.join(prediction_dir, file_path))
if transformation:
image, ground_truth = get_original_image(os.path.join(tfrecords_dir, name + '.tfrecord'), False)
prediction = transformation.transform_image(prediction, probability, image)
# build cv_predictions .nii image
img = nib.Nifti1Image(prediction, np.eye(4))
img.set_data_dtype(dtype=np.uint8)
path = os.path.join(nii_image_dir, name)
adc_name = next(l for l in os.listdir(path) if 'MR_ADC' in l)
export_image = nib.load(os.path.join(nii_image_dir, name, adc_name, adc_name + '.nii'))
i = export_image.get_data()
i[:] = img.get_data()
# set name to specification and export
_id = next(l for l in os.listdir(path) if 'MR_MTT' in l).split('.')[-1]
export_path = os.path.join(export_dir, 'SMIR.' + name + '.' + _id + '.nii')
nib.save(export_image, os.path.join(export_path))
def batch_works(k):
if k == n_processes - 1:
paths = all_paths[k * int(len(all_paths) / n_processes) : ]
else:
paths = all_paths[k * int(len(all_paths) / n_processes) : (k + 1) * int(len(all_paths) / n_processes)]
for path in paths:
probs = np.load(os.path.join(input_path, path))
pred = np.argmax(probs, axis=3)
fg_prob = 1 - probs[..., 0]
pred = clean_contour(fg_prob, pred)
seg = np.zeros(pred.shape, dtype=np.int16)
seg[pred == 1] = 1
seg[pred == 2] = 2
seg[pred == 3] = 4
img = nib.Nifti1Image(seg, np.eye(4))
nib.save(img, os.path.join(output_path, path.replace('_probs.npy', '.nii.gz')))
def dicom2nifti(folders,outdir=None,extension=None):
'''dicom2nifti will take a list of folders and produce nifti files
in an output directory. If not defined, they will be output in their
original directory.
'''
if isinstance(folders,dict):
folders = list(folders.keys())
if not isinstance(folders,list):
folders = [folders]
outfiles = []
for folder in folders:
lookup = find_dicoms(folder,extension)
for base,dicomlist in lookup.items():
nii = read_series(dicomlist)
if outdir != None:
outfile = "%s/%s.nii.gz" %(outdir,os.path.basename(base))
else:
outfile = "%s/%s.nii.gz" %(base,os.path.basename(base))
bot.info("Saving %s" %outfile)
nibabel.save(nii,outfile)
outfiles.append(outfile)
return outfiles
def symmetrise_axial(self, filename_in, filename_out=None, axis='x', plane_intercept=10,
side_to_copy='below', keep_in_data_dimensions=True):
pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
im_segm = nib.load(pfi_in)
data_labels = im_segm.get_data()
data_symmetrised = symmetrise_data(data_labels,
axis=axis,
plane_intercept=plane_intercept,
side_to_copy=side_to_copy,
keep_in_data_dimensions=keep_in_data_dimensions)
im_symmetrised = set_new_data(im_segm, data_symmetrised)
nib.save(im_symmetrised, pfi_out)
print('Symmetrised axis {0}, plane_intercept {1}, image of {2} saved in {3}.'.format(axis, plane_intercept, pfi_in, pfi_out))
return pfi_out
def modify_affine(self, filename_in, affine_in, filename_out, q_form=True, s_form=True,
multiplication_side='left'):
pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
if isinstance(affine_in, str):
if affine_in.endswith('.txt'):
aff = np.loadtxt(connect_path_tail_head(self.pfo_in, affine_in))
else:
aff = np.load(connect_path_tail_head(self.pfo_in, affine_in))
elif isinstance(affine_in, np.ndarray):
aff = affine_in
else:
raise IOError('parameter affine_in can be path to an affine matrix .txt or .npy or the numpy array'
'corresponding to the affine transformation.')
im = nib.load(pfi_in)
new_im = modify_affine_transformation(im, aff, q_form=q_form, s_form=s_form,
multiplication_side=multiplication_side)
nib.save(new_im, pfi_out)
def apply_small_rotation(self, filename_in, filename_out, angle=np.pi/6, principal_axis='pitch'):
if isinstance(angle, list):
assert isinstance(principal_axis, list)
assert len(principal_axis) == len(angle)
new_aff = np.identity(4)
for pa, an in zip(principal_axis, angle):
aff = get_small_orthogonal_rotation(theta=an, principal_axis=pa)
new_aff = new_aff.dot(aff)
else:
new_aff = get_small_orthogonal_rotation(theta=angle, principal_axis=principal_axis)
pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
im = nib.load(pfi_in)
new_im = modify_affine_transformation(im_input=im, new_aff=new_aff, q_form=True, s_form=True,
multiplication_side='right')
nib.save(new_im, pfi_out)
def modify_translational_part(self, filename_in, filename_out, new_translation):
pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
im = nib.load(pfi_in)
if isinstance(new_translation, str):
if new_translation.endswith('.txt'):
tr = np.loadtxt(connect_path_tail_head(self.pfo_in, new_translation))
else:
tr = np.load(connect_path_tail_head(self.pfo_in, new_translation))
elif isinstance(new_translation, np.ndarray):
tr = new_translation
elif isinstance(new_translation, list):
tr = np.array(new_translation)
else:
raise IOError('parameter new_translation can be path to an affine matrix .txt or .npy or the numpy array'
'corresponding to the new intended translational part.')
new_im = replace_translational_part(im, tr)
nib.save(new_im, pfi_out)
def relabel(self, pfi_input, pfi_output=None, list_old_labels=(), list_new_labels=()):
"""
Masks of :func:`labels_manager.tools.manipulations.relabeller.relabeller` using filename
"""
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in,
self.pfo_out)
im_labels = nib.load(pfi_in)
data_labels = im_labels.get_data()
data_relabelled = relabeller(data_labels, list_old_labels=list_old_labels,
list_new_labels=list_new_labels)
im_relabelled = set_new_data(im_labels, data_relabelled)
nib.save(im_relabelled, pfi_out)
print('Relabelled image {0} saved in {1}.'.format(pfi_in, pfi_out))
return pfi_out
def tensor2fa(tensors, tensor_name, dti, derivdir, qcdir):
'''
outdir: location of output directory.
fname: name of output fa map file. default is none (name created based on
input file)
'''
dti_data = nb.load(dti)
affine = dti_data.get_affine()
dti_data = dti_data.get_data()
# create FA map
FA = fractional_anisotropy(tensors.evals)
FA[np.isnan(FA)] = 0
# generate the RGB FA map
FA = np.clip(FA, 0, 1)
RGB = color_fa(FA, tensors.evecs)
fname = os.path.split(tensor_name)[1].split(".")[0] + '_fa_rgb.nii.gz'
fa = nb.Nifti1Image(np.array(255 * RGB, 'uint8'), affine)
nb.save(fa, derivdir + fname)
fa_pngs(fa, fname, qcdir)
def resample(self, base, ingested, template):
"""
Resamples the image such that images which have already been aligned
in real coordinates also overlap in the image/voxel space.
**Positional Arguments**
base:
- Image to be aligned
ingested:
- Name of image after alignment
template:
- Image that is the target of the alignment
"""
# Loads images
template_im = nb.load(template)
base_im = nb.load(base)
# Aligns images
target_im = nl.resample_img(base_im,
target_affine=template_im.get_affine(),
target_shape=template_im.get_data().shape,
interpolation="nearest")
# Saves new image
nb.save(target_im, ingested)
pass
def test_register_dwi():
fdata, fbval, fbvec = dpd.get_data('small_64D')
with nbtmp.InTemporaryDirectory() as tmpdir:
# Use an abbreviated data-set:
img = nib.load(fdata)
data = img.get_data()[..., :10]
nib.save(nib.Nifti1Image(data, img.affine),
op.join(tmpdir, 'data.nii.gz'))
# Convert from npy to txt:
bvals = np.load(fbval)
bvecs = np.load(fbvec)
np.savetxt(op.join(tmpdir, 'bvals.txt'), bvals[:10])
np.savetxt(op.join(tmpdir, 'bvecs.txt'), bvecs[:10])
reg_file = register_dwi(op.join(tmpdir, 'data.nii.gz'),
op.join(tmpdir, 'bvals.txt'),
op.join(tmpdir, 'bvecs.txt'))
npt.assert_(op.exists(reg_file))
def make_dti_data(out_fbval, out_fbvec, out_fdata, out_shape=(5, 6, 7)):
"""
Create a synthetic data-set with a single shell acquisition
out_fbval, out_fbvec, out_fdata : str
Full paths to generated data and bval/bvec files
out_shape : tuple
The 3D shape of the output volum
"""
fimg, fbvals, fbvecs = dpd.get_data('small_64D')
img = nib.load(fimg)
bvals, bvecs = dio.read_bvals_bvecs(fbvals, fbvecs)
gtab = dpg.gradient_table(bvals, bvecs)
# Simulate a signal based on the DTI model:
signal = single_tensor(gtab, S0=100)
DWI = np.zeros(out_shape + (len(gtab.bvals), ))
DWI[:] = signal
nib.save(nib.Nifti1Image(DWI, img.affine), out_fdata)
np.savetxt(out_fbval, bvals)
np.savetxt(out_fbvec, bvecs)
def exportNyp(self, event):
"""Export histogram counts as a numpy array."""
outFileName = self.basename + '_identifier' \
+ '_pcMax' + str(self.initTpl[0]) \
+ '_pcMin' + str(self.initTpl[1]) \
+ '_sc' + str(int(self.initTpl[2]))
if self.segmType == 'ncut':
outFileName = outFileName.replace('identifier', 'volHistLabels')
outFileName = outFileName.replace('.', 'pt')
np.save(outFileName, self.volHistMask)
print "successfully exported histogram colors as: \n" + \
outFileName
elif self.segmType == 'main':
outFileName = outFileName.replace('identifier', 'volHist')
outFileName = outFileName.replace('.', 'pt')
np.save(outFileName, self.counts)
print "successfully exported histogram counts as: \n" + \
outFileName
else:
return
def preprocess(inputfile, outputfile, order=0, df=None, input_key=None, output_key=None):
img = nib.load(inputfile)
data = img.get_data()
affine = img.affine
zoom = img.header.get_zooms()[:3]
data, affine = reslice(data, affine, zoom, (1., 1., 1.), order)
data = np.squeeze(data)
data = np.pad(data, [(0, 256 - len_) for len_ in data.shape], "constant")
if order == 0:
if df is not None:
tmp = np.zeros_like(data)
for target, source in zip(df[output_key], df[input_key]):
tmp[np.where(data == source)] = target
data = tmp
data = np.int32(data)
assert data.ndim == 3, data.ndim
else:
data_sub = data - gaussian_filter(data, sigma=1)
img = sitk.GetImageFromArray(np.copy(data_sub))
img = sitk.AdaptiveHistogramEqualization(img)
data_clahe = sitk.GetArrayFromImage(img)[:, :, :, None]
data = np.concatenate((data_clahe, data[:, :, :, None]), 3)
data = (data - np.mean(data, (0, 1, 2))) / np.std(data, (0, 1, 2))
assert data.ndim == 4, data.ndim
assert np.allclose(np.mean(data, (0, 1, 2)), 0.), np.mean(data, (0, 1, 2))
assert np.allclose(np.std(data, (0, 1, 2)), 1.), np.std(data, (0, 1, 2))
data = np.float32(data)
img = nib.Nifti1Image(data, affine)
nib.save(img, outputfile)
def roi_from_bbox(
input_file,
bbox,
output_file):
""" Create a ROI image from a bounding box.
Parameters
----------
input_file: str
the reference image where the bbox is defined.
bbox: 6-uplet
the corner of the bbox in voxel coordinates: xmin, xmax, ymin, ymax,
zmin, zmax.
output_file: str
the desired ROI image file.
"""
# Load the reference image and generate a grid
im = nibabel.load(input_file)
xv, yv, zv = numpy.meshgrid(
numpy.linspace(0, im.shape[0] - 1, im.shape[0]),
numpy.linspace(0, im.shape[1] - 1, im.shape[1]),
numpy.linspace(0, im.shape[2] - 1, im.shape[2]))
xv = xv.astype(int)
yv = yv.astype(int)
zv = zv.astype(int)
# Intersect the grid with the bbox
xa = numpy.bitwise_and(xv >= bbox[0], xv <= bbox[1])
ya = numpy.bitwise_and(yv >= bbox[2], yv <= bbox[3])
za = numpy.bitwise_and(zv >= bbox[4], zv <= bbox[5])
# Generate bbox indices
indices = numpy.bitwise_and(numpy.bitwise_and(xa, ya), za)
# Generate/save ROI
roi = numpy.zeros(im.shape, dtype=int)
roi[xv[indices].tolist(), yv[indices].tolist(), zv[indices].tolist()] = 1
roi_im = nibabel.Nifti1Image(roi, affine=im.get_affine())
nibabel.save(roi_im, output_file)
def merge_fibers(tractograms, tempdir=None):
""" Merge tractograms.
Parameters
----------
tractograms: list of str
paths to the input tractograms.
tempdir: str, default None
a temporary directory to store intermediate tractogram.
Returns
-------
merge_tractogram_file: str
all the streamlines in one TRK file.
"""
# Check existence of input file
for path in tractograms:
if not os.path.isfile(path):
raise ValueError("File does not exist: {0}.".format(path))
# Create a temporary directory to store an intermediate tractogram
tempdir = tempfile.mkdtemp(prefix="tractconverter_", dir=tempdir)
# Combine tractograms in one file
trk = nibabel.streamlines.load(tractograms[0])
for trk_path in tractograms[1:]:
part_trk = nibabel.streamlines.load(trk_path)
trk.streamlines.extend(part_trk.streamlines)
merge_tractogram_file = os.path.join(tempdir, "tmp.trk")
trk.save(merge_tractogram_file)
return merge_tractogram_file
def extract_image(in_file, index, out_file=None):
""" Extract the image at 'index' position.
Parameters
----------
in_file: str (mandatory)
the input image.
index: int (mandatory)
the index of last image dimention to extract.
out_file: str (optional, default None)
the name of the extracted image file.
Returns
-------
out_file: str
the name of the extracted image file.
"""
# Set default output if necessary
dirname = os.path.dirname(in_file)
basename = os.path.basename(in_file).split(".")[0]
if out_file is None:
out_file = os.path.join(
dirname, "extract{0}_{1}.nii.gz".format(index, basename))
# Extract the image of interest
image = nibabel.load(in_file)
affine = image.get_affine()
extracted_array = image.get_data()[..., index]
extracted_image = nibabel.Nifti1Image(extracted_array, affine)
nibabel.save(extracted_image, out_file)
return out_file
def saveImageAsNifti(imageToSave,
imageName,
imageOriginalName,
imageType):
printFileNames = False
if printFileNames == True:
print(" ... Saving image in {}".format(imageName))
[imageData,img_proxy] = load_nii(imageOriginalName, printFileNames)
# Generate the nii file
niiToSave = nib.Nifti1Image(imageToSave, img_proxy.affine)
niiToSave.set_data_dtype(imageType)
dim = len(imageToSave.shape)
zooms = list(img_proxy.header.get_zooms()[:dim])
if len(zooms) < dim :
zooms = zooms + [1.0]*(dim-len(zooms))
niiToSave.header.set_zooms(zooms)
nib.save(niiToSave, imageName)
print "... Image succesfully saved..."
## ========= For Matlab files ========== ##
def save(self, image, outpath):
""" A method that save the image data and associated metadata.
Parameters
----------
image: Image
the image to be saved.
outpath: str
the path where the the image will be saved.
"""
diag = (1. / image.spacing).tolist() + [1]
_image = nibabel.Nifti1Image(image.data, numpy.diag(diag))
nibabel.save(_image, outpath)
def decompose_vol2cube(vol_data, batch_size, cube_size, n_chn, ita):
cube_list = []
# get parameters for decompose
fold, ovlap = fit_cube_param(vol_data.shape, cube_size, ita)
dim = np.asarray(vol_data.shape)
# decompose
for R in range(0, fold[0]):
r_s = R*cube_size - R*ovlap[0]
r_e = r_s + cube_size
if r_e >= dim[0]:
r_s = dim[0] - cube_size
r_e = r_s + cube_size
for C in range(0, fold[1]):
c_s = C*cube_size - C*ovlap[1]
c_e = c_s + cube_size
if c_e >= dim[1]:
c_s = dim[1] - cube_size
c_e = c_s + cube_size
for H in range(0, fold[2]):
h_s = H*cube_size - H*ovlap[2]
h_e = h_s + cube_size
if h_e >= dim[2]:
h_s = dim[2] - cube_size
h_e = h_s + cube_size
# partition multiple channels
cube_temp = vol_data[r_s:r_e, c_s:c_e, h_s:h_e]
cube_batch = np.zeros([batch_size, cube_size, cube_size, cube_size, n_chn]).astype('float32')
cube_batch[0, :, :, :, 0] = copy.deepcopy(cube_temp)
# save
cube_list.append(cube_batch)
return cube_list
# compose list of label cubes into a label volume
def remove_minor_cc(vol_data, rej_ratio, rename_map):
"""Remove small connected components refer to rejection ratio"""
"""Usage
# rename_map = [0, 205, 420, 500, 550, 600, 820, 850]
# nii_path = '/home/xinyang/project_xy/mmwhs2017/dataset/ct_output/test/test_4.nii'
# vol_file = nib.load(nii_path)
# vol_data = vol_file.get_data().copy()
# ref_affine = vol_file.affine
# rem_vol = remove_minor_cc(vol_data, rej_ratio=0.2, class_n=8, rename_map=rename_map)
# # save
# rem_path = 'rem_cc.nii'
# rem_vol_file = nib.Nifti1Image(rem_vol, ref_affine)
# nib.save(rem_vol_file, rem_path)
#===# possible be parallel in future
"""
rem_vol = copy.deepcopy(vol_data)
class_n = len(rename_map)
# retrieve all classes
for c in range(1, class_n):
print 'processing class %d...' % c
class_idx = (vol_data==rename_map[c])*1
class_vol = np.sum(class_idx)
labeled_cc, num_cc = measurements.label(class_idx)
# retrieve all connected components in this class
for cc in range(1, num_cc+1):
single_cc = ((labeled_cc==cc)*1)
single_vol = np.sum(single_cc)
# remove if too small
if single_vol / (class_vol*1.0) < rej_ratio:
rem_vol[labeled_cc==cc] = 0
return rem_vol
def test_adjust_nifti_replace_translational_part_F_F():
pfi_input = jph(root_dir, 'data_examples', 'acee.nii.gz')
if not os.path.exists(pfi_input):
generate_figures()
translation = [1, 2, 3]
pfi_output = jph(root_dir, 'data_examples', 'acee_new_header.nii.gz')
im_a_cee = nib.load(pfi_input)
initial_affine = im_a_cee.affine
initial_qform = im_a_cee.get_qform()
initial_sform = im_a_cee.get_sform()
new_im = replace_translational_part(im_a_cee, translation, q_form=False, s_form=False)
nib.save(new_im, pfi_output)
im_a_cee_new = nib.load(pfi_output)
final_affine = im_a_cee_new.affine
final_qform = im_a_cee_new.get_qform()
final_sform = im_a_cee_new.get_sform()
new_transformation_expected = np.copy(initial_affine)
new_transformation_expected[:3, 3] = translation
# see documentaiton http://nipy.org/nibabel/nifti_images.html#choosing-image-affine
if im_a_cee.header['sform_code'] > 0 : # affine -> sform affine
assert_array_almost_equal(initial_affine, final_affine)
assert_array_almost_equal(initial_qform, final_qform)
assert_array_almost_equal(initial_sform, final_sform)
elif im_a_cee.header['qform_code'] > 0: # affine -> qform affine
assert_array_almost_equal(initial_affine, final_affine)
assert_array_almost_equal(initial_qform, final_qform)
assert_array_almost_equal(initial_sform, final_sform)
else: # affine -> header-off affine
assert_array_almost_equal(new_transformation_expected, final_affine)
assert_array_almost_equal(initial_qform, final_qform)
assert_array_almost_equal(initial_sform, final_sform)
def test_adjust_nifti_translation_path_T_T():
pfi_input = jph(root_dir, 'data_examples', 'acee.nii.gz')
if not os.path.exists(pfi_input):
generate_figures()
translation = [1, 2, 3]
pfi_output = jph(root_dir, 'data_examples', 'acee_new_header.nii.gz')
im_a_cee = nib.load(pfi_input)
initial_affine = im_a_cee.affine # sform
initial_qform = im_a_cee.get_qform()
initial_sform = im_a_cee.get_sform()
new_im = replace_translational_part(im_a_cee, translation, q_form=True, s_form=True)
nib.save(new_im, pfi_output)
im_a_cee_new = nib.load(pfi_output)
final_affine = im_a_cee_new.affine
final_qform = im_a_cee_new.get_qform()
final_sform = im_a_cee_new.get_sform()
# all must be new
new_transformation = np.copy(initial_affine)
new_transformation[:3, 3] = translation
assert_array_almost_equal(new_transformation, final_qform)
assert_array_almost_equal(new_transformation, final_affine)
assert_array_almost_equal(new_transformation, final_sform)
# check all of the original image is still the same
im_a_cee_reloaded = nib.load(pfi_input)
reloaded_affine = im_a_cee_reloaded.affine # sform
reloaded_qform = im_a_cee_reloaded.get_qform()
reloaded_sform = im_a_cee_reloaded.get_sform()
assert_array_almost_equal(initial_affine, reloaded_affine)
assert_array_almost_equal(initial_qform, reloaded_qform)
assert_array_almost_equal(initial_sform, reloaded_sform)
def extend_slice_new_dimension(self, pfi_input, pfi_output=None, new_axis=3, num_slices=10):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
im_slice = nib.load(pfi_in)
data_slice = im_slice.get_data()
data_extended = np.stack([data_slice, ] * num_slices, axis=new_axis)
im_extended = set_new_data(im_slice, data_extended)
nib.save(im_extended, pfi_out)
print('Extended image of {0} saved in {1}.'.format(pfi_in, pfi_out))
return pfi_out
def merge_from_4d(self, pfi_input, pfi_output=None):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
im_labels_4d = nib.load(pfi_in)
data_labels_4d = im_labels_4d.get_data()
assert len(data_labels_4d.shape) == 4
data_merged_in_3d = merge_labels_from_4d(data_labels_4d)
im_merged_in_3d = set_new_data(im_labels_4d, data_merged_in_3d)
nib.save(im_merged_in_3d, pfi_out)
print('Merged labels from 4d image {0} saved in {1}.'.format(pfi_in, pfi_out))
return pfi_out
def cut_4d_volume_with_a_1_slice_mask(self, pfi_input, filename_mask, pfi_output=None):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
pfi_mask = connect_path_tail_head(self.pfo_in, filename_mask)
im_dwi = nib.load(pfi_in)
im_mask = nib.load(pfi_mask)
im_masked = cut_4d_volume_with_a_1_slice_mask_nib(im_dwi, im_mask)
nib.save(im_masked, pfi_out)
def normalise_below_label(self, pfi_input, pfi_output, filename_segm, labels, stats=np.median):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
pfi_segm = connect_path_tail_head(self.pfo_in, filename_segm)
im_input = nib.load(pfi_in)
im_segm = nib.load(pfi_segm)
labels_list, labels_names = labels_query(labels, im_segm.get_data())
im_out = normalise_below_labels(im_input, labels_list, labels, stats=stats, exclude_first_label=True)
nib.save(im_out, pfi_out)
def get_contour_from_segmentation(self, pfi_input_segmenation, pfi_output_contour, omit_axis=None, verbose=0):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input_segmenation, pfi_output_contour, self.pfo_in, self.pfo_out)
im_segm = nib.load(pfi_in)
im_contour = contour_from_segmentation(im_segm, omit_axis=omit_axis, verbose=verbose)
nib.save(im_contour, pfi_output_contour)
def erase_labels(self, pfi_input, pfi_output=None, labels_to_erase=()):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
im_labels = nib.load(pfi_in)
data_labels = im_labels.get_data()
data_erased = erase_labels(data_labels, labels_to_erase=labels_to_erase)
im_erased = set_new_data(im_labels, data_erased)
nib.save(im_erased, pfi_out)
print('Erased labels from image {0} saved in {1}.'.format(pfi_in, pfi_out))
return pfi_out
def assign_all_other_labels_the_same_value(self, pfi_input, pfi_output=None,
labels_to_keep=(), same_value_label=255):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
im_labels = nib.load(pfi_in)
data_labels = im_labels.get_data()
data_reassigned = assign_all_other_labels_the_same_value(data_labels,
labels_to_keep=labels_to_keep, same_value_label=same_value_label)
im_reassigned = set_new_data(im_labels, data_reassigned)
nib.save(im_reassigned, pfi_out)
print('Reassigned labels from image {0} saved in {1}.'.format(pfi_in, pfi_out))
return pfi_out
def keep_one_label(self, pfi_input, pfi_output=None, label_to_keep=1):
pfi_in, pfi_out = get_pfi_in_pfi_out(pfi_input, pfi_output, self.pfo_in, self.pfo_out)
im_labels = nib.load(pfi_in)
data_labels = im_labels.get_data()
data_one_label = keep_only_one_label(data_labels, label_to_keep)
im_one_label = set_new_data(im_labels, data_one_label)
nib.save(im_one_label, pfi_out)
print('Label {0} kept from image {1} and saved in {2}.'.format(label_to_keep, pfi_in, pfi_out))
return pfi_out
def apply_a_mask_path(pfi_input, pfi_mask, pfi_output):
# TODO expose in facade
"""
Adaptative - if the mask is 3D and the image is 4D, will create a temporary mask,
generate the stack of masks, and apply the stacks to the image.
:param pfi_input: path to file 3d x T image
:param pfi_mask: 3d mask same dimension as the 3d of the pfi_input
:param pfi_output: apply the mask to each time point T in the fourth dimension if any.
:return: None, save the output in pfi_output.
"""
im_input = nib.load(pfi_input)
im_mask = nib.load(pfi_mask)
assert len(im_mask.shape) == 3
if not im_mask.shape == im_input.shape[:3]:
msg = 'Mask {0} and image {1} does not have compatible dimension: {2} and {3}'.format(
pfi_input, pfi_mask, im_input, im_mask.shape)
raise IOError(msg)
if len(im_input.shape) == 3:
new_data = im_input.get_data() * im_mask.get_data().astype(np.bool)
else:
new_data = np.zeros_like(im_input.get_data())
for t in range(im_input.shape[3]):
new_data[..., t] = im_input.get_data()[..., t] * im_mask.get_data().astype(np.bool)
new_im = set_new_data(image=im_input, new_data=new_data)
nib.save(new_im, filename=pfi_output)
def save_as_nii(y, savename):
y_nii = nib.Nifti1Image(y.astype(np.uint8), np.eye(4))
nib.save(y_nii, savename + '.nii')
def save_nifti(x, output, client):
filename = x[0].split('/')[-1]
im = nib.Nifti1Image(x[1][1], x[1][0])
nib.save(im, filename)
client.upload(output,filename, overwrite=True)
return (x[0], 0)
def save_nifti(x):
filename = x[0].split('/')[-1]
im = nib.Nifti1Image(x[1][1], x[1][0])
nib.save(im, os.path.join('/run/user/1000', filename))
return filename
def load_bval_bvec_dti(self, fbval, fbvec, dti_file, dti_file_out):
"""
Takes bval and bvec files and produces a structure in dipy format
**Positional Arguments:**
"""
# Load Data
img = nb.load(dti_file)
data = img.get_data()
bvals, bvecs = read_bvals_bvecs(fbval, fbvec)
# Get rid of spurrious scans
idx = np.where((bvecs[:, 0] == 100) & (bvecs[:, 1] == 100) &
(bvecs[:, 2] == 100))
bvecs = np.delete(bvecs, idx, axis=0)
bvals = np.delete(bvals, idx, axis=0)
data = np.delete(data, idx, axis=3)
# Save corrected DTI volume
dti_new = nb.Nifti1Image(data, affine=img.get_affine(),
header=img.get_header())
dti_new.update_header()
nb.save(dti_new, dti_file_out)
gtab = gradient_table(bvals, bvecs, atol=0.01)
print(gtab.info)
return gtab
def save(self, fname):
"""Save the source spaces to a fif file
Parameters
----------
fname : str
File to write.
"""
write_source_spaces(fname, self)
def _do_src_distances(con, vertno, run_inds, limit):
"""Helper to compute source space distances in chunks"""
if limit < np.inf:
func = partial(sparse.csgraph.dijkstra, limit=limit)
else:
func = sparse.csgraph.dijkstra
chunk_size = 20 # save memory by chunking (only a little slower)
lims = np.r_[np.arange(0, len(run_inds), chunk_size), len(run_inds)]
n_chunks = len(lims) - 1
# eventually we want this in float32, so save memory by only storing 32-bit
d = np.empty((len(run_inds), len(vertno)), np.float32)
min_dist = np.empty((n_chunks, con.shape[0]))
min_idx = np.empty((n_chunks, con.shape[0]), np.int32)
range_idx = np.arange(con.shape[0])
for li, (l1, l2) in enumerate(zip(lims[:-1], lims[1:])):
idx = vertno[run_inds[l1:l2]]
out = func(con, indices=idx)
midx = np.argmin(out, axis=0)
min_idx[li] = idx[midx]
min_dist[li] = out[midx, range_idx]
d[l1:l2] = out[:, vertno]
midx = np.argmin(min_dist, axis=0)
min_dist = min_dist[midx, range_idx]
min_idx = min_idx[midx, range_idx]
d[d == np.inf] = 0 # scipy will give us np.inf for uncalc. distances
return d, min_idx, min_dist
def write_num_peaks(peaks, affine, out_name):
# also print out a volume for number peaks
# sum the values greater than 5 in the third dimension
img = nib.Nifti1Image(np.sum(peaks.peak_values > 0, 3), affine)
nib.save(img, out_name)
def main():
argParseObj = argparse.ArgumentParser(description="Load stanford hardi dataset")
# this is the whole dwi with all the volumes yo
argParseObj.add_argument('-dir', help="the path where you want"
"this stuff to be dumped",
type=str, nargs='?', required=True)
args = argParseObj.parse_args()
outputDir = args.dir
from dipy.data import read_stanford_labels
hardi_img, gtab, label_img = read_stanford_labels()
hardi_data = hardi_img.get_data()
hardi_write = nib.Nifti1Image(hardi_data, hardi_img.get_affine())
outName = ''.join([outputDir, 'stanfordHardi_dwi.nii.gz'])
nib.save(hardi_write, outName)
label_data = label_img.get_data()
label_write = nib.Nifti1Image(label_data, label_img.get_affine())
outName = ''.join([outputDir, 'stanfordHardi_fsLabels.nii.gz'])
nib.save(label_write, outName)
# from dipy.data import read_stanford_pve_maps
# csf_img , gm_img , wm_img = read_stanford_pve_maps()
from dipy.external import fsl
outName = ''.join([outputDir, 'stanfordHardi.'])
fsl.write_bvals_bvecs(gtab.bvals, gtab.bvecs, prefix=outName)
def create_dummy_preproc_path(n_subjects, n_sessions):
preproc_dir = tempfile.mkdtemp()
subjects = ['sub-%s' % (d + 1) for d in range(n_subjects)]
sessions = ['sess-%s' % (d + 1) for d in range(n_sessions)]
for subject in subjects:
for session in sessions:
for modality in ['anat', 'dwi']:
os.makedirs(op.join(preproc_dir, subject, session, modality))
# Make some dummy data:
aff = np.eye(4)
data = np.ones((10, 10, 10, 6))
bvecs = np.vstack([np.eye(3), np.eye(3)])
bvecs[0] = 0
bvals = np.ones(6) * 1000.
bvals[0] = 0
np.savetxt(op.join(preproc_dir, subject, session, 'dwi',
'dwi.bvals'),
bvals)
np.savetxt(op.join(preproc_dir, subject, session, 'dwi',
'dwi.bvecs'),
bvecs)
nib.save(nib.Nifti1Image(data, aff),
op.join(preproc_dir, subject, session, 'dwi',
'dwi.nii.gz'))
nib.save(nib.Nifti1Image(data, aff),
op.join(preproc_dir, subject, session, 'anat',
'T1w.nii.gz'))
nib.save(nib.Nifti1Image(data, aff),
op.join(preproc_dir, subject, session, 'anat',
'aparc+aseg.nii.gz'))
return preproc_dir
def make_dki_data(out_fbval, out_fbvec, out_fdata, out_shape=(5, 6, 7)):
"""
Create a synthetic data-set with a 2-shell acquisition
out_fbval, out_fbvec, out_fdata : str
Full paths to generated data and bval/bvec files
out_shape : tuple
The 3D shape of the output volum
"""
# This is one-shell (b=1000) data:
fimg, fbvals, fbvecs = dpd.get_data('small_64D')
img = nib.load(fimg)
bvals, bvecs = dio.read_bvals_bvecs(fbvals, fbvecs)
# So we create two shells out of it
bvals_2s = np.concatenate((bvals, bvals * 2), axis=0)
bvecs_2s = np.concatenate((bvecs, bvecs), axis=0)
gtab_2s = dpg.gradient_table(bvals_2s, bvecs_2s)
# Simulate a signal based on the DKI model:
mevals_cross = np.array([[0.00099, 0, 0], [0.00226, 0.00087, 0.00087],
[0.00099, 0, 0], [0.00226, 0.00087, 0.00087]])
angles_cross = [(80, 10), (80, 10), (20, 30), (20, 30)]
fie = 0.49
frac_cross = [fie * 50, (1 - fie) * 50, fie * 50, (1 - fie) * 50]
# Noise free simulates
signal_cross, dt_cross, kt_cross = multi_tensor_dki(gtab_2s, mevals_cross,
S0=100,
angles=angles_cross,
fractions=frac_cross,
snr=None)
DWI = np.zeros(out_shape + (len(gtab_2s.bvals), ))
DWI[:] = signal_cross
nib.save(nib.Nifti1Image(DWI, img.affine), out_fdata)
np.savetxt(out_fbval, bvals_2s)
np.savetxt(out_fbvec, bvecs_2s)
def predict(params_file, gtab, S0_file=None, out_dir=None):
"""
Create a signal prediction from DTI params
params_file : str
Full path to a file with parameters saved from a DKI fit
gtab : GradientTable object
The gradient table to predict for
S0_file : str
Full path to a nifti file that contains S0 measurements to incorporate
into the prediction. If the file contains 4D data, the volumes that
contain the S0 data must be the same as the gtab.b0s_mask.
"""
if out_dir is None:
out_dir = op.join(op.split(params_file)[0])
if S0_file is None:
S0 = 100
else:
S0 = nib.load(S0_file).get_data()
# If the S0 data is 4D, we assume it comes from an acquisition that had
# B0 measurements in the same volumes described in the gtab:
if len(S0.shape) == 4:
S0 = np.mean(S0[..., gtab.b0s_mask], -1)
# Otherwise, we assume that it's already a 3D volume, and do nothing
img = nib.load(params_file)
params = img.get_data()
pred = dti.tensor_prediction(params, gtab, S0=S0)
fname = op.join(out_dir, 'dti_prediction.nii.gz')
nib.save(nib.Nifti1Image(pred, img.affine), fname)
return fname
def _brain_mask(row, median_radius=4, numpass=4, autocrop=False,
vol_idx=None, dilate=None, force_recompute=False):
brain_mask_file = _get_fname(row, '_brain_mask.nii.gz')
if not op.exists(brain_mask_file) or force_recompute:
img = nib.load(row['dwi_file'])
data = img.get_data()
gtab = row['gtab']
mean_b0 = np.mean(data[..., ~gtab.b0s_mask], -1)
_, brain_mask = median_otsu(mean_b0, median_radius, numpass,
autocrop, dilate=dilate)
be_img = nib.Nifti1Image(brain_mask.astype(int),
img.affine)
nib.save(be_img, brain_mask_file)
return brain_mask_file
def _dti(row, force_recompute=False):
dti_params_file = _get_fname(row, '_dti_params.nii.gz')
if not op.exists(dti_params_file) or force_recompute:
img = nib.load(row['dwi_file'])
data = img.get_data()
gtab = row['gtab']
brain_mask_file = _brain_mask(row)
mask = nib.load(brain_mask_file).get_data()
dtf = dti_fit(gtab, data, mask=mask)
nib.save(nib.Nifti1Image(dtf.model_params, row['dwi_affine']),
dti_params_file)
return dti_params_file
def _dti_fa(row, force_recompute=False):
dti_fa_file = _get_fname(row, '_dti_fa.nii.gz')
if not op.exists(dti_fa_file) or force_recompute:
tf = _dti_fit(row)
fa = tf.fa
nib.save(nib.Nifti1Image(fa, row['dwi_affine']),
dti_fa_file)
return dti_fa_file
def _bundles(row, wm_labels, odf_model="DTI", directions="det",
force_recompute=False):
bundles_file = _get_fname(row,
'%s_%s_bundles.trk' % (odf_model,
directions))
if not op.exists(bundles_file) or force_recompute:
streamlines_file = _streamlines(row, wm_labels,
odf_model=odf_model,
directions=directions,
force_recompute=force_recompute)
tg = nib.streamlines.load(streamlines_file).tractogram
sl = tg.apply_affine(np.linalg.inv(row['dwi_affine'])).streamlines
bundle_dict = make_bundle_dict()
reg_template = dpd.read_mni_template()
mapping = reg.read_mapping(_mapping(row), row['dwi_file'],
reg_template)
bundles = seg.segment(row['dwi_file'],
row['bval_file'],
row['bvec_file'],
sl,
bundle_dict,
reg_template=reg_template,
mapping=mapping)
tgram = _tgramer(bundles, bundle_dict, row['dwi_affine'])
nib.streamlines.save(tgram, bundles_file)
return bundles_file
def predict(params_file, gtab, S0_file=None, out_dir=None):
"""
Create a signal prediction from DKI params
params_file : str
Full path to a file with parameters saved from a DKI fit
gtab : GradientTable object
The gradient table to predict for
S0_file : str
Full path to a nifti file that contains S0 measurements to incorporate
into the prediction. If the file contains 4D data, the volumes that
contain the S0 data must be the same as the gtab.b0s_mask.
"""
if out_dir is None:
out_dir = op.join(op.split(params_file)[0])
if S0_file is None:
S0 = 100
else:
S0 = nib.load(S0_file).get_data()
# If the S0 data is 4D, we assume it comes from an acquisition that had
# B0 measurements in the same volumes described in the gtab:
if len(S0.shape) == 4:
S0 = np.mean(S0[..., gtab.b0s_mask], -1)
# Otherwise, we assume that it's already a 3D volume, and do nothing
img = nib.load(params_file)
params = img.get_data()
pred = dki.dki_prediction(params, gtab, S0=S0)
fname = op.join(out_dir, 'dki_prediction.nii.gz')
nib.save(nib.Nifti1Image(pred, img.affine), fname)
return fname
def organize_stanford_data(path=None):
"""
Create the expected file-system structure for the Stanford HARDI data-set
"""
dpd.fetch_stanford_hardi()
if path is None:
if not op.exists(afq_home):
os.mkdir(afq_home)
base_folder = op.join(afq_home, 'stanford_hardi')
else:
base_folder = op.join(path, 'stanford_hardi')
if not op.exists(base_folder):
os.mkdir(base_folder)
os.mkdir(op.join(base_folder, 'sub-01'))
os.mkdir(op.join(base_folder, 'sub-01', 'sess-01'))
anat_folder = op.join(base_folder, 'sub-01', 'sess-01', 'anat')
os.mkdir(anat_folder)
dwi_folder = op.join(base_folder, 'sub-01', 'sess-01', 'dwi')
os.mkdir(dwi_folder)
t1_img = dpd.read_stanford_t1()
nib.save(t1_img, op.join(anat_folder, 'sub-01_sess-01_T1w.nii.gz'))
seg_img = dpd.read_stanford_labels()[-1]
nib.save(seg_img, op.join(anat_folder,
'sub-01_sess-01_aparc+aseg.nii.gz'))
dwi_img, gtab = dpd.read_stanford_hardi()
nib.save(dwi_img, op.join(dwi_folder, 'sub-01_sess-01_dwi.nii.gz'))
np.savetxt(op.join(dwi_folder, 'sub-01_sess-01_dwi.bvecs'), gtab.bvecs)
np.savetxt(op.join(dwi_folder, 'sub-01_sess-01_dwi.bvals'), gtab.bvals)