
Commit

Merge pull request #203 from LCSB-BioCore/mk-dida
use distributed processing from DiDa
exaexa authored Mar 29, 2022
2 parents b784d10 + 567ceff commit d44213f
Showing 23 changed files with 137 additions and 1,336 deletions.
16 changes: 10 additions & 6 deletions Project.toml
@@ -1,31 +1,28 @@
name = "GigaSOM"
uuid = "a03a9c34-069e-5582-a11c-5c984cab887c"
version = "0.6.8"
version = "0.7.0"

[deps]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
DistributedArrays = "aaf54ef3-cdf8-58ed-94cc-d582ad619b94"
DistributedData = "f6a0035f-c5ac-4ad0-b410-ad102ced35df"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
FCSFiles = "d76558cf-badf-52d4-a17e-381ab0b0d937"
FileIO = "5789e2e9-d7fb-5bc7-8068-2c6fae9b9549"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0"

[compat]
CSV = "^0.5.12, 0.6, 0.7, 0.8, 0.10"
DataFrames = "0.20, 0.21, 0.22, 1.0"
Distances = "^0.8.2, 0.9, 0.10"
DistributedArrays = "^0.6.4"
DistributedData = "0.1.4, 0.2"
Distributions = "^0.21.1, 0.22, 0.23, 0.24, 0.25"
FCSFiles = "^0.1.1"
FileIO = "^1.0.7"
@@ -36,4 +33,11 @@ XLSX = "^0.5.5, 0.6, 0.7"
julia = "1"

[extras]
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0"

[targets]
test = ["JSON", "LinearAlgebra", "SHA", "Test", "XLSX"]
7 changes: 0 additions & 7 deletions docs/src/functions.md
@@ -27,10 +27,3 @@ Pages = ["core.jl", "trainutils.jl"]
Modules = [GigaSOM]
Pages = ["embedding.jl"]
```

## Distributed processing tools

```@autodocs
Modules = [GigaSOM]
Pages = ["distributed.jl", "dio.jl"]
```
15 changes: 7 additions & 8 deletions docs/src/tutorials/basicUsage.md
@@ -33,18 +33,17 @@ While all functions work well on simple data matrices, the main aim of GigaSOM
is to let the users enjoy the cluster-computing resources. All functions also
work on data that are "scattered" among workers (i.e. each worker only holds a
portion of the data loaded in the memory). This dataset description is stored
in [`LoadedDataInfo`](@ref) structure. Most of the functions above in fact
accept the `LoadedDataInfo` as argument, and often return another
`LoadedDataInfo` that describes the scattered result.
in [`Dinfo`](@ref) structure. Most of the functions above in fact
accept the `Dinfo` as argument, and often return another
`Dinfo` that describes the scattered result.

Most importantly, using `LoadedDataInfo` **prevents memory exhaustion at the
Most importantly, using `Dinfo` **prevents memory exhaustion at the
master node**, which is a critical feature required to handle huge datasets.

You can always collect the scattered data back into a matrix (if it fits to
your RAM) with [`distributed_collect`](@ref), and utilize many other functions
to manipulate it, including e.g. [`distributed_mapreduce`](@ref) for easily
running parallel computations, or [`distributed_export`](@ref) for saving and
restoring the dataset paralelly.
your RAM) with `gather_array`, and utilize many other functions to manipulate
it, including e.g. `dmapreduce` for easily running parallel computations, or
`dstore` for saving and restoring the dataset paralelly.

## Minimal working example

22 changes: 12 additions & 10 deletions docs/src/tutorials/distributedProcessing.md
@@ -13,29 +13,31 @@ aforementioned `dselect` and `dtransform_asinh`. The following code extracts
means and standard deviations from the first 3 columns of a dataset distributed
as `di`:
```julia
using DistributedData

dstat(di, [1,2,3])
```

## Manual work with the distributed data
## Manual work with the DistributedData.jl package

We will first show how to use the general framework to compute per-cluster
statistics. GigaSOM exports the [`distributed_mapreduce`](@ref) function that
statistics. DistributedData.jl exports the [`dmapreduce`](@ref) function that
can be used as a very effective basic building block for running such
computations. For example, you can efficiently compute a distributed mean of
all your data as such:
```julia
distributed_mapreduce(di, sum, +) / distributed_mapreduce(di, length, +)
dmapreduce(di, sum, +) / dmapreduce(di, length, +)
```

The parameters of `distributed_mapreduce` are, in order:
The parameters of `dmapreduce` are, in order:

- `di`, the dataset
- `sum` or `length`, an unary "map" function -- during the computation, each
piece of distributed data is first _paralelly_ processed by this function
- `+`, a binary "reduction" or "folding" function -- the pieces of information
processed by the map function are successively joined in pairs using this
function, until there is only a single result left. This final result is also
what `distributed_mapreduce` returns.
what `dmapreduce` returns.

Above example thus reads: Sum all data on all workers, add up the intermediate
results, and divide the final number to the sum of all lengths of data on the
@@ -45,7 +47,7 @@ Column-wise mean (as produced by `dstat`) is slightly more useful; we only need
to split the computation on columns:

```julia
distributed_mapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ distributed_mapreduce(di, x->size(x,1), +)
dmapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ dmapreduce(di, x->size(x,1), +)
```

Finally, for distributed computation of per-cluster mean, the clustering
@@ -55,7 +57,7 @@ the distributed `mapToGigaSOM` does exactly that).
First, compute the clustering:
```julia
mapping = mapToGigaSOM(som, di)
distributed_transform(mapping, m -> metaClusters[m])
dtransform(mapping, m -> metaClusters[m])
```

Now, the distributed computation is run on 2 scattered datasets. We employ a
@@ -66,7 +68,7 @@ following code produces a matrix of tuples `(sum, count)`, for separate
clusters (in rows) and data columns (in columns):

```julia
sumscounts = distributed_mapreduce([di, mapping],
sumscounts = dmapreduce([di, mapping],
(d, mapping) -> catmapbuckets(
(_,clData) -> (sum(clData), length(clData)),
d, 10, mapping),
@@ -113,13 +115,13 @@ capture all of the actual 16 existing clusters)

Finally, we can remove the temporary data from workers to create free memory for other analyses:
```julia
undistribute(mapping)
unscatter(mapping)
```

## Convenience statistical functions

Notably, several of the most used statistical functions are available in
GigaSOM.jl in a form that can cope with distributed data.
DistributedData.jl in a form that can cope with distributed data.

For example, you can run a distributed median computation as such:
```julia
6 changes: 3 additions & 3 deletions docs/src/tutorials/processingFCSData.md
@@ -108,7 +108,7 @@ If you are sure you have enough RAM, you can collect the data to the master
node. (In case of the relatively small Levine13 dataset, you very probably have
the required 2.5MB of RAM, but there are many larger datasets.)
```julia
e = distributed_collect(e)
e = gather_array(e)
```

```
@@ -260,13 +260,13 @@ The `metaClusters` represent membership of the SOM codes in cluster; these can
be expanded to membership of all cells using [`mapToGigaSOM`](@ref):

```julia
mapping = distributed_collect(mapToGigaSOM(som, di), free=true)
mapping = gather_array(mapToGigaSOM(som, di), free=true)
clusters = metaClusters[mapping]
```

`clusters` now contain an integer from `1` to `10` with a classification of
each cell in the dataset.

(The argument `free=true` of `distributed_collect` automatically removes the
(The argument `free=true` of `gather_array` automatically removes the
distributed data from workers after collecting, which saves their memory for
other datasets.)
43 changes: 3 additions & 40 deletions src/GigaSOM.jl
@@ -10,6 +10,7 @@ using CSV
using DataFrames
using Distances
using Distributed
using DistributedData
using Distributions
using FCSFiles
using FileIO
@@ -21,20 +22,15 @@ using StableRNGs
include("base/structs.jl")

include("base/dataops.jl")
include("base/distributed.jl")
include("base/trainutils.jl")

include("analysis/core.jl")
include("analysis/embedding.jl")

include("io/dio.jl")
include("io/input.jl")
include("io/process.jl")
include("io/splitting.jl")

# include visualization files
# include("visualization/plotting.jl")

#core
export initGigaSOM, trainGigaSOM, mapToGigaSOM

@@ -45,7 +41,7 @@ export linearRadius, expRadius, gaussianKernel, bubbleKernel, thresholdKernel, d
export embedGigaSOM

# structs
export Som, LoadedDataInfo
export Som

#io/input
export readFlowset,
@@ -69,40 +65,7 @@ export slicesof, vcollectSlice, collectSlice
#io/process
export cleanNames!, getMetaData, getMarkerNames

# plotting
export plotCounts, plotPCA

#dataops (higher-level operations on data)
export dcopy,
dselect,
dapply_cols,
dapply_rows,
dstat,
dstat_buckets,
dcount,
dcount_buckets,
dscale,
dtransform_asinh,
dmedian,
dmedian_buckets,
mapbuckets,
catmapbuckets

#distributed data tools
export save_at,
get_from,
get_val_from,
remove_from,
distribute_array,
distribute_darray,
undistribute,
distributed_transform,
distributed_mapreduce,
distributed_foreach,
distributed_collect,
distributed_export,
distributed_import,
distributed_unlink

export dtransform_asinh

end # module
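
Migration note: the documentation hunks above rename the removed GigaSOM helpers to their DistributedData.jl counterparts (`LoadedDataInfo` to `Dinfo`, `distributed_mapreduce` to `dmapreduce`, `distributed_transform` to `dtransform`, `distributed_collect` to `gather_array`, `distributed_export` to `dstore`, `undistribute` to `unscatter`). The following is a minimal sketch of how downstream code might look after the change, not part of this commit. It assumes DistributedData.jl is installed and uses its `scatter_array` function, which does not appear in this diff; the symbol names `:migrationDemo` and `:migrationDemoDoubled` are hypothetical.

```julia
using Distributed
using DistributedData  # assumed installed; provides Dinfo, scatter_array, dmapreduce, ...

# With no extra workers added, workers() returns [1], so the sketch
# also runs in a single process.
data = reshape(collect(1.0:12.0), 6, 2)
di = scatter_array(:migrationDemo, data, workers())  # returns a Dinfo

# formerly distributed_mapreduce: overall mean of all scattered values
total = dmapreduce(di, sum, +)
count = dmapreduce(di, length, +)
overall_mean = total / count

# formerly distributed_transform: store the result under a new symbol
doubled = dtransform(di, x -> 2 .* x, :migrationDemoDoubled)

# formerly distributed_collect: free=true removes the worker copies afterwards
collected = gather_array(doubled, free=true)

# formerly undistribute: release the original scattered data
unscatter(di)
```

On a real cluster, `addprocs` and `@everywhere using DistributedData` would be needed before scattering, so that the workers can hold and process their pieces.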