Source code for cell2location.models.simplified._cell2location_v3_no_mg_module

import numpy as np
import pandas as pd
import pyro
import pyro.distributions as dist
import torch
from pyro.nn import PyroModule
from scvi import REGISTRY_KEYS
from scvi.nn import one_hot

# class NegativeBinomial(TorchDistributionMixin, ScVINegativeBinomial):
#    pass

[docs]class LocationModelLinearDependentWMultiExperimentLocationBackgroundNormLevelNoMGPyroModel(PyroModule): r""" Cell2location models the elements of :math:`D` as Negative Binomial distributed, given an unobserved gene expression level (rate) :math:`mu` and a gene- and batch-specific over-dispersion parameter :math:`\alpha_{e,g}` which accounts for unexplained variance: .. math:: D_{s,g} \sim \mathtt{NB}(\mu_{s,g}, \alpha_{e,g}) The expression level of genes :math:`\mu_{s,g}` in the mRNA count space is modelled as a linear function of expression signatures of reference cell types :math:`g_{f,g}`: .. math:: \mu_{s,g} = (\left (\sum_{f} {w_{s,f} \: g_{f,g}} \right) + s_{e,g}) y_{s} Here, :math:`w_{s,f}` denotes regression weight of each reference signature :math:`f` at location :math:`s`, which can be interpreted as the expected number of cells at location :math:`s` that express reference signature :math:`f`; :math:`g_{f,g}` denotes the reference signatures of cell types :math:`f` of each gene :math:`g`, `cell_state_df` input ; :math:`y_{s}` denotes a location/observation-specific scaling parameter which adjusts for differences in sensitivity between observations and batches; :math:`s_{e,g}` is additive component that account for gene- and location-specific shift, such as due to contaminating or free-floating RNA. To account for the similarity of location patterns across cell types, :math:`w_{s,f}` is modelled using another layer of decomposition (factorization) using :math:`r={1, .., R}` groups of cell types, that can be interpreted as cellular compartments or tissue zones. Unless stated otherwise, R is set to 50. Corresponding graphical model can be found in supplementary methods: Approximate Variational Inference is used to estimate the posterior distribution of all model parameters. Estimation of absolute cell abundance :math:`w_{s,f}` is guided using informed prior on the number of cells (argument called `N_cells_per_location`). It is a tissue-level global estimate, which can be derived from histology images (H&E or DAPI), ideally paired to the spatial expression data or at least representing the same tissue type. This parameter can be estimated by manually counting nuclei in a 10-20 locations in the histology image (e.g. using 10X Loupe browser), and computing the average cell abundance. An appropriate setting of this prior is essential to inform the estimation of absolute cell type abundance values, however, the model is robust to a range of similar values. In settings where suitable histology images are not available, the size of capture regions relative to the expected size of cells can be used to estimate `N_cells_per_location`. The prior on detection efficiency per location :math:`y_s` is selected to discourage over-normalisation, such that unless data has evidence of strong technical effect, the effect is assumed to be small and close to the mean sensitivity for each batch :math:`y_e`: .. math:: y_s ~ Gamma(detection\_alpha, detection\_alpha / y_e) where y_e is unknown/latent average detection efficiency in each batch/experiment: .. math:: y_e ~ Gamma(10, 10 / detection\_mean) """ def __init__( self, n_obs, n_vars, n_factors, n_batch, cell_state_mat, n_groups: int = 50, detection_mean=1 / 2, detection_alpha=200.0, N_cells_per_location=8.0, A_factors_per_location=7.0, Y_groups_per_location=7.0, N_cells_mean_var_ratio=1.0, alpha_g_phi_hyp_prior={"alpha": 9.0, "beta": 3.0}, gene_add_alpha_hyp_prior={"alpha": 9.0, "beta": 3.0}, gene_add_mean_hyp_prior={ "alpha": 1.0, "beta": 100.0, }, detection_hyp_prior={"mean_alpha": 10.0}, w_sf_mean_var_ratio=5.0, ): super().__init__() self.n_obs = n_obs self.n_vars = n_vars self.n_factors = n_factors self.n_batch = n_batch self.n_groups = n_groups self.alpha_g_phi_hyp_prior = alpha_g_phi_hyp_prior self.w_sf_mean_var_ratio = w_sf_mean_var_ratio self.gene_add_alpha_hyp_prior = gene_add_alpha_hyp_prior self.gene_add_mean_hyp_prior = gene_add_mean_hyp_prior detection_hyp_prior["mean"] = detection_mean detection_hyp_prior["alpha"] = detection_alpha self.detection_hyp_prior = detection_hyp_prior factors_per_groups = A_factors_per_location / Y_groups_per_location self.register_buffer( "detection_hyp_prior_alpha", torch.tensor(self.detection_hyp_prior["alpha"]), ) self.register_buffer( "detection_mean_hyp_prior_alpha", torch.tensor(self.detection_hyp_prior["mean_alpha"]), ) self.register_buffer( "detection_mean_hyp_prior_beta", torch.tensor(self.detection_hyp_prior["mean_alpha"] / self.detection_hyp_prior["mean"]), ) # create hyperparameter tensors self.cell_state_mat = cell_state_mat self.register_buffer("cell_state", torch.tensor(cell_state_mat.T)) self.register_buffer("N_cells_per_location", torch.tensor(N_cells_per_location)) self.register_buffer("factors_per_groups", torch.tensor(factors_per_groups)) self.register_buffer("Y_groups_per_location", torch.tensor(Y_groups_per_location)) self.register_buffer("N_cells_mean_var_ratio", torch.tensor(N_cells_mean_var_ratio)) self.register_buffer( "alpha_g_phi_hyp_prior_alpha", torch.tensor(self.alpha_g_phi_hyp_prior["alpha"]), ) self.register_buffer( "alpha_g_phi_hyp_prior_beta", torch.tensor(self.alpha_g_phi_hyp_prior["beta"]), ) self.register_buffer( "gene_add_alpha_hyp_prior_alpha", torch.tensor(self.gene_add_alpha_hyp_prior["alpha"]), ) self.register_buffer( "gene_add_alpha_hyp_prior_beta", torch.tensor(self.gene_add_alpha_hyp_prior["beta"]), ) self.register_buffer( "gene_add_mean_hyp_prior_alpha", torch.tensor(self.gene_add_mean_hyp_prior["alpha"]), ) self.register_buffer( "gene_add_mean_hyp_prior_beta", torch.tensor(self.gene_add_mean_hyp_prior["beta"]), ) self.register_buffer("w_sf_mean_var_ratio_tensor", torch.tensor(self.w_sf_mean_var_ratio)) self.register_buffer("n_factors_tensor", torch.tensor(self.n_factors)) self.register_buffer("n_groups_tensor", torch.tensor(self.n_groups)) self.register_buffer("ones", torch.ones((1, 1))) self.register_buffer("ones_1_n_groups", torch.ones((1, self.n_groups))) self.register_buffer("ones_n_batch_1", torch.ones((self.n_batch, 1))) self.register_buffer("eps", torch.tensor(1e-8)) @staticmethod def _get_fn_args_from_batch(tensor_dict): x_data = tensor_dict[REGISTRY_KEYS.X_KEY] ind_x = tensor_dict["ind_x"].long().squeeze() batch_index = tensor_dict[REGISTRY_KEYS.BATCH_KEY] return (x_data, ind_x, batch_index), {}
[docs] def create_plates(self, x_data, idx, batch_index): return pyro.plate("obs_plate", size=self.n_obs, dim=-2, subsample=idx)
[docs] def list_obs_plate_vars(self): """ Create a dictionary with: 1. "name" - the name of observation/minibatch plate; 2. "input" - indexes of model args to provide to encoder network when using amortised inference; 3. "sites" - dictionary with * keys - names of variables that belong to the observation plate (used to recognise and merge posterior samples for minibatch variables) * values - the dimensions in non-plate axis of each variable (used to construct output layer of encoder network when using amortised inference) """ return { "name": "obs_plate", "input": [0, 2], # expression data + (optional) batch index "input_transform": [ torch.log1p, lambda x: x, ], # how to transform input data before passing to NN "sites": { "n_s_cells_per_location": 1, "y_s_groups_per_location": 1, "z_sr_groups_factors": self.n_groups, "w_sf": self.n_factors, "detection_y_s": 1, }, }
[docs] def forward(self, x_data, idx, batch_index): obs2sample = one_hot(batch_index, self.n_batch) obs_plate = self.create_plates(x_data, idx, batch_index) # =====================Cell abundances w_sf======================= # # factorisation prior on w_sf models similarity in locations # across cell types f and reflects the absolute scale of w_sf with obs_plate: n_s_cells_per_location = pyro.sample( "n_s_cells_per_location", dist.Gamma( self.N_cells_per_location * self.N_cells_mean_var_ratio, self.N_cells_mean_var_ratio, ), ) y_s_groups_per_location = pyro.sample( "y_s_groups_per_location", dist.Gamma(self.Y_groups_per_location, self.ones), ) # cell group loadings shape = self.ones_1_n_groups * y_s_groups_per_location / self.n_groups_tensor rate = self.ones_1_n_groups / (n_s_cells_per_location / y_s_groups_per_location) with obs_plate: z_sr_groups_factors = pyro.sample( "z_sr_groups_factors", dist.Gamma(shape, rate), # .to_event(1)#.expand([self.n_groups]).to_event(1) ) # (n_obs, n_groups) k_r_factors_per_groups = pyro.sample( "k_r_factors_per_groups", dist.Gamma(self.factors_per_groups, self.ones).expand([self.n_groups, 1]).to_event(2), ) # (self.n_groups, 1) c2f_shape = k_r_factors_per_groups / self.n_factors_tensor x_fr_group2fact = pyro.sample( "x_fr_group2fact", dist.Gamma(c2f_shape, k_r_factors_per_groups).expand([self.n_groups, self.n_factors]).to_event(2), ) # (self.n_groups, self.n_factors) with obs_plate: w_sf_mu = z_sr_groups_factors @ x_fr_group2fact w_sf = pyro.sample( "w_sf", dist.Gamma( w_sf_mu * self.w_sf_mean_var_ratio_tensor, self.w_sf_mean_var_ratio_tensor, ), ) # (self.n_obs, self.n_factors) # =====================Location-specific detection efficiency ======================= # # y_s with hierarchical mean prior detection_mean_y_e = pyro.sample( "detection_mean_y_e", dist.Gamma( self.ones * self.detection_mean_hyp_prior_alpha, self.ones * self.detection_mean_hyp_prior_beta, ) .expand([self.n_batch, 1]) .to_event(2), ) detection_hyp_prior_alpha = pyro.deterministic( "detection_hyp_prior_alpha", self.ones_n_batch_1 * self.detection_hyp_prior_alpha, ) beta = (obs2sample @ detection_hyp_prior_alpha) / (obs2sample @ detection_mean_y_e) with obs_plate: detection_y_s = pyro.sample( "detection_y_s", dist.Gamma(obs2sample @ detection_hyp_prior_alpha, beta), ) # (self.n_obs, 1) # =====================Gene-specific additive component ======================= # # per gene molecule contribution that cannot be explained by # cell state signatures (e.g. background, free-floating RNA) s_g_gene_add_alpha_hyp = pyro.sample( "s_g_gene_add_alpha_hyp", dist.Gamma(self.ones * self.gene_add_alpha_hyp_prior_alpha, self.ones * self.gene_add_alpha_hyp_prior_beta), ) s_g_gene_add_mean = pyro.sample( "s_g_gene_add_mean", dist.Gamma( self.gene_add_mean_hyp_prior_alpha, self.gene_add_mean_hyp_prior_beta, ) .expand([self.n_batch, 1]) .to_event(2), ) # (self.n_batch) s_g_gene_add_alpha_e_inv = pyro.sample( "s_g_gene_add_alpha_e_inv", dist.Exponential(s_g_gene_add_alpha_hyp).expand([self.n_batch, 1]).to_event(2), ) # (self.n_batch) s_g_gene_add_alpha_e = self.ones / s_g_gene_add_alpha_e_inv.pow(2) s_g_gene_add = pyro.sample( "s_g_gene_add", dist.Gamma(s_g_gene_add_alpha_e, s_g_gene_add_alpha_e / s_g_gene_add_mean) .expand([self.n_batch, self.n_vars]) .to_event(2), ) # (self.n_batch, n_vars) # =====================Gene-specific overdispersion ======================= # alpha_g_phi_hyp = pyro.sample( "alpha_g_phi_hyp", dist.Gamma(self.ones * self.alpha_g_phi_hyp_prior_alpha, self.ones * self.alpha_g_phi_hyp_prior_beta), ) alpha_g_inverse = pyro.sample( "alpha_g_inverse", dist.Exponential(alpha_g_phi_hyp).expand([self.n_batch, self.n_vars]).to_event(2), ) # (self.n_batch, self.n_vars) # =====================Expected expression ======================= # # expected expression mu = ((w_sf @ self.cell_state) + (obs2sample @ s_g_gene_add)) * detection_y_s alpha = obs2sample @ (self.ones / alpha_g_inverse.pow(2)) # convert mean and overdispersion to total count and logits # total_count, logits = _convert_mean_disp_to_counts_logits( # mu, alpha, eps=self.eps # ) # =====================DATA likelihood ======================= # # Likelihood (sampling distribution) of data_target & add overdispersion via NegativeBinomial with obs_plate: pyro.sample( "data_target", dist.GammaPoisson(concentration=alpha, rate=alpha / mu), # dist.NegativeBinomial(total_count=total_count, logits=logits), obs=x_data, ) # =====================Compute mRNA count from each factor in locations ======================= # with obs_plate: mRNA = w_sf * (self.cell_state).sum(-1) pyro.deterministic("u_sf_mRNA_factors", mRNA)
[docs] def compute_expected(self, samples, adata_manager, ind_x=None): r"""Compute expected expression of each gene in each location. Useful for evaluating how well the model learned expression pattern of all genes in the data. """ if ind_x is None: ind_x = np.arange(adata_manager.adata.n_obs).astype(int) else: ind_x = ind_x.astype(int) obs2sample = adata_manager.get_from_registry(REGISTRY_KEYS.BATCH_KEY) obs2sample = pd.get_dummies(obs2sample.flatten()).values[ind_x, :] mu = (["w_sf"][ind_x, :], self.cell_state_mat.T) +, samples["s_g_gene_add"]) ) * samples["detection_y_s"][ind_x, :] alpha =, 1 / np.power(samples["alpha_g_inverse"], 2)) return {"mu": mu, "alpha": alpha, "ind_x": ind_x}