Source code for toblerone.projection

"""
Projection between surface, volume and hybrid spaces 
"""

import multiprocessing as mp 
import copy
import os
from textwrap import dedent

import numpy as np 
from scipy import sparse 
import h5py

from toblerone import utils 
from toblerone.pvestimation import estimators
from toblerone.classes import Hemisphere, Surface, ImageSpace
from toblerone.core import vtx_tri_weights, vox_tri_weights

SIDES = ['L', 'R']

class Projector(object):
    """
    Use to perform projection between volume, surface and node space.
    Creating a projector object may take some time whilst the constituent
    matrices are prepared; once created, any of the individual projections
    may be calculated directly from the object.

    Node space ordering: L hemisphere surface, R hemisphere surface,
    subcortical voxels in linear index order, subcortical ROIs in
    alphabetical order according to their dictionary key (see below)

    Args:
        hemispheres (list/Hemisphere): single or list (L/R) of Hemisphere
            objects. Note that the surfaces of the hemispheres must be in
            alignment with the reference space (ie, apply any transformations
            beforehand).
        spc (str/ImageSpace): path for, or ImageSpace object, for voxel grid
            to project from/to
        rois (dict): subcortical ROIs; keys are ROI name and values are paths
            to surfaces or Surface objects for the ROIs themselves.
        factor (int): voxel subdivision factor (default 3x voxel size)
        cores (int): number of processor cores to use (default max)
        ones (bool): debug tool, whole voxel PV assignment.
    """

    def __init__(self, hemispheres, spc, rois=None, factor=None,
                 cores=mp.cpu_count(), ones=False):

        print("Initialising projector (will take some time)")

        if isinstance(hemispheres, Hemisphere):
            # A single hemisphere: check its side, then wrap in a list
            side = hemispheres.side[0]
            if side not in SIDES:
                raise ValueError("Hemisphere must have 'L' or 'R' side")
            hemispheres = [hemispheres]
        elif len(hemispheres) == 2:
            # A pair of hemispheres: they must carry distinct L and R sides
            if (any([h.side not in SIDES for h in hemispheres])
                    or (hemispheres[0].side == hemispheres[1].side)):
                raise ValueError("Hemisphere objects must have 'L' and 'R' sides")
        else:
            raise ValueError("Projector must be initialised with 1 or 2 Hemispheres")

        self.hemi_dict = {h.side: copy.deepcopy(h) for h in hemispheres}

        if not isinstance(spc, ImageSpace):
            spc = ImageSpace(spc)
        self.spc = spc

        self._hemi_pvs = []
        self.vox_tri_mats = []
        self.vtx_tri_mats = []
        self._roi_pvs = {}

        if factor is None:
            factor = np.ceil(3 * spc.vox_size)
        factor = (factor * np.ones(3)).astype(np.int32)
        ncores = cores if hemispheres[0].inSurf._use_mp else 1
        supersampler = np.maximum(np.floor(spc.vox_size.round(1) / 0.75),
                                  1).astype(np.int32)

        if rois is not None:
            for name in sorted(rois.keys()):
                assert isinstance(rois[name], Surface)
                rpvs = estimators._structure(rois[name], spc, np.eye(4),
                                             supersampler, ones=ones,
                                             cores=cores)
                self._roi_pvs[name] = rpvs

        for hemi in self.iter_hemis:
            # Use the hemisphere's PV estimates if already present,
            # otherwise compute them from scratch
            if hasattr(hemi, 'pvs'):
                # print("WARNING: PVs should ideally be recalculated from scratch")
                self._hemi_pvs.append(hemi.pvs.reshape(-1, 3))
            else:
                pvs = estimators._cortex(hemi, spc, np.eye(4), supersampler,
                                         cores=ncores, ones=ones)
                self._hemi_pvs.append(pvs.reshape(-1, 3))

        self._assemble_vtx_vox_mats(factor, ncores, ones)

    def _assemble_vtx_vox_mats(self, factor, ncores, ones):
        for side, hemi in self.hemi_dict.items():
            # Calculate the constituent matrices for projection with each hemi
            midsurf = hemi.midsurface()
            vox_tri = vox_tri_weights(*hemi.surfs, self.spc, factor,
                                      cores=ncores, ones=ones,
                                      descriptor=f'{side} prisms')
            vtx_tri = vtx_tri_weights(midsurf, ncores)
            self.vox_tri_mats.append(vox_tri)
            self.vtx_tri_mats.append(vtx_tri)
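    # Illustrative construction sketch (not part of the class): the surface and
    # image paths below are hypothetical placeholders, and this assumes the
    # Hemisphere constructor accepts surface file paths (otherwise pass Surface
    # objects). The surfaces must already be aligned to the reference grid.
    #
    #   lhemi = Hemisphere('lh.white.surf.gii', 'lh.pial.surf.gii', 'L')
    #   rhemi = Hemisphere('rh.white.surf.gii', 'rh.pial.surf.gii', 'R')
    #   proj = Projector([lhemi, rhemi], 'ref_space.nii.gz')
    #
    # The node ordering of the resulting projector is L vertices, R vertices,
    # voxels, then any subcortical ROIs (see the class docstring above).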
    def save(self, path):
        """Save Projector in HDF5 format.

        A projector can be re-used for multiple analyses, assuming the
        reference image space and cortical surfaces remain in alignment
        for all data.

        Args:
            path (str): path to write out with .h5 extension
        """

        d = os.path.dirname(path)
        if d:
            os.makedirs(d, exist_ok=True)

        with h5py.File(path, 'w') as f:

            # Save properties of the reference ImageSpace: vox2world, size
            # and filename
            f.create_dataset('ref_spc_vox2world', data=self.spc.vox2world)
            f.create_dataset('ref_spc_size', data=self.spc.size)
            if self.spc.fname:
                f.create_dataset('ref_spc_fname',
                                 data=np.array(self.spc.fname.encode("utf-8")),
                                 dtype=h5py.string_dtype('utf-8'))

            # Each hemisphere is a group within the file (though there may
            # only be 1)
            for idx, h in enumerate(self.iter_hemis):

                side = h.side
                g = f.create_group(f"{side}_hemi")
                g.create_dataset(f"{side}_pvs", data=self._hemi_pvs[idx],
                                 compression='gzip')

                # Sparse matrices cannot be saved directly in HDF5, so convert
                # them to COO and then save as a 3 x N array, where the top row
                # is row indices, the second is columns, and the last is data.
                voxtri = self.vox_tri_mats[idx].tocoo()
                voxtri = np.vstack((voxtri.row, voxtri.col, voxtri.data))
                g.create_dataset(f"{side}_vox_tri", data=voxtri,
                                 compression='gzip')

                # Same again: top row is row indices, then cols, then data
                vtxtri = self.vtx_tri_mats[idx].tocoo()
                vtxtri = np.vstack((vtxtri.row, vtxtri.col, vtxtri.data))
                g.create_dataset(f"{side}_vtx_tri", data=vtxtri,
                                 compression='gzip')

                # Finally, save the surfaces of each hemisphere, named
                # as LPS, RPS, LWS, RWS.
                for k, s in h.surf_dict.items():
                    g.create_dataset(f"{k}_tris", data=s.tris,
                                     compression='gzip')
                    g.create_dataset(f"{k}_points", data=s.points,
                                     compression='gzip')

            # Save subcortical ROI PVs
            if self._roi_pvs:
                g = f.create_group("subcortical_pvs")
                for k, v in self._roi_pvs.items():
                    g.create_dataset(k, data=v, compression='gzip')
    @classmethod
    def load(cls, path):
        """Load Projector from path in HDF5 format.

        This is useful for performing repeated analyses with the same voxel
        grid and cortical surfaces.

        Args:
            path (str): path to load from
        """

        with h5py.File(path, 'r') as f:
            p = cls.__new__(cls)

            # Recreate the reference ImageSpace first
            p.spc = ImageSpace.manual(f['ref_spc_vox2world'][()],
                                      f['ref_spc_size'][()])
            if 'ref_spc_fname' in f:
                fname = f['ref_spc_fname'][()]
                if isinstance(fname, bytes):
                    fname = fname.decode('utf-8')
                p.spc.fname = fname
            n_vox = p.spc.n_vox

            # Now read out hemisphere-specific properties
            p._hemi_pvs = []
            p.vox_tri_mats = []
            p.vtx_tri_mats = []
            p.hemi_dict = {}
            p._roi_pvs = {}

            for s in SIDES:
                hemi_key = f"{s}_hemi"
                if hemi_key in f:

                    # Read out the surfaces, create the Hemisphere
                    ins, outs = [Surface.manual(
                        f[hemi_key][f'{s}{n}S_points'][()],
                        f[hemi_key][f'{s}{n}S_tris'][()],
                        f'{s}{n}S') for n in ['W', 'P']]
                    p.hemi_dict[s] = Hemisphere(ins, outs, s)

                    # Read out the PVs array for the hemi
                    p._hemi_pvs.append(f[hemi_key][f"{s}_pvs"][()])

                    # Recreate the sparse voxtri and vtxtri matrices.
                    # They are stored as a 3 x N array, where the top row
                    # is row indices, the second is columns, then data
                    voxtri = f[hemi_key][f"{s}_vox_tri"][()]
                    assert voxtri.shape[0] == 3, 'expected 3 rows'
                    voxtri = sparse.coo_matrix(
                        (voxtri[2, :], (voxtri[0, :], voxtri[1, :])),
                        shape=(n_vox, ins.tris.shape[0]))
                    p.vox_tri_mats.append(voxtri.tocsr())

                    # Same convention as above
                    vtxtri = f[hemi_key][f"{s}_vtx_tri"][()]
                    assert vtxtri.shape[0] == 3, 'expected 3 rows'
                    vtxtri = sparse.coo_matrix(
                        (vtxtri[2, :], (vtxtri[0, :], vtxtri[1, :])),
                        shape=(ins.n_points, ins.tris.shape[0]))
                    p.vtx_tri_mats.append(vtxtri.tocsr())

            if "subcortical_pvs" in f:
                g = f["subcortical_pvs"]
                for k in sorted(g.keys()):
                    p._roi_pvs[k] = g[k][()]

        return p
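    # Sketch of a save/load round trip (hypothetical file path): the projection
    # matrices are expensive to compute, so a projector is typically built once
    # and reloaded for subsequent analyses, provided the reference voxel grid
    # and surfaces remain in alignment for all data.
    #
    #   proj.save('proj.h5')
    #   proj2 = Projector.load('proj.h5')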
    def __repr__(self):
        sides = ",".join(list(self.hemi_dict.keys()))
        nverts = sum([h.n_points for h in self.iter_hemis])
        spc = "\n".join(repr(self.spc).splitlines()[1:])
        disp = dedent(f"""\
        Projector for {sides} hemispheres, with {nverts} total vertices.
        Reference voxel grid:""")
        return disp + "\n" + spc

    @property
    def iter_hemis(self):
        """Iterator over hemispheres of projector, in L/R order"""
        for s in SIDES:
            if s in self.hemi_dict:
                yield self.hemi_dict[s]

    @property
    def n_hemis(self):
        """Number of hemispheres (1/2) in projector"""
        return len(self.hemi_dict)

    # Direct access to the underlying surfaces via keys LPS, RWS etc.
    def __getitem__(self, surf_key):
        side = surf_key[0]
        return self.hemi_dict[side].surf_dict[surf_key]

    @property
    def n_surf_nodes(self):
        """Number of surface vertices in projector (one or both hemis)"""
        return sum([h.n_points for h in self.iter_hemis])

    @property
    def n_nodes(self):
        """Number of nodes in projector (surface, voxels, subcortical ROIs)"""
        return sum([self.spc.n_vox, self.n_surf_nodes, len(self._roi_pvs)])

    @property
    def n_vox(self):
        """Number of voxels in reference ImageSpace"""
        return self.spc.n_vox

    @property
    def n_subcortical_nodes(self):
        """Number of subcortical ROIs"""
        return len(self._roi_pvs)

    @property
    def roi_names(self):
        """List of names for subcortical ROIs"""
        return list(self._roi_pvs.keys())
    def adjacency_matrix(self, distance_weight=0):
        """Adjacency matrix for all surface vertices of projector.

        If there are two hemispheres present, the matrix indices will be
        arranged L, R.

        Args:
            distance_weight (int): apply inverse distance weighting; default
                0 (do not weight, all edges are unity), whereas positive
                values will weight edges by 1 / d^n, where d is geometric
                distance between vertices.

        Returns:
            sparse CSR matrix, square, sized (n vertices)
        """

        mats = []
        for hemi in self.iter_hemis:
            midsurf = hemi.midsurface()
            a = midsurf.adjacency_matrix(distance_weight).tolil()
            verts_vox = utils.affine_transform(midsurf.points,
                                               self.spc.world2vox).round()
            verts_in_spc = ((verts_vox >= 0) & (verts_vox < self.spc.size)).all(-1)
            a[~verts_in_spc, :] = 0
            a[:, ~verts_in_spc] = 0
            assert utils.is_symmetric(a)
            mats.append(a)
        return sparse.block_diag(mats, format="csr")
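    # For example (illustrative only, assuming a projector `proj` exists): the
    # unweighted adjacency matrix gives each vertex's neighbour count via a row
    # sum, with rows ordered L then R to match the surface part of node space.
    #
    #   adj = proj.adjacency_matrix()
    #   degree = np.asarray(adj.sum(1)).flatten()   # neighbours per vertex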
    def mesh_laplacian(self, distance_weight=0):
        """Mesh Laplacian matrix for all surface vertices of projector.

        If there are two hemispheres present, the matrix indices will be
        arranged L/R.

        Args:
            distance_weight (int): apply inverse distance weighting; default
                0 (do not weight, all edges are unity), whereas positive
                values will weight edges by 1 / d^n, where d is geometric
                distance between vertices.

        Returns:
            sparse CSR matrix
        """

        mats = [h.mesh_laplacian(distance_weight) for h in self.iter_hemis]
        return sparse.block_diag(mats, format="csr")
    def cotangent_laplacian(self):
        """Cotangent Laplacian matrix for all surface vertices of projector.

        If there are two hemispheres present, the matrix indices will be
        arranged L/R.

        Returns:
            sparse CSR matrix, square with size (n_vertices)
        """

        mats = [h.cotangent_laplacian() for h in self.iter_hemis]
        return sparse.block_diag(mats, format="csr")
    def cortex_pvs(self):
        """Cortical PVs for all hemispheres of the Projector, combined into
        a single 4D array.

        Returns:
            np.array, same shape as reference space, arranged GM, WM,
            non-brain in 4th dim.
        """
        if len(self._hemi_pvs) > 1:
            # Combine PV estimates from each hemisphere into a single map
            pvs = np.zeros((self.spc.n_vox, 3))
            pvs[:, 0] = np.minimum(1.0,
                                   self._hemi_pvs[0][:, 0] + self._hemi_pvs[1][:, 0])
            pvs[:, 1] = np.minimum(1.0 - pvs[:, 0],
                                   self._hemi_pvs[0][:, 1] + self._hemi_pvs[1][:, 1])
            pvs[:, 2] = 1.0 - pvs[:, 0:2].sum(1)
            return pvs.reshape(*self.spc.size, 3)
        else:
            return self._hemi_pvs[0].reshape(*self.spc.size, 3)
    def cortex_thickness(self):
        """Cortical thickness estimates for all hemispheres, arranged L then R."""
        return np.concatenate([h.thickness() for h in self.iter_hemis])
    def subcortex_pvs(self):
        """3D array of total subcortical GM PVs, summed across all ROIs.

        Returns:
            np.array, same shape as the reference ImageSpace
        """
        if self._roi_pvs:
            pvs = np.stack(list(self._roi_pvs.values()), axis=-1)
            return np.clip(pvs.sum(-1).reshape(self.spc.size), 0, 1)
        else:
            return np.zeros(self.spc.size)
    def pvs(self):
        """Combined 4D array of PVs for cortex and subcortical ROIs.

        Returns:
            np.array, same shape as reference space, arranged GM, WM,
            non-brain in 4th dim.
        """
        pvs = np.zeros((self.spc.n_vox, 3))
        cpvs = self.cortex_pvs()
        spvs = self.subcortex_pvs().flatten()
        pvs[...] = cpvs[...].reshape(-1, 3)

        # Assign subcortical GM from cortical CSF
        sgm_reassigned = np.minimum(pvs[..., 2], spvs)
        pvs[..., 2] -= sgm_reassigned
        pvs[..., 0] += sgm_reassigned

        # Assign subcortical GM from cortical WM
        sgm_reassigned = spvs - sgm_reassigned
        sgm_reassigned = np.minimum(pvs[..., 1], sgm_reassigned)
        pvs[..., 1] -= sgm_reassigned
        pvs[..., 0] += sgm_reassigned

        return pvs.reshape(*self.spc.size, 3)
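    # Worked example of the reassignment above for one voxel (illustrative
    # numbers only). Suppose the cortical estimate is GM 0.3, WM 0.2, CSF 0.5
    # and the total subcortical GM estimate is 0.6. CSF donates
    # min(0.5, 0.6) = 0.5, then WM donates min(0.2, 0.6 - 0.5) = 0.1, giving
    # a final GM 0.9, WM 0.1, CSF 0.0 (still summing to 1).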
    def brain_mask(self, pv_threshold=0.1):
        """Boolean mask of brain voxels, in reference ImageSpace.

        Args:
            pv_threshold (float): minimum brain PV (WM+GM) to include

        Returns:
            np.array, same shape as reference space, boolean dtype
        """
        pvs = self.pvs()
        return (pvs[..., :2].sum(-1) > pv_threshold)
    def vol2surf_matrix(self, edge_scale, vol_mask=None, surf_mask=None):
        """
        Volume to surface projection matrix.

        Args:
            edge_scale (bool): upweight signal from voxels that are not
                100% brain to account for 'missing' signal. Set True for
                quantities that scale with PVE (eg perfusion), set False
                otherwise (eg time quantities)
            vol_mask (np.array,bool): bool mask of voxels to include
            surf_mask (np.array,bool): bool mask of surface vertices to include

        Returns:
            sparse CSR matrix, sized (surface vertices x voxels). Surface
            vertices are arranged L then R.
        """

        if surf_mask is None:
            surf_mask = np.ones(self.n_surf_nodes, dtype=bool)
        if vol_mask is None:
            vol_mask = np.ones(self.n_vox, dtype=bool)

        if not (vol_mask.shape[0] == vol_mask.size == self.spc.n_vox):
            raise ValueError(f'Vol mask incorrectly sized ({vol_mask.shape})'
                             f' for reference ImageSpace ({self.spc.n_vox})')

        if not (surf_mask.shape[0] == surf_mask.size == self.n_surf_nodes):
            raise ValueError(f'Surf mask incorrectly sized ({surf_mask.shape})'
                             f' for projector surfaces ({self.n_surf_nodes})')

        proj_mats = [assemble_vol2surf(vox_tri, vtx_tri)
                     for vox_tri, vtx_tri in zip(self.vox_tri_mats,
                                                 self.vtx_tri_mats)]
        v2s_mat = sparse.vstack(proj_mats, format="csr")
        assert v2s_mat.sum(1).max() < 1 + 1e-6

        if edge_scale:
            brain_pv = self.cortex_pvs().reshape(-1, 3)[:, :2].sum(1)
            brain = (brain_pv > 1e-3)
            upweight = np.ones(brain_pv.shape)
            upweight[brain] = 1 / brain_pv[brain]
            v2s_mat.data *= np.take(upweight, v2s_mat.indices)

        if not (vol_mask.all() and surf_mask.all()):
            v2s_mat = utils.mask_projection_matrix(v2s_mat, row_mask=surf_mask,
                                                   col_mask=vol_mask)

        return v2s_mat
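    # Illustrative usage (a sketch, assuming a projector `proj` already exists):
    # the matrix form is useful when the same projection is applied to many
    # volumes, e.g. a 4D timeseries flattened to (n_vox, n_timepoints).
    #
    #   v2s = proj.vol2surf_matrix(edge_scale=True)
    #   vdata = np.random.rand(proj.n_vox, 10)      # e.g. 10 timepoints
    #   sdata = v2s @ vdata                         # (n_surf_nodes, 10)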
    def vol2hybrid_matrix(self, edge_scale, vol_mask=None, node_mask=None):
        """
        Volume to node space projection matrix.

        Args:
            edge_scale (bool): upweight signal from voxels that are not
                100% brain to account for 'missing' signal. Set True for
                quantities that scale with PVE (eg perfusion), set False
                otherwise (eg time quantities)
            vol_mask (np.array,bool): bool mask of voxels to include
            node_mask (np.array,bool): bool mask of nodes to include

        Returns:
            sparse CSR matrix, sized (nodes x voxels)
        """

        if node_mask is None:
            node_mask = np.ones(self.n_nodes, dtype=bool)
        if vol_mask is None:
            vol_mask = np.ones(self.spc.n_vox, dtype=bool)

        if not (vol_mask.shape[0] == vol_mask.size == self.spc.n_vox):
            raise ValueError(f'Vol mask incorrectly sized ({vol_mask.shape})'
                             f' for reference ImageSpace ({self.spc.n_vox})')

        if not (node_mask.shape[0] == node_mask.size == self.n_nodes):
            raise ValueError(f'Node mask incorrectly sized ({node_mask.shape})'
                             f' for projector nodes ({self.n_nodes})')

        surf_mask = node_mask[:self.n_surf_nodes]
        v2s_mat = self.vol2surf_matrix(edge_scale, vol_mask=vol_mask,
                                       surf_mask=surf_mask)

        wm_mask = node_mask[self.n_surf_nodes:
                            self.n_nodes - self.n_subcortical_nodes]
        v2v_mat = sparse.eye(self.spc.n_vox)

        if edge_scale:
            brain_pv = self.pvs().reshape(-1, 3)
            brain_pv = brain_pv[:, :2].sum(1)
            brain = (brain_pv > 1e-3)
            upweight = np.ones(brain_pv.shape)
            upweight[brain] = 1 / brain_pv[brain]
            v2v_mat.data *= upweight

        v2v_mat = utils.slice_sparse(v2v_mat, wm_mask, vol_mask)

        if self._roi_pvs:
            # Mapping from voxels to subcortical ROIs - weighted averaging
            v2r_mat = np.stack([r.flatten() for r in self._roi_pvs.values()],
                               axis=0)
            v2r_mat = sparse.csr_matrix(v2r_mat)
            v2r_mat = utils.sparse_normalise(v2r_mat, 1)
            assert v2r_mat.sum(1).max() < 1 + 1e-6
            if edge_scale:
                v2r_mat.data *= np.take(upweight, v2r_mat.indices)
            roi_mask = node_mask[-self.n_subcortical_nodes:]
            v2r_mat = utils.mask_projection_matrix(v2r_mat, row_mask=roi_mask,
                                                   col_mask=vol_mask)
            v2n_mat = sparse.vstack((v2s_mat, v2v_mat, v2r_mat), format="csr")
        else:
            v2n_mat = sparse.vstack((v2s_mat, v2v_mat), format="csr")

        return v2n_mat
    def surf2vol_matrix(self, edge_scale, vol_mask=None, surf_mask=None):
        """
        Surface to volume projection matrix.

        Args:
            edge_scale (bool): downweight signal in voxels that are not
                100% brain: set True for data that scales with PVE (eg
                perfusion), set False for data that does not (eg time
                quantities).
            vol_mask (np.array,bool): bool mask of voxels to include
            surf_mask (np.array,bool): bool mask of surface vertices to include

        Returns:
            sparse CSC matrix, sized (voxels x surface vertices)
        """

        if surf_mask is None:
            surf_mask = np.ones(self.n_surf_nodes, dtype=bool)
        if vol_mask is None:
            vol_mask = np.ones(self.n_vox, dtype=bool)

        if not (vol_mask.shape[0] == vol_mask.size == self.spc.n_vox):
            raise ValueError(f'Vol mask incorrectly sized ({vol_mask.shape})'
                             f' for reference ImageSpace ({self.spc.n_vox})')

        if not (surf_mask.shape[0] == surf_mask.size == self.n_surf_nodes):
            raise ValueError(f'Surf mask incorrectly sized ({surf_mask.shape})'
                             f' for projector surfaces ({self.n_surf_nodes})')

        proj_mats = []

        if edge_scale:
            gm_weights = []
            if len(self.hemi_dict) == 1:
                gm_weights.append(np.ones(self.spc.n_vox))
            else:
                # GM PV can be shared between both hemispheres, so rescale each
                # row of the s2v matrices by the proportion of all voxel-wise GM
                # that belongs to that hemisphere (eg, the GM could be shared
                # 80:20 between the two)
                GM = self._hemi_pvs[0][:, 0] + self._hemi_pvs[1][:, 0]
                GM[GM == 0] = 1
                gm_weights.append(self._hemi_pvs[0][:, 0] / GM)
                gm_weights.append(self._hemi_pvs[1][:, 0] / GM)

            for vox_tri, vtx_tri, weights in zip(self.vox_tri_mats,
                                                 self.vtx_tri_mats, gm_weights):
                s2v_mat = assemble_surf2vol(vox_tri, vtx_tri).tocsc()
                s2v_mat.data *= np.take(weights, s2v_mat.indices)
                proj_mats.append(s2v_mat)

        else:
            for vox_tri, vtx_tri in zip(self.vox_tri_mats, self.vtx_tri_mats):
                proj_mats.append(assemble_surf2vol(vox_tri, vtx_tri).tocsc())

            # If voxels are shared by both hemispheres, split the relative
            # weighting according to the GM PV of each hemisphere. This is
            # not a PV weighting - just deciding which hemi contributes more
            # to the signal.
            if self.n_hemis == 2:
                gm_sum = (self._hemi_pvs[0][..., 0] + self._hemi_pvs[1][..., 0])
                gm_sum[gm_sum > 0] = 1 / gm_sum[gm_sum > 0]
                l_weight = np.clip(self._hemi_pvs[0][..., 0] * gm_sum, 0, 1)
                r_weight = np.clip(self._hemi_pvs[1][..., 0] * gm_sum, 0, 1)
                weights = [l_weight, r_weight]
            else:
                weights = [np.ones(proj_mats[0].shape[0])]

            for p, w in zip(proj_mats, weights):
                p.data *= np.take(w, p.indices)

        s2v_mat = sparse.hstack(proj_mats, format="csc")
        if edge_scale:
            pvs = self.cortex_pvs().reshape(-1, 3)
            s2v_mat.data *= np.take(pvs[:, 0], s2v_mat.indices)

        s2v_mat.data = np.clip(s2v_mat.data, 0, 1)

        if not (vol_mask.all() and surf_mask.all()):
            s2v_mat = utils.mask_projection_matrix(s2v_mat, row_mask=vol_mask,
                                                   col_mask=surf_mask)

        assert s2v_mat.sum(1).max() < 1 + 1e-6
        return s2v_mat
    def hybrid2vol_matrix(self, edge_scale, vol_mask=None, node_mask=None):
        """
        Node space to volume projection matrix.

        Args:
            edge_scale (bool): downweight signal in voxels that are not
                100% brain: set True for data that scales with PVE (eg
                perfusion), set False for data that does not (eg time
                quantities).
            vol_mask (np.array,bool): bool mask of voxels to include
            node_mask (np.array,bool): bool mask of nodes to include

        Returns:
            sparse CSR matrix, sized (voxels x nodes)
        """

        # Assemble the matrices corresponding to cortex and subcortex
        # individually. Regardless of whether the overall projection should be
        # edge_scaled or not, we want the balance between cortex and subcortex
        # to be determined by the weight of GM and WM, which is why we scale
        # both matrices accordingly. If the final result should be edge_scaled,
        # then we simply stack and return the result. If the final result
        # should not be scaled, then we normalise so that each row sums to 1,
        # giving a weighted-average projection. In both cases, the weighting
        # given to cortex/subcortex within the projection is determined by GM
        # and WM PVs; only the scaling of the final matrix changes.

        if node_mask is None:
            node_mask = np.ones(self.n_nodes, dtype=bool)
        if vol_mask is None:
            vol_mask = np.ones(self.spc.n_vox, dtype=bool)

        if not (vol_mask.shape[0] == vol_mask.size == self.spc.n_vox):
            raise ValueError(f'Vol mask incorrectly sized ({vol_mask.shape})'
                             f' for reference ImageSpace ({self.spc.n_vox})')

        if not (node_mask.shape[0] == node_mask.size == self.n_nodes):
            raise ValueError(f'Node mask incorrectly sized ({node_mask.shape})'
                             f' for projector nodes ({self.n_nodes})')

        surf_mask = node_mask[:self.n_surf_nodes]
        wm_mask = node_mask[self.n_surf_nodes:
                            self.n_nodes - self.n_subcortical_nodes]
        cpvs = self.cortex_pvs().reshape(-1, 3)
        pvs = self.pvs().reshape(-1, 3)

        if self._roi_pvs:
            # Subcortical GM PVs, stacked across ROIs
            spvs = np.stack([r.flatten() for r in self._roi_pvs.values()],
                            axis=1)

            # The sum of subcortical GM and cortex GM can be greater than 1,
            # in which case we downweight until they sum to 1 again. This
            # doesn't apply to voxels with GM sum less than 1
            sgm_sum = spvs.sum(-1)
            rescale = np.maximum(1, cpvs[..., 0] + sgm_sum)

            # Mapping from subcortical ROIs to voxels is just the PV matrix
            spvs = spvs / rescale[:, None]
            r2v_mat = sparse.csr_matrix(spvs)
            roi_mask = node_mask[-self.n_subcortical_nodes:]
            r2v_mat = utils.mask_projection_matrix(r2v_mat, row_mask=vol_mask,
                                                   col_mask=roi_mask)

            # Mappings from surface to voxel and from subcortical nodes to
            # voxels - nb subcortical nodes are just voxels themselves!
            s2v_mat = self.surf2vol_matrix(edge_scale=True).tocsc()
            s2v_mat.data /= np.take(rescale, s2v_mat.indices)
            s2v_mat = utils.mask_projection_matrix(s2v_mat, row_mask=vol_mask,
                                                   col_mask=surf_mask)
            v2v_mat = sparse.dia_matrix((pvs[:, 1], 0),
                                        shape=2 * [self.spc.n_vox]).tocsr()
            v2v_mat = utils.mask_projection_matrix(v2v_mat, row_mask=vol_mask,
                                                   col_mask=wm_mask)
            n2v_mat = sparse.hstack((s2v_mat, v2v_mat, r2v_mat), format="csr")

        else:
            s2v_mat = self.surf2vol_matrix(edge_scale=True)
            s2v_mat = utils.mask_projection_matrix(s2v_mat, row_mask=vol_mask,
                                                   col_mask=surf_mask)
            v2v_mat = sparse.dia_matrix((cpvs[:, 1], 0),
                                        shape=2 * [self.spc.n_vox]).tocsr()
            v2v_mat = utils.mask_projection_matrix(v2v_mat, row_mask=vol_mask,
                                                   col_mask=wm_mask)
            n2v_mat = sparse.hstack((s2v_mat, v2v_mat), format="csr")

        assert n2v_mat.sum(1).max() < 1 + 1e-6
        if not edge_scale:
            n2v_mat = utils.sparse_normalise(n2v_mat, 1)

        return n2v_mat
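    # Illustrative note on using this matrix directly (a sketch, assuming a
    # projector `proj` already exists): hybrid data must be stacked in node
    # order, i.e. L vertices, R vertices, voxels, then any subcortical ROIs.
    #
    #   n2v = proj.hybrid2vol_matrix(edge_scale=False)
    #   ndata = np.ones(proj.n_nodes)               # surface + voxel + ROI nodes
    #   vdata = n2v @ ndata                         # (n_vox,) weighted average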
    def vol2surf(self, vdata, edge_scale):
        """
        Project data from volume to surface.

        Args:
            vdata (np.array): sized n_voxels in first dimension
            edge_scale (bool): upweight signal from voxels that are not
                100% brain to account for 'missing' signal. Set True for
                quantities that scale with PVE (eg perfusion), set False
                otherwise (eg time quantities)

        Returns:
            np.array, sized n_vertices in first dimension
        """

        if vdata.shape[0] != self.spc.n_vox:
            raise RuntimeError("vdata must have the same number of rows as"
                               " voxels in the reference ImageSpace")
        v2s_mat = self.vol2surf_matrix(edge_scale)
        return v2s_mat.dot(vdata)
    def surf2vol(self, sdata, edge_scale):
        """
        Project data from surface to volume.

        Args:
            sdata (np.array): sized n_vertices in first dimension (arranged L,R)
            edge_scale (bool): downweight signal in voxels that are not
                100% brain: set True for data that scales with PVE (eg
                perfusion), set False for data that does not (eg time
                quantities).

        Returns:
            np.array, sized n_voxels in first dimension
        """

        s2v_mat = self.surf2vol_matrix(edge_scale)
        if sdata.shape[0] != s2v_mat.shape[1]:
            raise RuntimeError("sdata must have the same number of rows as"
                               " total surface nodes (were one or two"
                               " hemispheres used?)")
        return s2v_mat.dot(sdata)
    def vol2hybrid(self, vdata, edge_scale):
        """
        Project data from volume to node space.

        Args:
            vdata (np.array): sized n_voxels in first dimension
            edge_scale (bool): upweight signal from voxels that are not
                100% brain to account for 'missing' signal. Set True for
                quantities that scale with PVE (eg perfusion), set False
                otherwise (eg time quantities)

        Returns:
            np.array, sized n_nodes in first dimension (surface vertices
            arranged L then R, followed by voxels and any subcortical ROIs)
        """

        v2n_mat = self.vol2hybrid_matrix(edge_scale)
        if vdata.shape[0] != v2n_mat.shape[1]:
            raise RuntimeError("vdata must have the same number of rows as"
                               " voxels in the reference ImageSpace")
        return v2n_mat.dot(vdata)
    def hybrid2vol(self, ndata, edge_scale):
        """
        Project data from node space to volume.

        Args:
            ndata (np.array): sized n_nodes in first dimension (surface data
                arranged L then R, followed by voxels and any subcortical ROIs)
            edge_scale (bool): downweight signal in voxels that are not
                100% brain: set True for data that scales with PVE (eg
                perfusion), set False for data that does not (eg time
                quantities).

        Returns:
            np.array, sized n_voxels in first dimension
        """

        n2v_mat = self.hybrid2vol_matrix(edge_scale)
        if ndata.shape[0] != n2v_mat.shape[1]:
            raise RuntimeError("ndata must have the same number of rows as"
                               " total nodes in the projector"
                               " (vertices + voxels + subcortical ROIs)")
        return n2v_mat.dot(ndata)
def assemble_vol2surf(vox_tri, vtx_tri):
    """
    Combine with normalisation the vox_tri and vtx_tri matrices into vol2surf.
    """
    # Ensure each triangle's voxel weights sum to 1
    vox2tri = utils.sparse_normalise(vox_tri, 0).T
    # Ensure each vertex's triangle weights sum to 1
    tri2vtx = utils.sparse_normalise(vtx_tri, 1)
    vol2vtx = tri2vtx @ vox2tri
    return utils.sparse_normalise(vol2vtx, 1)
def assemble_surf2vol(vox_tri, vtx_tri):
    """
    Combine with normalisation the vox_tri and vtx_tri matrices into surf2vol.
    """
    # Ensure each triangle's vertex weights sum to 1
    vtx2tri = utils.sparse_normalise(vtx_tri, 0).T
    # Ensure each voxel's triangle weights sum to 1
    tri2vox = utils.sparse_normalise(vox_tri, 1)
    vtx2vox = tri2vox @ vtx2tri
    return utils.sparse_normalise(vtx2vox, 1)
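# Toy example of the two assembly functions (illustrative numbers only, and
# assuming utils.sparse_normalise(M, axis) rescales M so that sums along that
# axis equal 1, consistent with the comments above). Consider 1 triangle,
# 2 voxels and 3 vertices, with vox_tri = [[2], [1]] (voxel/triangle
# intersection weights) and vtx_tri = [[1], [1], [1]]. assemble_vol2surf
# normalises vox_tri over its triangle (column) weights to (2/3, 1/3) and
# vtx_tri over its vertex (row) weights, so every vertex receives
# 2/3 * voxel0 + 1/3 * voxel1. assemble_surf2vol swaps the normalisation axes,
# so each voxel receives the mean of the three vertex values.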