Skip to content

Commit

Permalink
parallel processing at cluster stats
Browse files Browse the repository at this point in the history
  • Loading branch information
yzizhen committed Aug 26, 2021
1 parent aa9af2b commit 217418e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 94 deletions.
3 changes: 2 additions & 1 deletion R/harmonize.R
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,8 @@ combine_cl <- function(all.results)
markers=all.results[[1]]$markers
n.cl = max(cl)
for(i in 2:length(all.results)){
if(is.null(all.results[[i]]$cl) | length(unique(all.results[[i]]$cl)) < 2) next
#if(is.null(all.results[[i]]$cl) | length(unique(all.results[[i]]$cl)) < 2) next
if(is.null(all.results[[i]]$cl)) next
new.cl = all.results[[i]]$cl
new.cl = setNames(as.integer(new.cl)+ n.cl,names(new.cl))
cl[names(new.cl)] = new.cl
Expand Down
30 changes: 5 additions & 25 deletions R/harmonize_impute.R
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,16 @@ impute_knn_global <- function(comb.dat, split.results, select.genes, select.cell
{
print(x)
tmp.cells= select.cells[comb.dat$meta.df[select.cells,"platform"]==x]
ref.cells = ref.list[[x]]
ref.cells = ref.list[[x]]
rd.result <- rd_PCA(comb.dat$dat.list[[x]], select.genes, select.cells=tmp.cells, sampled.cells = ref.cells, max.pca =max.dim, th=th, method=method,mc.cores=mc.cores,verbose=verbose)
org.rd.dat.list[[x]] = rd.result
rd.result = org.rd.dat.list[[x]]
if(!is.null(rm.eigen)){
rd.dat = filter_RD(rd.result$rd.dat, rm.eigen, rm.th,verbose=verbose)
}
#print(ncol(rd.dat))
knn = get_knn_batch(rd.dat, rd.dat[ref.cells], method="Annoy.Euclidean", mc.cores=mc.cores)
dat = as.matrix(comb.dat$dat.list[[x]][select.genes,ref.cells])
knn = get_knn_batch(rd.dat, rd.dat[ref.cells,], method="Annoy.Euclidean", mc.cores=mc.cores, batch.size=50000,k=k,transposed=FALSE)
dat = as.matrix(comb.dat$dat.list[[x]][select.genes,ref.cells])
reference.id = 1:length(ref.cells)
cell.id = match(row.names(rd.dat), select.cells)
gene.id = 1:length(select.genes)
Expand All @@ -138,7 +138,8 @@ impute_knn_global <- function(comb.dat, split.results, select.genes, select.cell
}

###cross-modality Imputation based on nearest neighbors in each iteraction of clustering using anchoring genes or genes shown to be differentiall expressed.
for(x in names(split.results)){
for(x in names(split.results)){
print(x)
result = split.results[[x]]
if(x == names(split.results)[1]){
impute.genes = select.genes
Expand Down Expand Up @@ -172,24 +173,3 @@ impute_knn_global <- function(comb.dat, split.results, select.genes, select.cell
return(list(knn.list =knn.list, org.rd.dat.list = org.rd.dat.list,impute.dat.list=impute.dat.list, ref.list=ref.list))
}


fast_knn <- function(query.dat, ref.dat=query.dat, distance="euclidean", k=15, M=16, ef=200,method="euclidean")
{
library("RcppHNSW")
if(method=="euclidean"){
p = new(HnswL2,ncol(ref.dat), nrow(ref.dat),M, ef)
}
else if(method=="cosine"){
p = new(HnswCosine,ncol(ref.dat), nrow(ref.dat),M, ef)
}
else if(method=="ip"){
p = new(HnswIP,ncol(ref.dat), nrow(ref.dat),M, ef)
}
p$addItems(ref.dat)
knn.result = p$getAllNNsList(query.dat, k=15, include_distance=TRUE)
row.names(knn.result[[1]]) = row.names(knn.result[[2]]) = row.names(query.dat)
return(knn.result)
}



38 changes: 20 additions & 18 deletions R/harmonize_util.R
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ get_de_result <- function(dat.list, de.param.list, cl, select.sets=names(de.pa
#' @export
#'
#' @examples
comb_de_result <- function(de.genes.list, cl.means.list, common.genes=NULL, max.num=10000, pairs=NULL, frac.th=0.7)
comb_de_result <- function(de.genes.list, cl.means.list, common.genes=NULL, max.num=10000, pairs=NULL, frac.th=0.7, mc.cores=1)
{
sets = names(cl.means.list)
if(is.null(common.genes)){
Expand All @@ -375,8 +375,12 @@ comb_de_result <- function(de.genes.list, cl.means.list, common.genes=NULL, max.
if(is.null(pairs)){
pairs = unique(unlist(lapply(de.genes.list, names)))
}
de.genes = list()
for(p in pairs){

require(doMC)
require(foreach)
registerDoMC(cores=mc.cores)

de.genes <- foreach(p = pairs, .combine="c") %dopar% {
de.counts = table(unlist(lapply(names(de.genes.list), function(set){
de = de.genes.list[[set]][[p]]
c(names(de$up.genes),names(de$down.genes))})))
Expand All @@ -396,11 +400,16 @@ comb_de_result <- function(de.genes.list, cl.means.list, common.genes=NULL, max.
next
}
lfc = as.matrix(lfc)


sign = rowSums(lfc > 0)
sign1 = rowSums(lfc > 1)
sign2 = rowSums(lfc < -1)
frac = pmax(sign1, sign2)/ncol(lfc)
lfc = rowMeans(lfc)
g = names(frac)[frac > frac.th]
lfc = lfc[g,drop=F]
frac = frac[g,drop=F]
rank = do.call("cbind",lapply(names(de.genes.list), function(set){
de = de.genes.list[[set]][[p]]

de = de.genes.list[[set]][[p]]
tmp1=match(g, names(de$up.genes))
tmp2=match(g, names(de$down.genes))
tmp1[is.na(tmp1)] = max.num
Expand All @@ -410,22 +419,15 @@ comb_de_result <- function(de.genes.list, cl.means.list, common.genes=NULL, max.
rank[rank > max.num] = max.num
row.names(rank)=g
rank.mean = rowMeans(rank)
sign = rowSums(lfc > 0)
sign1 = rowSums(lfc > 1)
sign2 = rowSums(lfc < -1)
frac = pmax(sign1, sign2)/ncol(lfc)
lfc = rowMeans(lfc)
select.g = names(frac)[frac > frac.th]
df = data.frame(lfc =lfc, frac=frac, counts = as.vector(de.counts[g]), rank.mean=rank.mean)

df = df[select.g,,drop=FALSE]
ord = order(with(df, rank.mean))
df = df[ord,]

df = df[ord,]
up = row.names(df)[which(df$lfc > 0)]
down = row.names(df)[which(df$lfc < 0)]
select = c(up,down)
de.genes[[p]] = list(up.genes =setNames(df[up,"counts"], up), down.genes=setNames(df[down,"counts"], down), up.num = length(up),down.num=length(down),num=length(select),genes=select, de.df = df)
tmp= list(list(up.genes =setNames(df[up,"counts"], up), down.genes=setNames(df[down,"counts"], down), up.num = length(up),down.num=length(down),num=length(select),genes=select,pair=p))
names(tmp) = p
return(tmp)
}
return(de.genes)
}
Expand Down
105 changes: 55 additions & 50 deletions R/util.R
Original file line number Diff line number Diff line change
Expand Up @@ -706,10 +706,14 @@ l2norm <- function(X, by="column")

get_cl_stats <- function(mat,
cl,
stats = c("sums","means","medians","present","sqr_sums","sqr_means"),
stats = c("sums","means","medians","present","sqr_sums","sqr_means"),
low.th=1,
parallel = c(FALSE,TRUE),
mc.cores = 1,...)
{
if(!is.factor(cl)){
cl = as.factor(cl)
}
mc.cores = mc.cores
setThreadOptions(numThreads = mc.cores)
transpose=FALSE
Expand All @@ -724,34 +728,34 @@ get_cl_stats <- function(mat,
if (transpose) {
if (parallel) { #sparse transpose parallel
if (stats == "sums") {
return(rcpp_get_cl_sums_RcppParallel_transpose(mat, cl))
result=rcpp_get_cl_sums_RcppParallel_transpose(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_RcppParallel_transpose(mat, cl))
result=rcpp_get_cl_means_RcppParallel_transpose(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_RcppParallel_transpose(mat, cl))
result=rcpp_get_cl_medians_RcppParallel_transpose(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_RcppParallel_transpose(mat, cl,...))
result=rcpp_get_cl_present_RcppParallel_transpose(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_RcppParallel_transpose(mat, cl))
result=rcpp_get_cl_sqr_means_RcppParallel_transpose(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_RcppParallel_transpose(mat, cl))
result=rcpp_get_cl_sqr_sums_RcppParallel_transpose(mat, cl)
} else {
stop("the stats value is not supported")
}

} else { #sparse transpose non-parallel
if (stats == "sums") {
return(rcpp_get_cl_sums_transpose(mat, cl))
result=rcpp_get_cl_sums_transpose(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_transpose(mat, cl))
result=rcpp_get_cl_means_transpose(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_transpose(mat, cl))
result=rcpp_get_cl_medians_transpose(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_transpose(mat, cl,...))
result=rcpp_get_cl_present_transpose(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_transpose(mat, cl))
result=rcpp_get_cl_sqr_means_transpose(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_transpose(mat, cl))
result = rcpp_get_cl_sqr_sums_transpose(mat, cl)
} else {
stop("the stats value is not supported")
}
Expand All @@ -760,34 +764,34 @@ get_cl_stats <- function(mat,
} else {
if (parallel) { #sparse non-transpose parallel
if (stats == "sums") {
return(rcpp_get_cl_sums_RcppParallel(mat, cl))
result=rcpp_get_cl_sums_RcppParallel(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_RcppParallel(mat, cl))
result=rcpp_get_cl_means_RcppParallel(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_RcppParallel(mat, cl))
result=rcpp_get_cl_medians_RcppParallel(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_RcppParallel(mat, cl,...))
result=rcpp_get_cl_present_RcppParallel(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_RcppParallel(mat, cl))
result=rcpp_get_cl_sqr_means_RcppParallel(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_RcppParallel(mat, cl))
result=rcpp_get_cl_sqr_sums_RcppParallel(mat, cl)
} else {
stop("the stats value is not supported")
}

} else { #sparse non-transpose non-parallel
if (stats == "sums") {
return(rcpp_get_cl_sums(mat, cl))
result=rcpp_get_cl_sums(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means(mat, cl))
result=rcpp_get_cl_means(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians(mat, cl))
result=rcpp_get_cl_medians(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present(mat, cl,...))
result=rcpp_get_cl_present(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means(mat, cl))
result=rcpp_get_cl_sqr_means(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums(mat, cl))
result=rcpp_get_cl_sqr_sums(mat, cl)
} else {
stop("the stats value is not supported")
}
Expand All @@ -798,35 +802,35 @@ get_cl_stats <- function(mat,
if (transpose) {
if (parallel) {#dense transpose parallel
if (stats == "sums") {
return(rcpp_get_cl_sums_RcppParallel_transpose_dense(mat, cl))
result=rcpp_get_cl_sums_RcppParallel_transpose_dense(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_RcppParallel_transpose_dense(mat, cl))
result=rcpp_get_cl_means_RcppParallel_transpose_dense(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_RcppParallel_transpose_dense(mat, cl))
result=rcpp_get_cl_medians_RcppParallel_transpose_dense(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_RcppParallel_transpose_dense(mat, cl,...))
result=rcpp_get_cl_present_RcppParallel_transpose_dense(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_RcppParallel_transpose_dense(mat, cl))
result=rcpp_get_cl_sqr_means_RcppParallel_transpose_dense(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_RcppParallel_transpose_dense(mat, cl))
result=rcpp_get_cl_sqr_sums_RcppParallel_transpose_dense(mat, cl)
} else {
stop("the stats value is not supported")
}

} else { #dense transpose non-parallel

if (stats == "sums") {
return(rcpp_get_cl_sums_transpose_dense(mat, cl))
result=rcpp_get_cl_sums_transpose_dense(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_transpose_dense(mat, cl))
result=rcpp_get_cl_means_transpose_dense(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_transpose_dense(mat, cl))
result=rcpp_get_cl_medians_transpose_dense(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_transpose_dense(mat, cl,...))
result=rcpp_get_cl_present_transpose_dense(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_transpose_dense(mat, cl))
result=rcpp_get_cl_sqr_means_transpose_dense(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_transpose_dense(mat, cl))
result=rcpp_get_cl_sqr_sums_transpose_dense(mat, cl)
} else {
stop("the stats value is not supported")
}
Expand All @@ -835,40 +839,41 @@ get_cl_stats <- function(mat,
if (parallel) { #dense non-transpose parallel

if (stats == "sums") {
return(rcpp_get_cl_sums_RcppParallel_dense(mat, cl))
result=rcpp_get_cl_sums_RcppParallel_dense(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_RcppParallel_dense(mat, cl))
result=rcpp_get_cl_means_RcppParallel_dense(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_RcppParallel_dense(mat, cl))
result=rcpp_get_cl_medians_RcppParallel_dense(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_RcppParallel_dense(mat, cl,...))
result=rcpp_get_cl_present_RcppParallel_dense(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_RcppParallel_dense(mat, cl))
result=rcpp_get_cl_sqr_means_RcppParallel_dense(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_RcppParallel_dense(mat, cl))
result=rcpp_get_cl_sqr_sums_RcppParallel_dense(mat, cl)
} else {
stop("the stats value is not supported")
}
} else { #dense non-transpose non-parallel

if (stats == "sums") {
return(rcpp_get_cl_sums_dense(mat, cl))
result=rcpp_get_cl_sums_dense(mat, cl)
} else if (stats == "means") {
return(rcpp_get_cl_means_dense(mat, cl))
result=rcpp_get_cl_means_dense(mat, cl)
} else if (stats == "medians") {
return(rcpp_get_cl_medians_dense(mat, cl))
result=rcpp_get_cl_medians_dense(mat, cl)
} else if (stats == "present") {
return(rcpp_get_cl_present_dense(mat, cl,...))
result=rcpp_get_cl_present_dense(mat, cl,lowth=low.th)
} else if (stats == "sqr_means") {
return(rcpp_get_cl_sqr_means_dense(mat, cl))
result=rcpp_get_cl_sqr_means_dense(mat, cl)
} else if (stats == "sqr_sums") {
return(rcpp_get_cl_sqr_sums_dense(mat, cl))
result=rcpp_get_cl_sqr_sums_dense(mat, cl)
} else {
stop("the stats value is not supported")
}
}
}
}
}
result = result[,levels(cl),drop=F]
}


Expand Down

0 comments on commit 217418e

Please sign in to comment.