import os
import sys
import scipy
import numpy as np
import pandas as pd
from scipy import sparse
from joblib import Parallel, delayed
from typing import Optional, Union
from joblib.externals.loky import get_reusable_executor
from sklearn.metrics import silhouette_score, calinski_harabasz_score
from ._plot import MarkerVis
[docs]class ScMags(MarkerVis):
"""\
A class containing the Count matrix and cluster labels, with methods
that can select markers and visualize selected markers
Parameters
----------
data
Count matrix with rows corresponding to cells and columns to genes in
numpy array format.
labels
One-dimensional numpy array containing cluster labels.
gene_ann
One-dimensional numpy array containing gene annotations.
verbose
Verbosity
Returns
-------
:class: `~ScMags`.
Examples
--------
>>> import scmags as mg
>>> li = mg.datasets.li()
"""
def __init__(
self,
data: Union[np.ndarray,
scipy.sparse.csr_matrix,
scipy.sparse.csc_matrix],
labels: Union[np.ndarray],
gene_ann: Optional[np.ndarray] = None,
verbose: bool = True
):
#- Initialize with visualization class parameters
super().__init__()
if gene_ann is None:
gene_ann = list(np.arange(0, data.shape[1]).astype(str))
gene_ann = ["Gene_" + i for i in gene_ann]
if (len(labels) != data.shape[0]):
raise ValueError("Label count and cell count are not equal !")
if (len(gene_ann) != data.shape[1]):
raise ValueError("The length of the gene annotation list is not" +
" equal to the number of genes !")
self._sprs_form = True
if(sparse.issparse(data) is False):
self._sprs_form = False
if(data.shape[0] < 1e5):
self._silh = True
else:
self._silh = False
self.data = data
self.labels = labels
self.gene_ann = gene_ann
self._verbose = verbose
self._markers = None
self._sel_ind = None
#-------------------------------- Methods ------------------------------------#
@staticmethod
def _analyze_genes(
clstr_rnk: int,
mean_rnk: int,
nof_sel: int,
im_exp: int,
auto_thres_val: float,
filt_proc: Union[np.ndarray],
exp_sel: Union[np.ndarray],
in_exp_rates: Union[np.ndarray],
in_clst_means: Union[np.ndarray],
):
# - Elimination of low-expression genes within the j Nth cluster
high_exp = np.where(in_exp_rates[clstr_rnk, :] >= auto_thres_val)[0]
# Returns None if there is no gene above the threshold
if len(high_exp) == 0:
return None
filt_proc[1, exp_sel[high_exp]] = True
temp_exp = in_exp_rates[:, high_exp]
temp_mean = in_clst_means[:, high_exp]
# Genes with the maximum mean within the cluster
max_mean = np.argsort(temp_mean, axis=0)[::-1]
# Here mean_rnk becomes 0 in the first while loop and 1 in the
# second while loop. So the maximum mean requirement is removed
max_ind = np.where(max_mean[mean_rnk, :] == clstr_rnk)[0]
# Returns None if there is no gene with the maximum within-cluster mean.
if len(max_ind) == 0:
return None
max_mean = max_mean[:, max_ind]
temp_ind = np.where(filt_proc[1, :] == True)[0]
filt_proc[2, temp_ind[max_ind]] = True
temp_mean = temp_mean[:, max_ind]
temp_exp = temp_exp[:, max_ind]
if mean_rnk == 0:
thsec = 1
else:
thsec = 0
# Selecting genes with high difference in within-cluster means
diff = np.zeros(len(max_ind))
div_exp = np.zeros(len(max_ind))
mean_ind = np.arange(temp_exp.shape[0])
mean_ind = np.delete(mean_ind, clstr_rnk)
for k in range(len(max_ind)):
div_exp[k] = np.mean(temp_exp[mean_ind, k])
diff[k] = temp_mean[max_mean[mean_rnk,k], k] - temp_mean[max_mean[thsec,k], k]
div_exp += 1e-9 # For divide error
if len(diff) > 1:
diff = (((diff - diff.min()) / (diff.max() - diff.min())) * 9) + 1
# Product of within-cluster expression rates and difference between means
diff = diff * pow(temp_exp[clstr_rnk, :], im_exp)
# Division by non-cluster expression rates
diff = diff / pow(div_exp, im_exp)
ind = np.argsort(diff)[::-1][:nof_sel]
temp_ind = np.where(filt_proc[2, :] == True)[0]
filt_proc[3, temp_ind[ind]] = True
best_temp = np.where(filt_proc[3, :] == True)[0]
# Returns marker indexes and scores
return best_temp, diff[ind]
#-----------------------------------------------------------------------------#
[docs] def filter_genes(
self,
in_cls_thres: Optional[float] = None,
im_exp: Optional[int] = 10,
nof_sel: Optional[int] = 10,
log_norm: bool = True
):
"""
It filters out genes that cannot be cluster-specific markers for each
cluster for computational efficiency. In this way, in the next step,
calculations are made for a specific gene community for each cluster.
Parameters
----------
in_cls_thres
Minimum expression threshold within the cluster. If it is not
given, it is computed automatically from the boxplot values.
im_exp
The importance of expression rates in clusters outside the
filtered cluster when performing gene filtering for any cluster.
If it is increased, the expression rates in clusters other than
the filtered cluster will decrease.
nof_sel
The number of genes remained for each cluster after filtering.
Marker selection will be made on these genes.
log_norm
Log normalization status.
Example
-------
>>> import scmags as mg
Li dataset
>>> li = mg.datasets.li()
Filtering out unnecessary genes
>>> li.filter_genes()
After filtering genes you can see;
.. hlist::
:columns: 1
* Remaining genes,
* Scores of the remaining genes,
* Threshold values determined for each cluster
>>> rem_genes = li.get_filter_genes()
>>> gen_scores = li.get_filter_gene_scores()
>>> li.get_filt_cluster_thresholds
"""
if (in_cls_thres is None):
auto_thres = True
else:
auto_thres = False
nof_cell, nof_gene = self.data.shape
unq_labs = np.unique(self.labels)
nof_clstr = len(unq_labs)
filt_proc = np.zeros((4, nof_gene), dtype=bool)
if(self._verbose):
print('-> Eliminating low expression genes')
# Calculation of within cluster expression rates
clst_index = []
in_exp_rates = np.zeros((nof_clstr, nof_gene), dtype='f4')
if self._sprs_form:
for i in range(nof_clstr):
clst_index.append(np.where(self.labels == unq_labs[i])[0])
temp = self.data[clst_index[i], :]
in_exp_rates[i, :] = temp.getnnz(axis = 0) / temp.shape[0]
else:
for i in range(nof_clstr):
clst_index.append(np.where(self.labels == unq_labs[i])[0])
temp = self.data[clst_index[i], :]
in_exp_rates[i, :] = ((temp != 0).sum(axis=0))
in_exp_rates[i, :] = in_exp_rates[i, :] / temp.shape[0]
# Genes with less than 20% expression in all clusters are filtered out.
threshed_exp = np.sum((in_exp_rates < 0.2), axis = 0)
high_exp_ind = np.where(threshed_exp < in_exp_rates.shape[0])[0]
in_exp_rates = in_exp_rates[:, high_exp_ind]
filt_proc[0, high_exp_ind] = True
# Calculation of within cluster means
in_clst_means = np.zeros((nof_clstr, len(high_exp_ind)), dtype='f4')
if self._sprs_form:
for i in range(nof_clstr):
logdata = self.data[clst_index[i]][:, high_exp_ind].copy()
logdata.data = np.log2(logdata.data + 1, dtype='f4')
in_clst_means[i, :] = np.mean(logdata, axis=0).A[0]
else:
for i in range(nof_clstr):
logdata = self.data[clst_index[i]][:, high_exp_ind].copy()
logdata = np.log2(logdata + 1, dtype='f4')
in_clst_means[i, :] = np.mean(logdata, axis=0)
if(self._verbose):
print('-> Selecting cluster-specific candidate marker genes')
sel_ind = {}
sel_score = {}
clst_thres = {}
exp_sel = np.where(filt_proc[0, :] == True)[0]
for j in range(nof_clstr):
# Determination of in-cluster expression thresholds
if auto_thres:
iter_thres = 10
desc_temp = in_exp_rates[j, :]
desc_temp = pd.DataFrame(desc_temp[desc_temp != 0])
auto_thres_val = (desc_temp.describe().iloc[6, 0] +
desc_temp.describe().iloc[7, 0]) / 2
else:
iter_thres = 1
auto_thres_val = in_cls_thres
sel_len = 0
nof_iter = 0
mean_rnk = 0
clst_thres[unq_labs[j]] = auto_thres_val
# It is aimed to select genes that are above the determined
# expression rate and have the maximum within-cluster mean.
# If there are not as many genes as nof_sel, genes that exceed the
# threshold are selected. The threshold is lowered and the
# processes are repeated to complete the number.
while (sel_len < nof_sel) and (nof_iter < iter_thres):
nof_iter += 1
best_genes = self._analyze_genes(
clstr_rnk=j,
mean_rnk=mean_rnk,
nof_sel=nof_sel,
im_exp=im_exp,
auto_thres_val=auto_thres_val,
filt_proc=filt_proc,
exp_sel=exp_sel,
in_exp_rates=in_exp_rates,
in_clst_means=in_clst_means
)
if best_genes is not None:
sel_len = len(best_genes[0])
auto_thres_val -= auto_thres_val * 0.01
filt_proc[1:4, :] = False
if best_genes is not None:
sel_ind[unq_labs[j]] = best_genes[0]
sel_score[unq_labs[j]] = best_genes[1]
# If no gene is obtained from the previous loop, it enters
# this loop.
rem_sel_len = nof_sel - sel_len
sel_len = 0
while (sel_len < rem_sel_len) and (mean_rnk < nof_clstr-1):
mean_rnk += 1
best_genes = self._analyze_genes(
clstr_rnk=j,
mean_rnk=mean_rnk,
nof_sel=rem_sel_len,
im_exp=im_exp,
auto_thres_val=clst_thres[unq_labs[j]],
filt_proc=filt_proc,
exp_sel=exp_sel,
in_exp_rates=in_exp_rates,
in_clst_means=in_clst_means
)
filt_proc[1:4, :] = False
if best_genes is not None:
sel_len = len(best_genes[0])
rem_sel_len = rem_sel_len - sel_len
if unq_labs[j] in list(sel_ind.keys()):
sel_ind[unq_labs[j]] = np.append(
sel_ind[unq_labs[j]], best_genes[0])
sel_score[unq_labs[j]] = np.append(
sel_score[unq_labs[j]], best_genes[1])
else:
sel_ind[unq_labs[j]] = best_genes[0]
sel_score[unq_labs[j]] = best_genes[1]
if nof_iter > 1:
clst_thres[unq_labs[j]] = auto_thres_val
self._sel_ind = sel_ind
self._sel_score = sel_score
self._clst_thres = clst_thres
#-----------------------------------------------------------------------------#
def silh_compute(self, data, mask_labs, i, j):
"""
Calculates silhouette index of any gene
"""
if self._silh:
score = silhouette_score(data[:, j], mask_labs[i])
else:
score = calinski_harabasz_score(data[:, j].toarray(), mask_labs[i])
return score
#-----------------------------------------------------------------------------#
def dp_silh_compute(self, data, mask, pb_it):
"""
It finds the best combination of given genes by parallel computation
based on silhouette index.
"""
#- Number of genes to calculate
n_cores = self._n_cores
nof_markers = self._nof_markers
nof_gene = data.shape[1]
#- Number of available cores
dtc_cores = os.cpu_count()
#- If the given number of cores is greater than the
# current number of cores
if n_cores > dtc_cores:
n_cores = dtc_cores
#- Setting the number of cores to use
if(n_cores == -2 and dtc_cores >= nof_gene):
tmp_cores = nof_gene
elif(n_cores == -2 and dtc_cores < nof_gene):
tmp_cores = dtc_cores
elif(n_cores != -2 and n_cores >= nof_gene):
tmp_cores = nof_gene
elif(n_cores != -2 and n_cores < nof_gene):
tmp_cores = n_cores
whole_ınd = list(range(nof_gene))
score_change = np.zeros(nof_markers, dtype="f4")
cmb = whole_ınd
if self._silh:
cmb_res = Parallel(n_jobs=tmp_cores, backend="loky")(
delayed(silhouette_score)(data[:, j], mask) for j in cmb
)
else:
cmb_res = Parallel(n_jobs=tmp_cores, backend="loky")(
delayed(calinski_harabasz_score)(data[:, j], mask) for j in cmb
)
max_ınd = np.argmax(cmb_res)
del whole_ınd[max_ınd]
cmb = np.stack((np.repeat(max_ınd, nof_gene - 1), whole_ınd), axis=1)
nof_iter = min(nof_gene, nof_markers)
for k in range(1, nof_iter):
if self._silh:
cmb_res = Parallel(n_jobs=tmp_cores, backend="loky")(
delayed(silhouette_score)(data[:, j], mask) for j in cmb
)
else:
cmb_res = Parallel(n_jobs=tmp_cores, backend="loky")(
delayed(calinski_harabasz_score)(data[:, j], mask)
for j in cmb
)
# Best Marker Index
max_ınd = np.argmax(cmb_res)
best_mark = cmb[max_ınd]
score_change[k] = cmb_res[max_ınd]
whole_ınd = np.setdiff1d(whole_ınd, cmb[max_ınd][-1])
cmb = np.repeat([best_mark], len(whole_ınd), axis=0)
# Creating combinations from remaining genes
cmb = np.append(cmb, whole_ınd[:, np.newaxis], axis=1)
if(self._verbose):
self._pb(self._tot_iter, pb_it+1)
return best_mark
#-----------------------------------------------------------------------------#
[docs] def sel_clust_marker(
self,
nof_markers: Optional[int] = 3,
n_cores: Optional[int] = -2,
dyn_prog: bool = False
):
"""
Performs cluster-specific marker selection among filtered genes.
It does this in two different ways.
Parameters
----------
nof_markers
Number of markers to be selected for each cluster.
n_cores
Number of cores to use. (If not given = total number of cores -1)
dyn_prog
Marker selection status with dynamic programming. If True, the
combinational genes are combined and the gene combination with
the highest silhouette value is selected.
Examples
--------
>>> import scmags as mg
Li Dataset
>>> li = mg.datasets.li()
Filtering out unnecessary genes
>>> li.filter_genes()
Selection of markers from remaining genes
>>> li.sel_clust_marker()
Get markers
>>> li.get_markers()
Get markers data
>>> li.get_marker_data()
"""
if(self._sel_ind is None):
raise ValueError(
'Marker selection cannot be made without gene filtering !'
)
self._n_cores = n_cores
self._nof_markers = nof_markers
labels = self.labels
sel_ind = self._sel_ind.copy()
#- Number of remaining genes after filtering for each cluster
filt_counts = np.array([len(sel_ind[ky]) for ky in sel_ind.keys()])
#- Indexes of remaining genes after filtering for each cluster
filt_clst_ind = np.array(list(sel_ind.values()), dtype=object)
mark_keys = np.array(list(sel_ind.keys()))
nof_iter = len(mark_keys)
self._tot_iter = nof_iter
self._mark_keys = mark_keys
#- Log normalization for computation of remaining genes after filtering
clst_logdata = []
if self._sprs_form:
for i in range(nof_iter):
clst_logdata.append(
self.data[:, filt_clst_ind[i].astype(int)].copy())
clst_logdata[i].data = np.log2(
(clst_logdata[i].data + 1), dtype='f4')
else:
for i in range(nof_iter):
clst_logdata.append(
sparse.csr_matrix(
self.data[:, filt_clst_ind[i].astype(int)].copy()))
clst_logdata[i].data = np.log2(
(clst_logdata[i].data + 1), dtype='f4')
#- Clıster specific masks
mask_labs = []
for k in range(nof_iter):
all_same = np.zeros(len(labels), dtype=int)
all_same[labels == mark_keys[k]] = 1
mask_labs.append(all_same)
if(self._verbose):
print('-> Selecting markers for each cluster')
if dyn_prog:
sel_mark = []
for i in range(nof_iter):
res = self.dp_silh_compute(
data=clst_logdata[i], mask=mask_labs[i], pb_it=i
)
sel_mark.append(filt_clst_ind[i][res])
get_reusable_executor().shutdown(wait=False)
else:
res = Parallel(n_jobs=n_cores, backend="loky")(
delayed(self.silh_compute)(
data=clst_logdata[i], mask_labs=mask_labs, i=i, j=j
)
for i in range(nof_iter)
for j in range(filt_counts[i])
)
get_reusable_executor().shutdown(wait=False)
sel_mark = []
for k in range(nof_iter):
temp = res[:filt_counts[k]]
del res[:filt_counts[k]]
first_n = min(filt_counts[k], nof_markers)
srt = np.argsort(temp)[::-1][:first_n]
sel_mark.append(filt_clst_ind[k][srt])
marker_df = pd.DataFrame(sel_mark)
marker_df.index = ["C_" + str(i) for i in mark_keys]
marker_df.columns = [
"Marker_" + str(i) for i in range(1, nof_markers + 1)
]
self._sel_list_markers = sel_mark
self._markers = marker_df
#-----------------------------------------------------------------------------#
def _pb(self, total, progress):
barLength = 20
status = ""
step = progress
progress = float(progress) / float(total)
if progress >= 1.0:
progress = 1
status = "\r\n"
block = int(round(barLength * progress))
text = (
"\r-> |{}| {:.0f}% Number of Clusters With "
+ "Selected Markers : {} {}"
).format(
u"\u2B1B" * block + u"\u2B1C" * (barLength - block),
round(progress * 100, 0),
step,
status,
)
sys.stdout.write(text)
sys.stdout.flush()
#- Source: https://stackoverflow.com/questions/3160699/python-progress-bar
#--------------------------- Getter-Setter -----------------------------------#
def get_filter_genes(self, ind_return: bool = False):
"""
Returns the remaining genes after filtering.
Parameters
----------
ind_return
If True it returns the indices, if False it returns the
annotation if there is an gene annotation.
Returns
-------
Dictionary
Dictionary containing the remaining genes after filtering.
"""
sel_ind = self._sel_ind.copy()
gene_ann = self.gene_ann
if ind_return:
return sel_ind
else:
sel_ann = {}
for i in sel_ind.keys():
sel_ann[i] = []
for j in range(len(sel_ind[i])):
sel_ann[i].append(gene_ann[sel_ind[i][j]])
return sel_ann
def get_markers(self, ind_return: bool = False):
"""
Returns the selected markers
Parameters
----------
ind_return
If True it returns the indices, if False it returns the
annotation if there is an gene annotation.
Returns
-------
pd.DataFrame
Dataframe containing indices or names of selected markers.
"""
if ind_return:
return self._markers.copy()
else:
named_df = self._markers.copy()
for i in range(named_df.shape[0]):
not_nan = np.where(named_df.iloc[i, :].isna() == False)[0]
for j in not_nan:
named_df.iloc[i, j] = self.gene_ann[int(named_df.iloc[i, j])]
return named_df
def get_marker_data(self, log_norm: bool = True):
"""
It pulls the selected markers from the count matrix and returns it.
Parameters
----------
log_norm :
Log normalization status. If true it normalizes and returns.
"""
sel_ind = self._sel_list_markers.copy()
sel_key = list(self._sel_ind.keys())
marker_data = {}
for i in range(len(sel_key)):
marker_data[sel_key[i]] = self.data[:, sel_ind[i].astype(int)]
if log_norm:
if self._sprs_form:
for i in sel_key:
marker_data[i].data = np.log2(marker_data[i].data + 1)
else:
for i in sel_key:
marker_data[i] = np.log2(marker_data[i] + 1)
return marker_data
@property
def get_filter_gene_scores(self):
"""
Returns the calculated score values of the remaining genes after
filtering.
"""
return self._sel_score.copy()
@property
def get_filt_cluster_thresholds(self):
"""
Returns auto-determined in-cluster expression threshold values
"""
thres = self._clst_thres.copy()
if isinstance(thres, dict):
return thres
else:
print("Threshold was defined as %.2f for all clusters" % thres)
#-----------------------------------------------------------------------------#