# Source code for cell2location.utils._spatial_knn

```
import numpy as np
from scipy.sparse import coo_matrix
from scipy.spatial import cKDTree
from sklearn.neighbors import KDTree
from umap.umap_ import fuzzy_simplicial_set
def get_sparse_matrix_from_indices_distances_umap(knn_indices, knn_dists, n_obs, n_neighbors):
"""
Copied out of scanpy.neighbors
"""
rows = np.zeros((n_obs * n_neighbors), dtype=np.int64)
cols = np.zeros((n_obs * n_neighbors), dtype=np.int64)
vals = np.zeros((n_obs * n_neighbors), dtype=np.float64)
for i in range(knn_indices.shape[0]):
for j in range(n_neighbors):
if knn_indices[i, j] == -1:
continue # We didn't get the full knn for i
if knn_indices[i, j] == i:
val = 0.0
else:
val = knn_dists[i, j]
rows[i * n_neighbors + j] = i
cols[i * n_neighbors + j] = knn_indices[i, j]
vals[i * n_neighbors + j] = val
result = coo_matrix((vals, (rows, cols)), shape=(n_obs, n_obs))
result.eliminate_zeros()
return result.tocsr()
def compute_connectivities_umap(
knn_indices, knn_dists, n_obs, n_neighbors, set_op_mix_ratio=1.0, local_connectivity=1.0
):
r"""
Copied out of scanpy.neighbors
This is from umap.fuzzy_simplicial_set [McInnes18]_.
Given a set of data X, a neighborhood size, and a measure of distance
compute the fuzzy simplicial set (here represented as a fuzzy graph in
the form of a sparse matrix) associated to the data. This is done by
locally approximating geodesic distance at each point, creating a fuzzy
simplicial set for each such point, and then combining all the local
fuzzy simplicial sets into a global one via a fuzzy union.
"""
X = coo_matrix(([], ([], [])), shape=(n_obs, 1))
connectivities = fuzzy_simplicial_set(
X,
n_neighbors,
None,
None,
knn_indices=knn_indices,
knn_dists=knn_dists,
set_op_mix_ratio=set_op_mix_ratio,
local_connectivity=local_connectivity,
)
if isinstance(connectivities, tuple):
# In umap-learn 0.4, this returns (result, sigmas, rhos)
connectivities = connectivities[0]
distances = get_sparse_matrix_from_indices_distances_umap(knn_indices, knn_dists, n_obs, n_neighbors)
return distances, connectivities.tocsr()
# ####################--------########################
[docs]def spatial_neighbours(coords, n_sp_neighbors=7, radius=None, include_source_location=True, sample_id=None):
"""
Find spatial neighbours using the number of neighbours or radius (KDTree approach).
:param coords: numpy.ndarray with x,y positions of spots.
:param n_sp_neighbors: how many spatially-adjacent neighbors to report for each spot (including the source spot).
Use 7 for hexagonal grid.
:param radius: Supersedes `n_sp_neighbors` - radius within which to report spatially-adjacent neighbors for each spot. Pick radius based on spot size.
:param include_source_location: include the observation itself into the list of neighbours.
:param sample_id: pd.Series or np.array listing sample membership for each observation (each row of coords).
"""
# create and query spatial proximity tree within each sample
if radius is None:
if include_source_location:
coord_ind = np.zeros((coords.shape[0], n_sp_neighbors))
else:
coord_ind = np.zeros((coords.shape[0], n_sp_neighbors - 1))
else:
coord_ind = np.zeros(coords.shape[0])
if sample_id is None:
sample_id = np.array(["sample" for i in range(coords.shape[0])])
total_ind = np.arange(0, coords.shape[0]).astype(int)
for sam in np.unique(sample_id):
sam_ind = np.isin(sample_id, [sam])
coord_tree = KDTree(coords[sam_ind, :])
if radius is None:
n_list = coord_tree.query(coords[sam_ind, :], k=n_sp_neighbors, return_distance=False)
n_list = np.array(n_list)
# replace sample-specific indices with a global index
for c in range(n_list.shape[1]):
n_list[:, c] = total_ind[sam_ind][n_list[:, c]]
if include_source_location:
coord_ind[sam_ind, :] = n_list
else:
n_list_sel = n_list != np.arange(sam_ind.sum()).reshape(sam_ind.sum(), 1)
coord_ind[sam_ind, :] = n_list[n_list_sel].reshape((sam_ind.sum(), n_sp_neighbors - 1))
else:
coord_ind[sam_ind] = coord_tree.query_radius(coords[sam_ind, :], radius, count_only=False)
return coord_ind.astype(int)
[docs]def sum_neighbours(X_data, neighbours):
"""
Sum X_data values across neighbours.
:param coords: numpy.ndarray with variable measurements for each observation (observations * variables)
:param neighbours: numpy.ndarray with neigbour indices for each observation (observations * neigbours)
"""
return np.sum([X_data[neighbours[:, n], :] for n in range(neighbours.shape[1])], axis=0)
# ####################--------########################
def spatial_knn(
coords, expression, n_neighbors=14, n_sp_neighbors=7, radius=None, which_exprs_dims=None, sample_id=None
):
"""
A variant on the standard knn neighbor graph inference procedure that also includes the spatial neighbors of each spot.
With help from Krzysztof Polanski.
:param coords: numpy.ndarray with x,y positions of spots.
:param expression: numpy.ndarray with expression of programmes / cluster expression (cols) of spots (rows).
:param n_neighbors: how many non-spatially-adjacent neighbors to report for each spot
:param n_sp_neighbors: how many spatially-adjacent neighbors to report for each spot. Use 7 for hexagonal grid.
:param radius: Supercedes `n_sp_neighbors` - radius within which to report spatially-adjacent neighbors for each spot. Pick radius based on spot size.
:param which_exprs_dims: which expression dimensions to use (cols)?
"""
# create and query spatial proximity tree within each sample
if radius is None:
coord_ind = np.zeros((coords.shape[0], n_sp_neighbors))
else:
coord_ind = np.zeros(coords.shape[0])
for sam in sample_id.unique():
coord_tree = KDTree(coords[sample_id.isin([sam]), :])
if radius is None:
coord_ind[sample_id.isin([sam]), :] = coord_tree.query(
coords[sample_id.isin([sam]), :], k=n_sp_neighbors, return_distance=False
)
else:
coord_ind[sample_id.isin([sam])] = coord_tree.query_radius(
coords[sample_id.isin([sam]), :], radius, count_only=False
)
# if selected dimensions not provided choose all
if which_exprs_dims is None:
which_exprs_dims = np.arange(expression.shape[1])
# print(which_exprs_dims)
# extract and index the appropriate bit of the PCA
pca = expression[:, which_exprs_dims]
ckd = cKDTree(pca)
# the actual number of neighbours - you'll get seven extra spatial neighbours in the thing
knn = n_neighbors + n_sp_neighbors
# identify the knn for each spot. this is guaranteed to contain at least n_neighbors non-adjacent spots
# this is exactly what we're after
ckdout = ckd.query(x=pca, k=knn, n_jobs=-1)
# create numeric vectors for subsetting later
numtemp = np.arange(expression.shape[0])
rowtemp = np.arange(knn)
# rejigger the neighour pool by including the spatially adjacent ones
for i in np.arange(expression.shape[0]):
# identify the spatial neighbours for the spot and compute their distance
mask = np.isin(numtemp, coord_ind[i])
# filter spatial neighbours by sample
if sample_id is not None:
mask = mask & sample_id.isin([sample_id[i]])
neigh = numtemp[mask]
ndist_temp = pca[mask, :] - pca[i, :]
ndist_temp = ndist_temp.reshape((mask.sum(), pca.shape[1]))
ndist = np.linalg.norm(ndist_temp, axis=1)
# how many non-adjacent neighbours will we get to keep?
# (this fluctuates as e.g. edge spots will have fewer hex neighbours)
kpoint = knn - len(neigh)
# the indices of the top kpoint number of non-adjacent neighbours (by excluding adjacent ones from the set)
inds = rowtemp[[i not in neigh for i in ckdout[1][0, :]]][:kpoint]
# keep the identified top non-adjacent neighbours
ckdout[0][i, :kpoint] = ckdout[0][i, inds]
ckdout[1][i, :kpoint] = ckdout[1][i, inds]
# add the spatial neighbours in the remaining spots of the knn graph
ckdout[0][i, kpoint:] = ndist
ckdout[1][i, kpoint:] = neigh
# sort each row of the graph in ascending distance order
# (sometimes spatially adjacent neighbours are some of the top ones)
knn_distances, knn_indices = ckdout
newidx = np.argsort(knn_distances, axis=1)
knn_indices = knn_indices[np.arange(np.shape(knn_indices)[0])[:, np.newaxis], newidx]
knn_distances = knn_distances[np.arange(np.shape(knn_distances)[0])[:, np.newaxis], newidx]
# compute connectivities and export as a dictionary
dist, cnts = compute_connectivities_umap(knn_indices, knn_distances, knn_indices.shape[0], knn_indices.shape[1])
neighbors = {
"distances": dist,
"connectivities": cnts,
"params": {"n_neighbors": n_neighbors + n_sp_neighbors, "method": "spot_factors2knn", "metric": "euclidean"},
}
return neighbors
# ####################--------########################
def spot_factors2knn(
adata,
coord_col=["x", "y"],
sample_col="sample.x",
node_name="nUMI_factors",
sample_type="mean",
n_neighbors=14,
n_sp_neighbors=7,
which_exprs_dims=None,
which_sample=None,
):
r"""Construct spatially aware KNN graph using W spot weights
:param adata: anndata object with spot weights.
:param coord_col: anndata.obs columns containing spatial coordinates.
:param sample_col: anndata.obs columns containing individual Visium sample identifier.
:param node_name: model paramter representing spot contributions of cell types W.
:param sample_type: use 'means' or 5% quantile ('q05') of the posterior of W?
:param n_neighbors: number of expression neighbours, between spots within and across samples.
:param n_sp_neighbors: number of spatial neighbours, only within spots of the same sample. Defaults to 7 to get
direct neighbours in the hexagonal grid. 13 and 19 give the next rows of spatial neighbours.
:param which_exprs_dims: select specific cell states from W
:param which_sample: select one or several samples to construct the graph
:return: updated anndata with neighbour graph in adata.uns['neighbors']
"""
if sample_type not in ["mean", "sds", "q05"]:
raise ValueError("sample_type should be one of `['mean', 'sds', 'q05']`.")
obs = adata.obs
# select sample
if which_sample is not None:
obs = obs[which_sample, :]
obs_col = obs.columns
# extract needed info
coords = obs[coord_col].values
col_ind = [sample_type + "_" + node_name in i for i in obs_col.tolist()]
exprs = obs.loc[:, col_ind].values
sample_id = obs[sample_col]
adata.uns["neighbors"] = spatial_knn(
coords,
expression=exprs,
n_neighbors=n_neighbors,
n_sp_neighbors=n_sp_neighbors,
which_exprs_dims=which_exprs_dims,
sample_id=sample_id,
)
return adata
```