Source code for toblerone.pvestimation

"""PV estimation functions"""

import os.path as op 
import functools
import copy
import nibabel 

import numpy as np 
import tqdm
import regtricks as rt 

from toblerone.pvestimation import estimators
from toblerone import utils, core
from toblerone.classes import ImageSpace, Surface

[docs]@utils.enforce_and_load_common_arguments def cortex(ref, struct2ref, **kwargs): """ Estimate PVs for L/R cortex. To estimate for a single hemisphere, provide only surfaces for that side. Args: ref (str/regtricks ImageSpace): voxel grid in which to estimate PVs. struct2ref (str/np.array/rt.Registration): registration between space of surface and reference [see `-flirt` and `-stuct`], use 'I' for identity. fsdir (str): path to a FreeSurfer subject directory. LWS/LPS/RWS/RPS (str): individual paths to the surfaces, eg LWS = Left White surface, RPS = Right Pial surace. Other parameters: flirt (bool): denoting struct2ref is FLIRT transform; if so, set struct. struct (str): path to structural image from which surfaces were derived. cores (int): number of cores to use, default 8. supersample (int/array): single or 3 values, supersampling factor. Returns: (np.array) PVs arranged GM/WM/non-brain in 4th dim. """ if not any([ kwargs.get('fsdir') is not None, any([ kwargs.get(s) is not None for s in ['LWS', 'LPS', 'RWS', 'RPS'] ]) ]): raise RuntimeError("Either a fsdir or paths to LWS/LPS etc" "must be given.") hemispheres = utils.load_surfs_to_hemispheres(**kwargs) # Either create local copy of ImageSpace object or init from path if isinstance(ref, ImageSpace): ref_space = copy.deepcopy(ref) else: ref_space = ImageSpace(ref) # Set supersampler and estimate. if kwargs.get('supersample') is None: supersampler = np.maximum(np.floor(ref_space.vox_size.round(1) / 0.75), 1).astype(np.int32) else: supersampler = kwargs.get('supersample') * np.ones(3) pvs = estimators._cortex(hemispheres, ref_space, struct2ref, supersampler, kwargs['cores'], bool(kwargs.get('ones'))) return pvs
[docs]@utils.enforce_and_load_common_arguments def structure(ref, struct2ref, surf, **kwargs): """ Estimate PVs for a structure defined by a single surface. Args: ref (str/regtricks ImageSpace): voxel grid in which to estimate PVs. struct2ref (str/np.array/rt.Registration): registration between space of surface and reference (see -flirt and -stuct). Use 'I' for identity. surf (str/`Surface`): path to surface (see coords argument below) Other parameters: flirt (bool): denoting struct2ref is FLIRT transform; if so, set struct. coords (str): convention by which surface is defined: default is 'world' (mm coords), for FIRST surfaces set as 'fsl' and provide struct argument struct (str): path to structural image from which surfaces were derived cores (int): number of cores to use, default 8 supersample (int/array): single or 3 values, supersampling factor Returns: (np.array) PV image, sized equal to reference space """ coords = kwargs.get('coords', 'world') if coords == 'fsl' and not kwargs.get('struct'): raise RuntimeError("Structural image must be supplied for FIRST surfs") if type(surf) is str: surf = Surface(surf, name=op.split(surf)[1]) if kwargs.get('coords', 'world') == 'fsl': struct_spc = ImageSpace(kwargs['struct']) surf = surf.transform(struct_spc.FSL2world) elif type(surf) is not Surface: raise RuntimeError("surf kwarg must be a Surface object or path to one") # Either create local copy of ImageSpace object or init from path if isinstance(ref, ImageSpace): ref_space = copy.deepcopy(ref) else: ref_space = ImageSpace(ref) if kwargs.get('supersample') is None: supersampler = np.maximum(np.floor(ref_space.vox_size.round(1) / 0.75), 1).astype(np.int32) else: supersampler = kwargs.get('supersample') * np.ones(3) pvs = estimators._structure(surf, ref_space, struct2ref, supersampler, bool(kwargs.get('ones')), kwargs['cores']) return pvs
def __structure_wrapper(surf, **kwargs): """Convenience method for parallel processing""" return structure(surf=surf, **kwargs)
[docs]@utils.enforce_and_load_common_arguments def complete(ref, struct2ref, **kwargs): """ Estimate PVs for cortex and all structures identified by FIRST within a reference image space. Use FAST to fill in non-surface PVs. Args: ref (str/regtricks.ImageSpace): voxel grid in which to estimate PVs. struct2ref (str/np.array/regtricks.Registration): registration between space of surface and reference (see -flirt and -stuct). Use 'I' for identity. fslanat: path to fslanat directory. This REPLACES firstdir/fastdir/struct. firstdir (str): FIRST directory in which .vtk surfaces are located fastdir (str): FAST directory in which _pve_0/1/2 are located struct (str): path to structural image from which FIRST surfaces were dervied fsdir (str): FreeSurfer subject directory, OR: LWS/LPS/RWS/RPS (str): paths to individual surfaces (L/R white/pial) Other parameters: flirt (bool): denoting struct2ref is FLIRT transform; if so, set struct. coords (str): convention by which surface is defined: default is 'world' (mm coords), for FIRST surfaces set as 'fsl' and provide struct argument struct (str): path to structural image from which surfaces were derived cores (int): number of cores to use, default 8 supersample (int/array): single or 3 values, supersampling factor Returns: (dict) keyed for each structure and also flattened ('stacked') """ print("Estimating PVs for", ref.file_name) # If anat dir then various subdirs are loaded by @enforce_common_args # If not then direct load below if not bool(kwargs.get('fsdir')): if not all([ bool(kwargs.get(k)) for k in ['LWS','LPS','RWS','RPS'] ]): raise RuntimeError("If fsdir not given, " + "provide paths for LWS,LPS,RWS,RPS") if not bool(kwargs.get('fslanat')): if not (bool(kwargs.get('fastdir')) and bool(kwargs.get('firstdir'))): raise RuntimeError("If not using fslanat dir, fastdir/firstdir required") # Resample FASTs to reference space. Then redefine CSF as 1-(GM+WM) fast_paths = utils._loadFASTdir(kwargs['fastdir']) fast_spc = fast_paths['FAST_GM'] fast = np.stack([ nibabel.load(fast_paths[f'FAST_{p}']).get_fdata() for p in ['GM', 'WM'] ], axis=-1) fasts_transformed = rt.Registration(struct2ref).apply_to_array(fast, fast_spc, ref) output = dict(FAST_GM=fasts_transformed[...,0], FAST_WM=fasts_transformed[...,1]) output['FAST_CSF'] = np.maximum(0, 1 - (output['FAST_WM'] + output['FAST_GM'])) # Process subcortical structures first. FIRSTsurfs = utils._loadFIRSTdir(kwargs['firstdir']) subcortical = [] struct_spc = ImageSpace(kwargs['struct']) for name, surf in FIRSTsurfs.items(): s = Surface(surf, name) s = s.transform(struct_spc.FSL2world) subcortical.append(s) disp = "Structures found: " + ", ".join([ s.name for s in subcortical ] + ['Cortex']) print(disp) # To estimate against each subcortical structure, we apply the following # partial func to each using a map() call. Carry kwargs from this func desc = 'Subcortical structures' estimator = functools.partial(__structure_wrapper, ref=ref, struct2ref=struct2ref, **kwargs) # This is equivalent to a map(estimator, subcortical) call # All the extra stuff (tqdm etc) is used for progress bar results = [ pv for _, pv in tqdm.tqdm(enumerate(map(estimator, subcortical)), total=len(subcortical), desc=desc, bar_format=core.BAR_FORMAT, ascii=True) ] output.update(dict(zip([s.name for s in subcortical], results))) # Now do the cortex, then stack the whole lot ctx = cortex(ref=ref, struct2ref=struct2ref, **kwargs) for i,t in enumerate(['_GM', '_WM', '_nonbrain']): output['cortex' + t] = (ctx[:,:,:,i]) stacked = estimators.stack_images( {k:v for k,v in output.items() if k != 'BrStem'}) output['GM'] = stacked[:,:,:,0] output['WM'] = stacked[:,:,:,1] output['nonbrain'] = stacked[:,:,:,2] output['stacked'] = stacked return output