From 7eed94aae761443a95e2cf2ca6337d57db8c8a42 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 18 Feb 2022 00:19:02 +0000 Subject: [PATCH 1/6] CompatHelper: bump compat for "CSV" to "0.10" --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 95d1b6b2..61e91f40 100644 --- a/Project.toml +++ b/Project.toml @@ -22,7 +22,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0" [compat] -CSV = "^0.5.12, 0.6, 0.7, 0.8" +CSV = "^0.5.12, 0.6, 0.7, 0.8, 0.10" DataFrames = "0.20, 0.21, 0.22, 1.0" Distances = "^0.8.2, 0.9, 0.10" DistributedArrays = "^0.6.4" From 612496fac455554339fe54a22b081c2e8b44b743 Mon Sep 17 00:00:00 2001 From: Mirek Kratochvil Date: Sat, 19 Mar 2022 14:19:57 +0100 Subject: [PATCH 2/6] migrate to DistributedData --- Project.toml | 16 +- docs/src/tutorials/basicUsage.md | 15 +- docs/src/tutorials/distributedProcessing.md | 22 +- src/GigaSOM.jl | 37 +- src/analysis/core.jl | 52 +-- src/analysis/embedding.jl | 24 +- src/base/dataops.jl | 411 +------------------- src/base/distributed.jl | 355 ----------------- src/base/structs.jl | 17 - src/io/dio.jl | 94 ----- src/io/input.jl | 28 +- test/runtests.jl | 3 +- test/testBatch.jl | 2 +- test/testDataOps.jl | 116 +----- test/testDistributed.jl | 125 ------ test/testFileSplitting.jl | 4 +- test/testInputCSV.jl | 4 +- test/testLoadPBMC8.jl | 8 +- 18 files changed, 105 insertions(+), 1228 deletions(-) delete mode 100644 src/base/distributed.jl delete mode 100644 src/io/dio.jl delete mode 100644 test/testDistributed.jl diff --git a/Project.toml b/Project.toml index 61e91f40..2d791240 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "GigaSOM" uuid = "a03a9c34-069e-5582-a11c-5c984cab887c" -version = "0.6.8" +version = "0.7.0" [deps] CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" @@ -8,24 +8,21 @@ DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" DistributedArrays = "aaf54ef3-cdf8-58ed-94cc-d582ad619b94" +DistributedData = "f6a0035f-c5ac-4ad0-b410-ad102ced35df" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" FCSFiles = "d76558cf-badf-52d4-a17e-381ab0b0d937" FileIO = "5789e2e9-d7fb-5bc7-8068-2c6fae9b9549" -JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" -LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" -SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3" -Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" -XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0" [compat] CSV = "^0.5.12, 0.6, 0.7, 0.8, 0.10" DataFrames = "0.20, 0.21, 0.22, 1.0" Distances = "^0.8.2, 0.9, 0.10" DistributedArrays = "^0.6.4" +DistributedData = "0.2" Distributions = "^0.21.1, 0.22, 0.23, 0.24, 0.25" FCSFiles = "^0.1.1" FileIO = "^1.0.7" @@ -36,4 +33,11 @@ XLSX = "^0.5.5, 0.6, 0.7" julia = "1" [extras] +JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" +XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0" + +[targets] +test = ["JSON", "LinearAlgebra", "SHA", "Test", "XLSX"] diff --git a/docs/src/tutorials/basicUsage.md b/docs/src/tutorials/basicUsage.md index 11b1d6b5..888da529 100644 --- a/docs/src/tutorials/basicUsage.md +++ b/docs/src/tutorials/basicUsage.md @@ -33,18 +33,17 @@ While all functions work well on simple data matrices, the main aim of GigaSOM is to let the users enjoy the cluster-computing resources. All functions also work on data that are "scattered" among workers (i.e. each worker only holds a portion of the data loaded in the memory). This dataset description is stored -in [`LoadedDataInfo`](@ref) structure. Most of the functions above in fact -accept the `LoadedDataInfo` as argument, and often return another -`LoadedDataInfo` that describes the scattered result. +in [`Dinfo`](@ref) structure. Most of the functions above in fact +accept the `Dinfo` as argument, and often return another +`Dinfo` that describes the scattered result. -Most importantly, using `LoadedDataInfo` **prevents memory exhaustion at the +Most importantly, using `Dinfo` **prevents memory exhaustion at the master node**, which is a critical feature required to handle huge datasets. You can always collect the scattered data back into a matrix (if it fits to -your RAM) with [`distributed_collect`](@ref), and utilize many other functions -to manipulate it, including e.g. [`distributed_mapreduce`](@ref) for easily -running parallel computations, or [`distributed_export`](@ref) for saving and -restoring the dataset paralelly. +your RAM) with `gather_array`, and utilize many other functions to manipulate +it, including e.g. `dmapreduce` for easily running parallel computations, or +`dstore` for saving and restoring the dataset paralelly. ## Minimal working example diff --git a/docs/src/tutorials/distributedProcessing.md b/docs/src/tutorials/distributedProcessing.md index 220d46b4..4a0c6aea 100644 --- a/docs/src/tutorials/distributedProcessing.md +++ b/docs/src/tutorials/distributedProcessing.md @@ -13,21 +13,23 @@ aforementioned `dselect` and `dtransform_asinh`. The following code extracts means and standard deviations from the first 3 columns of a dataset distributed as `di`: ```julia +using DistributedData + dstat(di, [1,2,3]) ``` -## Manual work with the distributed data +## Manual work with the DistributedData.jl package We will first show how to use the general framework to compute per-cluster -statistics. GigaSOM exports the [`distributed_mapreduce`](@ref) function that +statistics. DistributedData.jl exports the [`dmapreduce`](@ref) function that can be used as a very effective basic building block for running such computations. For example, you can efficiently compute a distributed mean of all your data as such: ```julia -distributed_mapreduce(di, sum, +) / distributed_mapreduce(di, length, +) +dmapreduce(di, sum, +) / dmapreduce(di, length, +) ``` -The parameters of `distributed_mapreduce` are, in order: +The parameters of `dmapreduce` are, in order: - `di`, the dataset - `sum` or `length`, an unary "map" function -- during the computation, each @@ -35,7 +37,7 @@ The parameters of `distributed_mapreduce` are, in order: - `+`, a binary "reduction" or "folding" function -- the pieces of information processed by the map function are successively joined in pairs using this function, until there is only a single result left. This final result is also - what `distributed_mapreduce` returns. + what `dmapreduce` returns. Above example thus reads: Sum all data on all workers, add up the intermediate results, and divide the final number to the sum of all lengths of data on the @@ -45,7 +47,7 @@ Column-wise mean (as produced by `dstat`) is slightly more useful; we only need to split the computation on columns: ```julia -distributed_mapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ distributed_mapreduce(di, x->size(x,1), +) +dmapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ dmapreduce(di, x->size(x,1), +) ``` Finally, for distributed computation of per-cluster mean, the clustering @@ -55,7 +57,7 @@ the distributed `mapToGigaSOM` does exactly that). First, compute the clustering: ```julia mapping = mapToGigaSOM(som, di) -distributed_transform(mapping, m -> metaClusters[m]) +dtransform(mapping, m -> metaClusters[m]) ``` Now, the distributed computation is run on 2 scattered datasets. We employ a @@ -66,7 +68,7 @@ following code produces a matrix of tuples `(sum, count)`, for separate clusters (in rows) and data columns (in columns): ```julia -sumscounts = distributed_mapreduce([di, mapping], +sumscounts = dmapreduce([di, mapping], (d, mapping) -> catmapbuckets( (_,clData) -> (sum(clData), length(clData)), d, 10, mapping), @@ -113,13 +115,13 @@ capture all of the actual 16 existing clusters) Finally, we can remove the temporary data from workers to create free memory for other analyses: ```julia -undistribute(mapping) +unscatter(mapping) ``` ## Convenience statistical functions Notably, several of the most used statistical functions are available in -GigaSOM.jl in a form that can cope with distributed data. +DistributedData.jl in a form that can cope with distributed data. For example, you can run a distributed median computation as such: ```julia diff --git a/src/GigaSOM.jl b/src/GigaSOM.jl index d399d906..7b5bd7a1 100644 --- a/src/GigaSOM.jl +++ b/src/GigaSOM.jl @@ -10,6 +10,7 @@ using CSV using DataFrames using Distances using Distributed +using DistributedData using Distributions using FCSFiles using FileIO @@ -21,13 +22,11 @@ using StableRNGs include("base/structs.jl") include("base/dataops.jl") -include("base/distributed.jl") include("base/trainutils.jl") include("analysis/core.jl") include("analysis/embedding.jl") -include("io/dio.jl") include("io/input.jl") include("io/process.jl") include("io/splitting.jl") @@ -45,7 +44,7 @@ export linearRadius, expRadius, gaussianKernel, bubbleKernel, thresholdKernel, d export embedGigaSOM # structs -export Som, LoadedDataInfo +export Som #io/input export readFlowset, @@ -73,36 +72,6 @@ export cleanNames!, getMetaData, getMarkerNames export plotCounts, plotPCA #dataops (higher-level operations on data) -export dcopy, - dselect, - dapply_cols, - dapply_rows, - dstat, - dstat_buckets, - dcount, - dcount_buckets, - dscale, - dtransform_asinh, - dmedian, - dmedian_buckets, - mapbuckets, - catmapbuckets - -#distributed data tools -export save_at, - get_from, - get_val_from, - remove_from, - distribute_array, - distribute_darray, - undistribute, - distributed_transform, - distributed_mapreduce, - distributed_foreach, - distributed_collect, - distributed_export, - distributed_import, - distributed_unlink - +export dtransform_asinh end # module diff --git a/src/analysis/core.jl b/src/analysis/core.jl index 37634ee2..13c06b41 100644 --- a/src/analysis/core.jl +++ b/src/analysis/core.jl @@ -21,18 +21,18 @@ function initGigaSOM(data::Union{Matrix,DataFrame}, args...; kwargs...) end """ - function initGigaSOM(data::LoadedDataInfo, + function initGigaSOM(data::Dinfo, xdim::Int64, ydim::Int64 = xdim; seed=rand(Int), rng=StableRNG(seed)) -`initGigaSOM` overload for working with distributed-style `LoadedDataInfo` +`initGigaSOM` overload for working with distributed-style `Dinfo` data. The rest of the arguments is passed to the data-independent `initGigaSOM`. Arguments: -- `data`: a `LoadedDataInfo` object with the distributed dataset matrix +- `data`: a `Dinfo` object with the distributed dataset matrix """ -function initGigaSOM(data::LoadedDataInfo, args...; kwargs...) +function initGigaSOM(data::Dinfo, args...; kwargs...) ncol = get_val_from(data.workers[1], :(size($(data.val))[2])) (means, sdevs) = dstat(data, Vector(1:ncol)) @@ -84,7 +84,7 @@ end """ trainGigaSOM( som::Som, - dInfo::LoadedDataInfo; + dInfo::Dinfo; kernelFun::Function = gaussianKernel, metric = Euclidean(), somDistFun = distMatrix(Chebyshev()), @@ -98,7 +98,7 @@ end # Arguments: - `som`: object of type Som with an initialised som -- `dInfo`: `LoadedDataInfo` object that describes a loaded dataset +- `dInfo`: `Dinfo` object that describes a loaded dataset - `kernelFun::function`: optional distance kernel; one of (`bubbleKernel, gaussianKernel`) default is `gaussianKernel` - `metric`: Passed as metric argument to the KNN-tree constructor @@ -114,7 +114,7 @@ end """ function trainGigaSOM( som::Som, - dInfo::LoadedDataInfo; + dInfo::Dinfo; kernelFun::Function = gaussianKernel, metric = Euclidean(), somDistFun = distMatrix(Chebyshev()), @@ -171,16 +171,16 @@ end Overload of `trainGigaSOM` for simple DataFrames and matrices. This slices the data, distributes them to the workers, and runs normal `trainGigaSOM`. Data is -`undistribute`d after the computation. +`unscatter`d after the computation. """ function trainGigaSOM(som::Som, train; kwargs...) train = Matrix{Float64}(train) #this slices the data into parts and and sends them to workers - dInfo = distribute_array(:GigaSOMtrainDataVar, train, workers()) + dInfo = scatter_array(:GigaSOMtrainDataVar, train, workers()) som_res = trainGigaSOM(som, dInfo; kwargs...) - undistribute(dInfo) + unscatter(dInfo) return som_res end @@ -215,13 +215,13 @@ function doEpoch(x::Array{Float64,2}, codes::Array{Float64,2}, tree) end """ - distributedEpoch(dInfo::LoadedDataInfo, codes::Matrix{Float64}, tree) + distributedEpoch(dInfo::Dinfo, codes::Matrix{Float64}, tree) Execute the `doEpoch` in parallel on workers described by `dInfo` and collect the results. Returns pair of numerator and denominator matrices. """ -function distributedEpoch(dInfo::LoadedDataInfo, codes::Matrix{Float64}, tree) - return distributed_mapreduce( +function distributedEpoch(dInfo::Dinfo, codes::Matrix{Float64}, tree) + return dmapreduce( dInfo, (data) -> doEpoch(data, codes, tree), ((n1, d1), (n2, d2)) -> (n1 + n2, d1 + d2), @@ -230,33 +230,33 @@ end """ - mapToGigaSOM(som::Som, dInfo::LoadedDataInfo; + mapToGigaSOM(som::Som, dInfo::Dinfo; knnTreeFun = BruteTree, metric = Euclidean(), - output::Symbol=tmpSym(dInfo)::LoadedDataInfo + output::Symbol=tmp_symbol(dInfo)::Dinfo Compute the index of the BMU for each row of the input data. # Arguments - `som`: a trained SOM -- `dInfo`: `LoadedDataInfo` that describes the loaded and distributed data +- `dInfo`: `Dinfo` that describes the loaded and distributed data - `knnTreeFun`: Constructor of the KNN-tree (e.g. from NearestNeighbors package) - `metric`: Passed as metric argument to the KNN-tree constructor -- `output`: Symbol to save the result, defaults to `tmpSym(dInfo)` +- `output`: Symbol to save the result, defaults to `tmp_symbol(dInfo)` Data must have the same number of dimensions as the training dataset and will be normalised with the same parameters. """ function mapToGigaSOM( som::Som, - dInfo::LoadedDataInfo; + dInfo::Dinfo; knnTreeFun = BruteTree, metric = Euclidean(), - output::Symbol = tmpSym(dInfo), -)::LoadedDataInfo + output::Symbol = tmp_symbol(dInfo), +)::Dinfo tree = knnTreeFun(Array{Float64,2}(transpose(som.codes)), metric) - return distributed_transform( + return dtransform( dInfo, (d) -> (vcat(knn(tree, transpose(d), 1)[1]...)), output, @@ -269,7 +269,7 @@ end metric = Euclidean()) Overload of `mapToGigaSOM` for simple DataFrames and matrices. This slices the data using `DistributedArrays`, sends them the workers, and runs normal -`mapToGigaSOM`. Data is `undistribute`d after the computation. +`mapToGigaSOM`. Data is `unscatter`d after the computation. """ function mapToGigaSOM(som::Som, data; knnTreeFun = BruteTree, metric = Euclidean()) @@ -280,11 +280,11 @@ function mapToGigaSOM(som::Som, data; knnTreeFun = BruteTree, metric = Euclidean error("Data dimensions do not match") end - dInfo = distribute_array(:GigaSOMmappingDataVar, data, workers()) + dInfo = scatter_array(:GigaSOMmappingDataVar, data, workers()) rInfo = mapToGigaSOM(som, dInfo, knnTreeFun = knnTreeFun, metric = metric) - res = distributed_collect(rInfo) - undistribute(dInfo) - undistribute(rInfo) + res = gather_array(rInfo) + unscatter(dInfo) + unscatter(rInfo) return DataFrame(index = res) end diff --git a/src/analysis/embedding.jl b/src/analysis/embedding.jl index e8166f33..28d35e2e 100644 --- a/src/analysis/embedding.jl +++ b/src/analysis/embedding.jl @@ -1,19 +1,19 @@ """ embedGigaSOM(som::GigaSOM.Som, - dInfo::LoadedDataInfo; + dInfo::Dinfo; knnTreeFun = BruteTree, metric = Euclidean(), k::Int64=0, adjust::Float64=1.0, smooth::Float64=0.0, m::Float64=10.0, - output::Symbol=tmpSym(dInfo))::LoadedDataInfo + output::Symbol=tmp_symbol(dInfo))::Dinfo Return a data frame with X,Y coordinates of EmbedSOM projection of the data. # Arguments: - `som`: a trained SOM -- `dInfo`: `LoadedDataInfo` that describes the loaded dataset +- `dInfo`: `Dinfo` that describes the loaded dataset - `knnTreeFun`: Constructor of the KNN-tree (e.g. from NearestNeighbors package) - `metric`: Passed as metric argument to the KNN-tree constructor - `k`: number of nearest neighbors to consider (high values get quadratically @@ -30,15 +30,15 @@ and must be normalized using the same parameters. """ function embedGigaSOM( som::GigaSOM.Som, - dInfo::LoadedDataInfo; + dInfo::Dinfo; knnTreeFun = BruteTree, metric = Euclidean(), k::Int64 = 0, adjust::Float64 = 1.0, smooth::Float64 = 0.0, m::Float64 = 10.0, - output::Symbol = tmpSym(dInfo), -)::LoadedDataInfo + output::Symbol = tmp_symbol(dInfo), +)::Dinfo # convert `smooth` to `boost` boost = exp(-smooth - 1) @@ -58,7 +58,7 @@ function embedGigaSOM( tree = knnTreeFun(Array{Float64,2}(transpose(som.codes)), metric) # run the distributed computation - return distributed_transform( + return dtransform( dInfo, (d) -> (embedGigaSOM_internal(som, d, tree, k, adjust, boost, m)), output, @@ -77,7 +77,7 @@ end Overload of `embedGigaSOM` for simple DataFrames and matrices. This slices the data using `DistributedArrays`, sends them the workers, and runs normal -`embedGigaSOM`. All data is properly `undistribute`d after the computation. +`embedGigaSOM`. All data is properly `unscatter`d after the computation. # Examples: @@ -107,7 +107,7 @@ function embedGigaSOM( data = Matrix{Float64}(data) - dInfo = distribute_array(:GigaSOMembeddingDataVar, data, workers()) + dInfo = scatter_array(:GigaSOMembeddingDataVar, data, workers()) rInfo = embedGigaSOM( som, dInfo, @@ -118,9 +118,9 @@ function embedGigaSOM( smooth = smooth, m = m, ) - res = distributed_collect(rInfo) - undistribute(dInfo) - undistribute(rInfo) + res = gather_array(rInfo) + unscatter(dInfo) + unscatter(rInfo) return res end diff --git a/src/base/dataops.jl b/src/base/dataops.jl index b056f563..2919eb71 100644 --- a/src/base/dataops.jl +++ b/src/base/dataops.jl @@ -1,416 +1,9 @@ - -""" - dcopy(dInfo::LoadedDataInfo, newName::Symbol) - -Clone the dataset and store it under a new distributed name `newName`. -""" -function dcopy(dInfo::LoadedDataInfo, newName::Symbol)::LoadedDataInfo - distributed_transform(dInfo, x -> x, newName) -end - -""" - dselect(dInfo::LoadedDataInfo, columns::Vector{Int}; tgt=dInfo.val) - -Reduce dataset to selected columns, optionally save it under a different name. -""" -function dselect( - dInfo::LoadedDataInfo, - columns::Vector{Int}, - tgt::Symbol = dInfo.val, -)::LoadedDataInfo - distributed_transform(dInfo, mtx -> mtx[:, columns], tgt) -end - -""" - function dselect(dInfo::LoadedDataInfo, - currentColnames::Vector{String}, selectColnames::Vector{String}; - tgt=dInfo.val)::LoadedDataInfo - -Convenience overload of `dselect` that works with column names. -""" -function dselect( - dInfo::LoadedDataInfo, - currentColnames::Vector{String}, - selectColnames::Vector{String}, - tgt = dInfo.val, -)::LoadedDataInfo - colIdxs = indexin(selectColnames, currentColnames) - if any(colIdxs .== nothing) - @error "Some columns were not found" - error("unknown column") - end - dselect(dInfo, Vector{Int}(colIdxs), tgt) -end - -""" - dapply_cols(dInfo::LoadedDataInfo, fn, columns::Vector{Int}) - -Apply a function `fn` over columns of a distributed dataset. - -`fn` gets _2_ parameters: -- a data vector for (the whole column saved at one worker) -- index of the column in the `columns` array (i.e. a number from - `1:length(columns)`) -""" -function dapply_cols(dInfo::LoadedDataInfo, fn, columns::Vector{Int}) - transform_columns = x -> for (idx, c) in enumerate(columns) - x[:, c] = fn(x[:, c], idx) - end - - distributed_exec(dInfo, transform_columns) -end - -""" - dapply_rows(dInfo::LoadedDataInfo, fn) - -Apply a function `fn` over rows of a distributed dataset. - -`fn` gets a single vector parameter for each row to transform. -""" -function dapply_rows(dInfo::LoadedDataInfo, fn) - transform_rows = x -> for i = 1:size(x, 1) - x[i, :] = fn(x[i, :]) - end - - distributed_exec(dInfo, transform_rows) -end - -""" - combine_stats((s1, sqs1, n1), (s2, sqs2, n2)) - -Helper for `dstat`-style functions that just adds up elements in triplets of -vectors. -""" -function combine_stats((s1, sqs1, n1), (s2, sqs2, n2)) - return (s1 .+ s2, sqs1 .+ sqs2, n1 .+ n2) -end - -""" - dstat(dInfo::LoadedDataInfo, columns::Vector{Int})::Tuple{Vector{Float64}, Vector{Float64}} - -Compute mean and standard deviation of the columns in dataset. Returns a tuple -with a vector of means in `columns`, and a vector of corresponding sdevs. -""" -function dstat( - dInfo::LoadedDataInfo, - columns::Vector{Int}, -)::Tuple{Vector{Float64},Vector{Float64}} - - sum_squares = x -> sum(x .^ 2) - - # extraction of the statistics from individual dataset slices - get_stats = - d -> ( - mapslices(sum, d[:, columns], dims = 1), - mapslices(sum_squares, d[:, columns], dims = 1), - mapslices(length, d[:, columns], dims = 1), - ) - - # extract the stats - (sums, sqsums, ns) = distributed_mapreduce(dInfo, get_stats, combine_stats) - - return ( - (sums./ns)[1, :], #means - (sqrt.(sqsums ./ ns - (sums ./ ns) .^ 2))[1, :], #sdevs - ) -end - -""" - dstat_buckets(dInfo::LoadedDataInfo, nbuckets::Int, buckets::LoadedDataInfo, columns::Vector{Int})::Tuple{Matrix{Float64}, Matrix{Float64}} - -A version of `dstat` that works with bucketing information (e.g. clusters); -returns a tuple of matrices. -""" -function dstat_buckets( - dInfo::LoadedDataInfo, - nbuckets::Int, - buckets::LoadedDataInfo, - columns::Vector{Int}, -)::Tuple{Matrix{Float64},Matrix{Float64}} - # this produces a triplet of matrices (1 row per each bucket) - get_bucketed_stats = - (d, b) -> ( - catmapbuckets((_, x) -> sum(x), d[:, columns], nbuckets, b), # sums - catmapbuckets((_, x) -> sum(x .^ 2), d[:, columns], nbuckets, b), # squared sums - catmapbuckets((_, x) -> length(x), d[:, columns], nbuckets, b), # data sizes - ) - - # extract the bucketed stats - (sums, sqsums, ns) = - distributed_mapreduce([dInfo, buckets], get_bucketed_stats, combine_stats) - - return ( - sums ./ ns, #means - sqrt.(sqsums ./ ns - (sums ./ ns) .^ 2), #sdevs - ) -end - -""" - dcount(ncats::Int, dInfo::LoadedDataInfo)::Vector{Int} - -Count the numbers of integer vector values stored in `dInfo`; assuming the -values are in range 1--`ncats`. -""" -function dcount(ncats::Int, dInfo::LoadedDataInfo)::Vector{Int} - tabulate = (d) -> begin - counts = zeros(Int, ncats) - for i in d - counts[i] += 1 - end - return counts - end - - distributed_mapreduce(dInfo, tabulate, +) -end - -""" - dcount_buckets(ncats::Int, dInfo::LoadedDataInfo, nbuckets::Int, buckets::LoadedDataInfo)::Matrix{Int} - -Same as `dcount`, but counts the items in `dInfo` bucketed by `buckets` to -produce a matrix of counts, with `ncats` rows and `nbuckets` columns. - -Useful with `distributeFCSFileVector` to determine cluster distribution within -files. """ -function dcount_buckets( - ncats::Int, - dInfo::LoadedDataInfo, - nbuckets::Int, - buckets::LoadedDataInfo, -)::Matrix{Int} - tabulate2 = (d, b) -> begin - counts = zeros(Int, ncats, nbuckets) - for (i, j) in zip(d, b) - counts[i, j] += 1 - end - return counts - end - - distributed_mapreduce([dInfo, buckets], tabulate2, +) -end - -""" - dscale(dInfo::LoadedDataInfo, columns::Vector{Int}) - -Scale the columns in the dataset to have mean 0 and sdev 1. - -Prevents creation of NaNs by avoiding division by zero sdevs. -""" -function dscale(dInfo::LoadedDataInfo, columns::Vector{Int}) - mean, sd = dstat(dInfo, columns) - sd[sd.==0] .= 1 # prevent division by zero - - normalize = (coldata, idx) -> (coldata .- mean[idx]) ./ sd[idx] - - dapply_cols(dInfo, normalize, columns) -end - -""" - dtransform_asinh(dInfo::LoadedDataInfo, columns::Vector{Int}, cofactor=5) + dtransform_asinh(dInfo::Dinfo, columns::Vector{Int}, cofactor=5) Transform columns of the dataset by asinh transformation with `cofactor`. """ -function dtransform_asinh(dInfo::LoadedDataInfo, columns::Vector{Int}, cofactor = 5) +function dtransform_asinh(dInfo::Dinfo, columns::Vector{Int}, cofactor = 5) dapply_cols(dInfo, (v, _) -> asinh.(v ./ cofactor), columns) end -""" - mapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1, slicedims=bucketdim) - -Apply the function `fn` over array `a` so that it processes the data by -buckets defined by `buckets` (that contains integers in range `1:nbuckets`). - -The buckets are sliced out in dimension specified by `bucketdim`. -""" -function mapbuckets( - fn, - a::Array, - nbuckets::Int, - buckets::Vector{Int}; - bucketdim::Int = 1, - slicedims = bucketdim, -) - ndims = length(size(a)) - - # precompute the range of the array - extent = [i == bucketdim ? (0 .== buckets) : (1:size(a, i)) for i = 1:ndims] - - # return a list of reduced dataset for each bucket - return [ - begin - # replace the bucketing dimension in the extent by the filter for current bucket - extent[bucketdim] = bucket .== buckets - # reduce the array and run the operation - mapslices(x -> fn(bucket, x), a[extent...], dims = slicedims) - end for bucket = 1:nbuckets - ] -end - -""" - catmapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1) - -Same as `mapbuckets`, except concatenates the bucketing results in the -bucketing dimension, thus creating a slightly neater matrix. `slicedims` is -therefore fixed to `bucketdim`. -""" -function catmapbuckets( - fn, - a::Array, - nbuckets::Int, - buckets::Vector{Int}; - bucketdim::Int = 1, -) - cat( - mapbuckets( - fn, - a, - nbuckets, - buckets, - bucketdim = bucketdim, - slicedims = bucketdim, - )..., - dims = bucketdim, - ) -end - - -""" - collect_extrema(ex1, ex2) - -Helper for collecting the minimums and maximums of the data. `ex1`, `ex2` are -arrays of pairs (min,max), this function combines the arrays element-wise and -finds combined minima and maxima. -""" -function collect_extrema(ex1, ex2) - broadcast(((a, b), (c, d)) -> (min(a, c), max(b, d)), ex1, ex2) -end - -""" - update_extrema(counts, target, lim, mid) - -Helper for distributed median computation -- returns updated extrema in `lims` -depending on whether the item count in `counts` of values less than `mids` is -less or higher than `targets`. -""" -function update_extrema(counts, targets, lims, mids) - broadcast( - (cnt, target, lim, mid) -> cnt >= target ? # if the count is too high, - (lim[1], mid) : # median is going to be in the lower half - (mid, lim[2]), # otherwise in the higher half - counts, - targets, - lims, - mids, - ) -end - -""" - dmedian(dInfo::LoadedDataInfo, columns::Vector{Int}) - -Compute a median in a distributed fashion, avoiding data transfer and memory -capacity that is required to compute the median in the classical way by -sorting. All data must be finite and defined. If the median is just between 2 -values, the lower one is chosen. - -The algorithm is approximative, searching for a good median by halving interval -and counting how many values are below the threshold. `iters` can be increased -to improve precision, each value adds roughly 1 bit to the precision. The -default value is 20, which corresponds to precision 10e-6 times the data range. -""" -function dmedian(dInfo::LoadedDataInfo, columns::Vector{Int}; iters = 20) - # how many items in the dataset should be smaller than the median (roughly size/2) - target = distributed_mapreduce(dInfo, d -> size(d, 1), +) ./ 2 - - # current estimation range for the median (tuples of min, max) - lims = distributed_mapreduce( - dInfo, - d -> mapslices(extrema, d[:, columns], dims = 1), - collect_extrema, - ) - - # convert the limits to a simple vector - lims = cat(lims..., dims = 1) - - for iter = 1:iters - mids = sum.(lims) ./ 2 - - count_smaller_than_mids = - d -> [count(x -> x < mids[i], d[:, c]) for (i, c) in enumerate(columns)] - - # compute the total number of elements smaller than `mids` - counts = distributed_mapreduce(dInfo, count_smaller_than_mids, +) - - # update lims into lower/upper half depending on whether the count was - # lower or higher than target - lims = update_extrema(counts, target, lims, mids) - end - - return sum.(lims) ./ 2 -end - -""" - dmedian_buckets(dInfo::LoadedDataInfo, nbuckets::Int, buckets::LoadedDataInfo, columns::Vector{Int}; iters=20) - -A version of `dmedian` that works with the bucketing information (i.e. -clusters) from `nbuckets` and `buckets`. -""" -function dmedian_buckets( - dInfo::LoadedDataInfo, - nbuckets::Int, - buckets::LoadedDataInfo, - columns::Vector{Int}; - iters = 20, -) - # count things in the buckets (produces a matrix with one row per bucket, - # one column for `columns`) - targets = - distributed_mapreduce( - [dInfo, buckets], - (d, b) -> catmapbuckets((_, x) -> size(x, 1), d[:, columns], nbuckets, b), - +, - ) ./ 2 - - # measure the minima and maxima of the datasets. In case the bucket is not - # present in the data partition, `extrema()` fails, so we replace it with - # `(Inf, -Inf)` which will eventually get coalesced with other numbers to - # normal values. - get_bucket_extrema = - (d, b) -> catmapbuckets( - (_, x) -> length(x) > 0 ? # if there are some elements - extrema(x) : # just take the extrema - (Inf, -Inf), # if not, use backup values - d[:, columns], - nbuckets, - b, - ) - - # collect the extrema - lims = distributed_mapreduce([dInfo, buckets], get_bucket_extrema, collect_extrema) - - for iter = 1:iters - mids = sum.(lims) ./ 2 - - # this counts the elements smaller than mids in buckets - # (both mids and elements are bucketed and column-sliced into matrices) - bucketed_count_smaller_than_mids = - (d, b) -> vcat(mapbuckets( - (bucketID, d) -> - [ - count(x -> x < mids[bucketID, colID], d[:, colID]) - for (colID, c) in enumerate(columns) - ]', - d, - nbuckets, - b, - slicedims = (1, 2), - )...) - - # collect the counts - counts = - distributed_mapreduce([dInfo, buckets], bucketed_count_smaller_than_mids, +) - - lims = update_extrema(counts, targets, lims, mids) - end - - return sum.(lims) ./ 2 -end diff --git a/src/base/distributed.jl b/src/base/distributed.jl deleted file mode 100644 index 50ca945d..00000000 --- a/src/base/distributed.jl +++ /dev/null @@ -1,355 +0,0 @@ -""" - save_at(worker, sym, val) - -Saves value `val` to symbol `sym` at `worker`. `sym` should be quoted (or -contain a symbol). `val` gets unquoted in the processing and evaluated at the -worker, quote it if you want to pass exact command to the worker. - -This is loosely based on the package ParallelDataTransfers, but made slightly -more flexible by omitting/delaying the explicit fetches etc. In particular, -`save_at` is roughly the same as `ParallelDataTransfers.sendto`, and -`get_val_from` works very much like `ParallelDataTransfers.getfrom`. - -# Return value - -A future with Nothing that can be fetched to see that the operation has -finished. - -# Examples - - addprocs(1) - save_at(2,:x,123) # saves 123 - save_at(2,:x,myid()) # saves 1 - save_at(2,:x,:(myid())) # saves 2 - save_at(2,:x,:(:x)) # saves the symbol :x - # (just :x won't work because of unquoting) - -# Note: Symbol scope - -The symbols are saved in Main module on the corresponding worker. For example, -`save_at(1, :x, nothing)` _will_ erase your local `x` variable. Beware of name -collisions. -""" -function save_at(worker, sym::Symbol, val) - remotecall(() -> Base.eval(Main, :( - begin - $sym = $val - nothing - end - )), worker) -end - -""" - get_from(worker,val) - -Get a value `val` from a remote `worker`; quoting of `val` works just as with -`save_at`. Returns a future with the requested value. -""" -function get_from(worker, val) - remotecall(() -> Base.eval(Main, :($val)), worker) -end - -""" - get_val_from(worker,val) - -Shortcut for instantly fetching the future from `get_from`. -""" -function get_val_from(worker, val) - fetch(get_from(worker, val)) -end - -""" - remove_from(worker,sym) - -Sets symbol `sym` on `worker` to `nothing`, effectively freeing the data. -""" -function remove_from(worker, sym::Symbol) - save_at(worker, sym, nothing) -end - -""" - distribute_array(sym, x::Array, pids; dim=1)::LoadedDataInfo - -Distribute roughly equal parts of array `x` separated on dimension `dim` among -`pids` into a worker-local variable `sym`. - -Returns the `LoadedDataInfo` structure for the distributed data. -""" -function distribute_array(sym::Symbol, x::Array, pids; dim = 1)::LoadedDataInfo - n = length(pids) - dims = size(x) - - for f in [ - begin - extent = [(1:s) for s in dims] - extent[dim] = (1+div((wid - 1) * dims[dim], n)):div(wid * dims[dim], n) - save_at(pid, sym, x[extent...]) - end for (wid, pid) in enumerate(pids) - ] - fetch(f) - end - - return LoadedDataInfo(sym, pids) -end - -""" - distribute_darray(sym, dd::DArray)::LoadedDataInfo - -Distribute the distributed array parts from `dd` into worker-local variable -`sym`. - -Returns the `LoadedDataInfo` structure for the distributed data. -""" -function distribute_darray(sym::Symbol, dd::DArray)::LoadedDataInfo - for f in [save_at(pid, sym, :($localpart($dd))) for pid in dd.pids] - fetch(f) - end - return LoadedDataInfo(sym, dd.pids) -end - -""" - undistribute(sym, workers) - -Remove the loaded data from workers. -""" -function undistribute(sym::Symbol, workers) - for f in [remove_from(pid, sym) for pid in workers] - fetch(f) - end -end - -""" - undistribute(dInfo::LoadedDataInfo) - -Remove the loaded data described by `dInfo` from the corresponding workers. -""" -function undistribute(dInfo::LoadedDataInfo) - undistribute(dInfo.val, dInfo.workers) -end - -""" - distributed_exec(val, fn, workers) - -Execute a function on workers, taking `val` as a parameter. Results are not -collected. This is optimal for various side-effect-causing computations that -are not expressible with `distributed_transform`. -""" -function distributed_exec(val, fn, workers) - for f in [get_from(pid, :( - begin - $fn($val) - nothing - end - )) for pid in workers] - fetch(f) - end -end - -""" - distributed_exec(dInfo::LoadedDataInfo, fn) - -Variant of `distributed_exec` that works with `LoadedDataInfo`. -""" -function distributed_exec(dInfo::LoadedDataInfo, fn) - distributed_exec(dInfo.val, fn, dInfo.workers) -end - -""" - distributed_transform(val, fn, workers, tgt::Symbol=val) - -Transform the worker-local distributed data available as `val` on `workers` -in-place, by a function `fn`. Store the result as `tgt` (default `val`) - -# Example - - # multiply all saved data by 2 - distributed_transform(:myData, (d)->(2*d), workers()) -""" -function distributed_transform(val, fn, workers, tgt::Symbol = val)::LoadedDataInfo - for f in [save_at(pid, tgt, :($fn($val))) for pid in workers] - fetch(f) - end - return LoadedDataInfo(tgt, workers) -end - -""" - distributed_transform(dInfo::LoadedDataInfo, fn, tgt::Symbol=dInfo.val)::LoadedDataInfo - -Same as `distributed_transform`, but specialized for `LoadedDataInfo`. -""" -function distributed_transform( - dInfo::LoadedDataInfo, - fn, - tgt::Symbol = dInfo.val, -)::LoadedDataInfo - distributed_transform(dInfo.val, fn, dInfo.workers, tgt) -end - -""" - distributed_mapreduce(val, map, fold, workers) - -Run `map`s (non-modifying transforms on the data) and `fold`s (2-to-1 -reductions) on the worker-local data (in `val`s) distributed on `workers` and -return the final reduced result. - -It is assumed that the fold operation is associative, but not commutative (as -in semigroups). If there are no workers, operation returns `nothing` (we don't -have a monoid to magically conjure zero elements :[ ). - -In current version, the reduce step is a sequential left fold, executed in the -main process. - -# Example - # compute the mean of all distributed data - sum,len = distributed_mapreduce(:myData, - (d) -> (sum(d),length(d)), - ((s1, l1), (s2, l2)) -> (s1+s2, l1+l2), - workers()) - println(sum/len) - -# Processing multiple arguments (a.k.a. "zipWith") - -The `val` here does not necessarily need to refer to a symbol, you can easily -pass in a quoted tuple, which will be unquoted in the function parameter. For -example, distributed values `:a` and `:b` can be joined as such: - - distributed_mapreduce(:((a,b)), - ((a,b)::Tuple) -> [a b], - vcat, - workers()) -""" -function distributed_mapreduce(val, map, fold, workers) - if isempty(workers) - return nothing - end - - futures = [get_from(pid, :($map($val))) for pid in workers] - res = fetch(futures[1]) - - # replace the collected futures with new empty futures to allow them to be - # GC'd and free memory for more incoming results - futures[1] = Future() - - for i = 2:length(futures) - res = fold(res, fetch(futures[i])) - futures[i] = Future() - end - res -end - -""" - distributed_mapreduce(dInfo::LoadedDataInfo, map, fold) - -Distributed map/reduce (just as the other overload of `distributed_mapreduce`) -that works with `LoadedDataInfo`. -""" -function distributed_mapreduce(dInfo::LoadedDataInfo, map, fold) - distributed_mapreduce(dInfo.val, map, fold, dInfo.workers) -end - -""" - distributed_mapreduce(vals::Vector, map, fold, workers) - -Variant of `distributed_mapreduce` that works with more distributed variables -at once. -""" -function distributed_mapreduce(vals::Vector, map, fold, workers) - return distributed_mapreduce(Expr(:vect, vals...), vals -> map(vals...), fold, workers) -end - -""" - distributed_mapreduce(dInfo1::LoadedDataInfo, dInfo2::LoadedDataInfo, map, fold) - -Variant of `distributed_mapreduce` that works with more `LoadedDataInfo`s at -once. The data must be distributed on the same set of workers, in the same -order. -""" -function distributed_mapreduce(dInfos::Vector{LoadedDataInfo}, map, fold) - if (isempty(dInfos)) - return nothing - end - - if any([dInfos[1].workers] .!= [di.workers for di in dInfos]) - @error "workers in LoadedDataInfo objects do not match" dInfos[1].workers - error("data distribution mismatch") - end - - return distributed_mapreduce([di.val for di in dInfos], map, fold, dInfos[1].workers) -end - -""" - distributed_collect(val::Symbol, workers, dim=1; free=false) - -Collect the arrays distributed on `workers` under value `val` into an array. The -individual arrays are pasted in the dimension specified by `dim`, i.e. `dim=1` -is roughly equivalent to using `vcat`, and `dim=2` to `hcat`. - -`val` must be an Array-based type; the function will otherwise fail. - -If `free` is true, the `val` is undistributed after collection. - -This preallocates the array for results, and is thus more efficient than e.g. -using `distributed_mapreduce` with `vcat` for folding. -""" -function distributed_collect(val::Symbol, workers, dim = 1; free = false) - size0 = get_val_from(workers[1], :(size($val))) - innerType = get_val_from(workers[1], :(typeof($val).parameters[1])) - sizes = distributed_mapreduce(val, d -> size(d, dim), vcat, workers) - ressize = [size0[i] for i = 1:length(size0)] - ressize[dim] = sum(sizes) - result = zeros(innerType, ressize...) - off = 0 - for (i, pid) in enumerate(workers) - idx = [(1:ressize[j]) for j = 1:length(ressize)] - idx[dim] = ((off+1):(off+sizes[i])) - result[idx...] = get_val_from(pid, val) - off += sizes[i] - end - if free - undistribute(val, workers) - end - return result -end - -""" - distributed_collect(dInfo::LoadedDataInfo, dim=1; free=false) - -Distributed collect (just as the other overload) that works with -`LoadedDataInfo`. -""" -function distributed_collect(dInfo::LoadedDataInfo, dim = 1; free = false) - return distributed_collect(dInfo.val, dInfo.workers, dim, free = free) -end - -""" - distributed_foreach(arr::Vector, fn, workers) - -Call a function `fn` on `workers`, with a single parameter arriving from the -corresponding position in `arr`. -""" -function distributed_foreach(arr::Vector, fn, workers) - futures = [ - remotecall(() -> Base.eval(Main, :($fn($(arr[i])))), pid) - for (i, pid) in enumerate(workers) - ] - return [fetch(f) for f in futures] -end - -""" - tmpSym(s::Symbol; prefix="", suffix="_tmp") - -Decorate a symbol `s` with prefix and suffix, to create a good name for a -related temporary value. -""" -function tmpSym(s::Symbol; prefix = "", suffix = "_tmp") - return Symbol(prefix * String(s) * suffix) -end - -""" - tmpSym(dInfo::LoadedDataInfo; prefix="", suffix="_tmp") - -Decorate the symbol from `dInfo` with prefix and suffix. -""" -function tmpSym(dInfo::LoadedDataInfo; prefix = "", suffix = "_tmp") - return tmpSym(dInfo.val, prefix = prefix, suffix = suffix) -end diff --git a/src/base/structs.jl b/src/base/structs.jl index c735ff5c..b9689698 100644 --- a/src/base/structs.jl +++ b/src/base/structs.jl @@ -35,20 +35,3 @@ Base.copy(som::Som) = Som( numCodes = som.numCodes, grid = som.grid, ) - -""" - LoadedDataInfo - -The basic structure for working with loaded data, distributed amongst workers. In completeness, it represents a dataset as such: - -- `val` is the "value name" under which the data are saved in processes. E.g. - `val=:foo` means that there is a variable `foo` on each process holding a - part of the matrix. -- `workers` is a list of workers (in correct order!) that hold the data - (similar to `DArray.pids`) -""" -struct LoadedDataInfo - val::Symbol - workers::Array{Int64} - LoadedDataInfo(val, workers) = new(val, workers) -end diff --git a/src/io/dio.jl b/src/io/dio.jl deleted file mode 100644 index c965c24f..00000000 --- a/src/io/dio.jl +++ /dev/null @@ -1,94 +0,0 @@ -""" - defaultFiles(s, pids) - -Make a good set of filenames for saving a dataset. -""" -function defaultFiles(s, pids) - return [String(s) * "-$i.slice" for i in eachindex(pids)] -end - -""" - distributed_export(sym::Symbol, pids, files=defaultFiles(sym,pids)) - -Export the content of symbol `sym` by each worker specified by `pids` to a -corresponding filename in `files`. -""" -function distributed_export(sym::Symbol, pids, files = defaultFiles(sym, pids)) - distributed_foreach( - files, - (fn) -> Base.eval(Main, :( - begin - open(f -> $serialize(f, $sym), $fn, "w") - nothing - end - )), - pids, - ) - nothing -end - -""" - distributed_export(dInfo::LoadedDataInfo, files=defaultFiles(dInfo.val, dInfo.workers)) - -Overloaded functionality for `LoadedDataInfo`. -""" -function distributed_export( - dInfo::LoadedDataInfo, - files = defaultFiles(dInfo.val, dInfo.workers), -) - distributed_export(dInfo.val, dInfo.workers, files) -end - -""" - distributed_import(sym::Symbol, pids, files=defaultFiles(sym,pids)) - -Import the content of symbol `sym` by each worker specified by `pids` from the -corresponding filename in `files`. -""" -function distributed_import(sym::Symbol, pids, files = defaultFiles(sym, pids)) - distributed_foreach( - files, - (fn) -> Base.eval(Main, :( - begin - $sym = open($deserialize, $fn) - nothing - end - )), - pids, - ) - return LoadedDataInfo(sym, pids) -end - -""" - distributed_import(dInfo::LoadedDataInfo, files=defaultFiles(dInfo.val, dInfo.workers)) - -Overloaded functionality for `LoadedDataInfo`. -""" -function distributed_import( - dInfo::LoadedDataInfo, - files = defaultFiles(dInfo.val, dInfo.workers), -) - distributed_import(dInfo.val, dInfo.workers, files) -end - -""" - distributed_unlink(sym::Symbol, pids, files=defaultFiles(sym,pids)) - -Remove the files created by `distributed_export` with the same parameters. -""" -function distributed_unlink(sym::Symbol, pids, files = defaultFiles(sym, pids)) - distributed_foreach(files, (fn) -> rm(fn), pids) - nothing -end - -""" - distributed_unlink(dInfo::LoadedDataInfo, files=defaultFiles(dInfo.val, dInfo.workers)) - -Overloaded functionality for `LoadedDataInfo`. -""" -function distributed_unlink( - dInfo::LoadedDataInfo, - files = defaultFiles(dInfo.val, dInfo.workers), -) - distributed_unlink(dInfo.val, dInfo.workers, files) -end diff --git a/src/io/input.jl b/src/io/input.jl index 35ee49bb..a198d396 100644 --- a/src/io/input.jl +++ b/src/io/input.jl @@ -96,7 +96,7 @@ function loadFCS( end """ - loadFCSSet(name::Symbol, fns::Vector{String}, pids=workers(); applyCompensation=true, postLoad=(d,i)->d)::LoadedDataInfo + loadFCSSet(name::Symbol, fns::Vector{String}, pids=workers(); applyCompensation=true, postLoad=(d,i)->d)::Dinfo This runs the FCS loading machinery in a distributed way, so that the files `fns` (with full path) are sliced into equal parts and saved as a distributed @@ -121,9 +121,9 @@ function loadFCSSet( pids = workers(); applyCompensation = true, postLoad = (d, i) -> d, -)::LoadedDataInfo +)::Dinfo slices = slicesof(loadFCSSizes(fns), length(pids)) - distributed_foreach( + dmap( slices, (slice) -> Base.eval( Main, @@ -142,7 +142,7 @@ function loadFCSSet( ), pids, ) - return LoadedDataInfo(name, pids) + return Dinfo(name, pids) end """ @@ -167,7 +167,7 @@ function selectFCSColumns(selectColnames::Vector{String}) end """ - distributeFCSFileVector(name::Symbol, fns::Vector{String}, pids=workers())::LoadedDataInfo + distributeFCSFileVector(name::Symbol, fns::Vector{String}, pids=workers())::Dinfo Distribute a vector of integers among the workers that describes which file from `fns` the cell comes from. Useful for producing per-file statistics. The @@ -178,14 +178,14 @@ function distributeFCSFileVector( name::Symbol, fns::Vector{String}, pids = workers(), -)::LoadedDataInfo +)::Dinfo sizes = loadFCSSizes(fns) slices = slicesof(sizes, length(pids)) return distributeFileVector(name, sizes, slices, pids) end """ - distributeFileVector(name::Symbol, sizes::Vector{Int}, slices::Vector{Tuple{Int,Int,Int,Int}}, pids=workers())::LoadedDataInfo + distributeFileVector(name::Symbol, sizes::Vector{Int}, slices::Vector{Tuple{Int,Int,Int,Int}}, pids=workers())::Dinfo Generalized version of `distributeFCSFileVector` that produces the integer vector from any `sizes` and `slices`. @@ -195,14 +195,14 @@ function distributeFileVector( sizes::Vector{Int}, slices::Vector{Tuple{Int,Int,Int,Int}}, pids = workers(), -)::LoadedDataInfo - distributed_foreach( +)::Dinfo + dmap( slices, (slice) -> Base.eval(Main, :($name = collectSlice((i) -> fill(i, $sizes[i]), $slice))), pids, ) - return LoadedDataInfo(name, pids) + return Dinfo(name, pids) end """ @@ -255,7 +255,7 @@ end pids = workers(); postLoad = (d, i) -> d, csvargs..., - )::LoadedDataInfo + )::Dinfo CSV equivalent of `loadFCSSet`. `csvargs` are passed as keyword arguments to CSV-loading functions. @@ -266,9 +266,9 @@ function loadCSVSet( pids = workers(); postLoad = (d, i) -> d, csvargs..., -)::LoadedDataInfo +)::Dinfo slices = slicesof(loadCSVSizes(fns; csvargs...), length(pids)) - distributed_foreach( + dmap( slices, (slice) -> Base.eval( Main, @@ -284,5 +284,5 @@ function loadCSVSet( ), pids, ) - return LoadedDataInfo(name, pids) + return Dinfo(name, pids) end diff --git a/test/runtests.jl b/test/runtests.jl index 7e747bba..8d8414d5 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,4 @@ -using GigaSOM, DataFrames, XLSX, CSV, Test, Random, Distributed +using GigaSOM, DataFrames, XLSX, CSV, Test, Random, Distributed, DistributedData using FileIO, DataFrames, Distances using JSON, SHA import LinearAlgebra @@ -18,7 +18,6 @@ end checkDir() @testset "GigaSOM test suite" begin - include("testDistributed.jl") include("testDataOps.jl") include("testTrainutils.jl") include("testSplitting.jl") diff --git a/test/testBatch.jl b/test/testBatch.jl index b9b2b308..2f8f3587 100644 --- a/test/testBatch.jl +++ b/test/testBatch.jl @@ -5,7 +5,7 @@ #check whether the distributed version works the same save_at(1, :test, pbmc8_data) - som2 = initGigaSOM(LoadedDataInfo(:test, [1]), 10, 10, seed = 1234) + som2 = initGigaSOM(Dinfo(:test, [1]), 10, 10, seed = 1234) @test som.codes == som2.codes remove_from(1, :test) diff --git a/test/testDataOps.jl b/test/testDataOps.jl index 50104531..ba73bf89 100644 --- a/test/testDataOps.jl +++ b/test/testDataOps.jl @@ -7,120 +7,22 @@ dd = rand(11111, 5) buckets = rand([1, 2, 3], 11111) - di1 = distribute_array(:test1, dd, W[1:1]) - di2 = distribute_array(:test2, dd, W) - buckets1 = distribute_array(:buckets1, buckets, W[1:1]) - buckets2 = distribute_array(:buckets2, buckets, W) + di1 = scatter_array(:test1, dd, W[1:1]) + di2 = scatter_array(:test2, dd, W) + buckets1 = scatter_array(:buckets1, buckets, W[1:1]) + buckets2 = scatter_array(:buckets2, buckets, W) - @testset "Distribution works as expected" begin - @test distributed_collect(di1) == dd - @test distributed_collect(di2) == dd - end - - subsel = [1, 3, 4] - dd = dd[:, subsel] - dselect(di1, subsel) - dselect(di2, subsel) - - @testset "dapply_cols, dapply_rows" begin - diV = distribute_array(:testV, dd, W[1:1]) - diH = distribute_array(:testH, Matrix(dd'), W[1:1]) - dapply_cols(diV, (x, _) -> x ./ sum(x), Vector(1:3)) - dapply_rows(diH, x -> x ./ sum(x)) - @test isapprox(distributed_collect(diV), Matrix(distributed_collect(diH)')) - undistribute(diV) - undistribute(diH) - end - - @testset "dselect" begin - @test distributed_collect(di1) == dd - @test distributed_collect(di2) == dd - end - - di3 = dcopy(di2, :test3) - - @testset "dcopy" begin - @test di2.workers == di3.workers - @test dd == distributed_collect(di3) - end - - undistribute(di3) - - (means1, sds1) = dstat(di1, [1, 3]) - (means2, sds2) = dstat(di2, [1, 3]) - - @testset "dstat" begin - @test length(means1) == 2 - @test length(sds1) == 2 - @test isapprox(means1, means2, atol = 1e-8) - @test isapprox(sds1, sds2, atol = 1e-8) - @test all([isapprox(0.5, m, atol = 0.1) for m in means1]) - @test all([isapprox(0.29, s, atol = 0.1) for s in sds1]) - end - - medians1 = dmedian(di1, [1, 3]) - medians2 = dmedian(di2, [1, 3]) - - @testset "dmedian" begin - @test length(medians1) == 2 - @test isapprox(medians1, medians2, atol = 1e-4) - @test all([isapprox(0.5, m, atol = 0.1) for m in medians1]) - end - - (bmeans1, bsds1) = dstat_buckets(di1, 3, buckets1, [1, 3]) - (bmeans2, bsds2) = dstat_buckets(di2, 3, buckets2, [1, 3]) - - @testset "bucketed dstat" begin - @test size(bmeans1) == (3, 2) - @test size(bsds1) == (3, 2) - @test isapprox(bmeans1, bmeans2, atol = 1e-8) - @test isapprox(bsds1, bsds2, atol = 1e-8) - @test all([isapprox(0.5, m, atol = 0.1) for m in bmeans1]) - @test all([isapprox(0.29, s, atol = 0.1) for s in bsds1]) - end - - medians1 = dmedian_buckets(di1, 3, buckets1, [1, 3]) - medians2 = dmedian_buckets(di2, 3, buckets2, [1, 3]) - - @testset "dmedian" begin - @test size(medians1) == (3, 2) - @test isapprox(medians1, medians2, atol = 1e-4) - @test all([isapprox(0.5, m, atol = 0.1) for m in medians1]) - end - - dscale(di1, [2, 3]) - dscale(di2, [2, 3]) - (means1, sds1) = dstat(di1, [1, 2, 3]) - (means2, sds2) = dstat(di2, [1, 2, 3]) - - @testset "dscale" begin - @test isapprox(means1, means2, atol = 1e-8) - @test isapprox(sds1, sds2, atol = 1e-8) - @test isapprox(0.5, means1[1], atol = 0.1) - @test all([isapprox(0.0, m, atol = 1e-3) for m in means1[2:3]]) - @test all([isapprox(1.0, s, atol = 1e-3) for s in sds1[2:3]]) - end - - dc = distributed_collect(di1) + dc = gather_array(di1) dc[:, 1:2] = asinh.(dc[:, 1:2] ./ 1.23) dtransform_asinh(di1, [1, 2], 1.23) dtransform_asinh(di2, [1, 2], 1.23) @testset "dtransform_asinh" begin - @test isapprox(distributed_collect(di1), dc) - @test isapprox(distributed_collect(di2), dc) + @test isapprox(gather_array(di1), dc) + @test isapprox(gather_array(di2), dc) end - undistribute(di1) - undistribute(di2) + unscatter(di1) + unscatter(di2) rmprocs(W) - - d = ones(Float64, 2, 2) - di = distribute_array(:test, d, [1]) - dscale(di, [1, 2]) - @testset "dscale does not produce NaNs on sdev==0" begin - @test distributed_collect(di) == zeros(Float64, 2, 2) - end - undistribute(di) - end diff --git a/test/testDistributed.jl b/test/testDistributed.jl deleted file mode 100644 index eab7123e..00000000 --- a/test/testDistributed.jl +++ /dev/null @@ -1,125 +0,0 @@ - -@testset "Distributed data handling" begin - - @testset "Distributed data transfers -- local" begin - data = rand(5) - save_at(1, :test, data) - - @test fetch(get_from(1, :test)) == data - @test get_val_from(1, :test) == data - - remove_from(1, :test) - - @test sizeof(get_val_from(1, :test)) == 0 #should be "nothing" but this is more generic - end - - addprocs(3) - @everywhere using GigaSOM - W = workers() - - @testset "Distributed data transfers -- with workers" begin - data = [rand(5) for i in W] - for (i, w) in enumerate(W) - save_at(w, :test, data[i]) - end - - @test [fetch(get_from(w, :test)) for w in W] == data - @test [get_val_from(w, :test) for w in W] == data - - undistribute(:test, W) - - @test sum([sizeof(get_val_from(w, :test)) for w in W]) == 0 - end - - @testset "Data distribution" begin - d = rand(100, 5) - @everywhere import DistributedArrays - dd = DistributedArrays.distribute(d, procs = W) - di = distribute_darray(:test, dd) - @test di.val == :test - @test Set(di.workers) == Set(W) - @test begin - d1 = get_val_from(di.workers[1], :test) - d1 == d[1:size(d1, 1), :] - end - - @test distributed_collect(di, free = true) == d - @test sum([sizeof(get_val_from(w, :test)) for w in W]) == 0 - - di = distribute_array(:test, d, W) - @test distributed_collect(di, free = false) == d - @test sum([sizeof(get_val_from(w, :test)) for w in W]) > 0 - undistribute(di) - @test sum([sizeof(get_val_from(w, :test)) for w in W]) == 0 - end - - @testset "Distributed computation" begin - di = distributed_transform(:(), x -> rand(5), W, :test) - - @test get_val_from(W[1], :test) == distributed_collect(di)[1:5] - - orig = distributed_collect(di) - - @test isapprox( - distributed_mapreduce(:test, d -> sum(d .^ 2), (a, b) -> a + b, W), - sum(orig .^ 2), - ) - - distributed_transform(di, d -> d .* 2) - - @test orig .* 2 == distributed_collect(:test, W) - - @test isapprox( - distributed_mapreduce(di, d -> sum(d .^ 2), (a, b) -> a + b), - sum((orig .* 2) .^ 2), - ) - - t = zeros(length(W)) - exp = zeros(length(W)) - - t[1] = 2 - exp[1] = sum(2 .* get_val_from(W[1], :test)) - - @test distributed_foreach(t, (i) -> eval(:(sum($i .* $(di.val)))), W) == exp - - undistribute(di) - - @test distributed_mapreduce(:noname, x -> x, (a, b) -> a + b, []) == nothing - end - - @testset "Distributed utilities" begin - @test GigaSOM.tmpSym(:test) != :test - @test GigaSOM.tmpSym(:test, prefix = "abc", suffix = "def") == :abctestdef - @test GigaSOM.tmpSym(LoadedDataInfo(:test, W)) != :test - end - - @testset "Persistent distributed data" begin - di = distributed_transform(:(), x -> rand(5), W, :test) - - files = GigaSOM.defaultFiles(di.val, di.workers) - @test allunique(files) - - orig = distributed_collect(di) - distributed_export(di, files) - distributed_transform(di, x -> "erased") - distributed_import(di, files) - - @test orig == distributed_collect(di) - - distributed_export(di.val, di.workers, files) - di2 = distributed_import(:test2, di.workers, files) - - @test orig == distributed_collect(di2) - - undistribute(di) - undistribute(di2) - - distributed_unlink(di) - - @test all([!isfile(f) for f in files]) - end - - rmprocs(W) - W = nothing - -end diff --git a/test/testFileSplitting.jl b/test/testFileSplitting.jl index 15984aa8..b2e089be 100644 --- a/test/testFileSplitting.jl +++ b/test/testFileSplitting.jl @@ -12,9 +12,9 @@ cols = Vector(1:length(antigens)) dtransform_asinh(di, cols, 5) dscale(di, cols) - @test isapprox(pbmc8_data, distributed_collect(di), atol = 1e-4) + @test isapprox(pbmc8_data, gather_array(di), atol = 1e-4) - splits = distributed_mapreduce(di, d -> size(d, 1), vcat) + splits = dmapreduce(di, d -> size(d, 1), vcat) @test sum(splits) == size(pbmc8_data, 1) @test minimum(splits) + 1 >= maximum(splits) diff --git a/test/testInputCSV.jl b/test/testInputCSV.jl index efe453cc..7477da63 100644 --- a/test/testInputCSV.jl +++ b/test/testInputCSV.jl @@ -17,8 +17,8 @@ W = addprocs(3) @everywhere using GigaSOM di = loadCSVSet(:csvTest, files, W, header = true) - @test distributed_collect(di) == vcat(data1, data2) - sizes = distributed_mapreduce(di, size, vcat) + @test gather_array(di) == vcat(data1, data2) + sizes = dmapreduce(di, size, vcat) dims = map(last, sizes) counts = map(first, sizes) @test all(dims .== 10) diff --git a/test/testLoadPBMC8.jl b/test/testLoadPBMC8.jl index 045d7f78..f74258a2 100644 --- a/test/testLoadPBMC8.jl +++ b/test/testLoadPBMC8.jl @@ -70,8 +70,8 @@ cols = Vector(1:length(antigens)) dtransform_asinh(di, cols, 5) dscale(di, cols) -pbmc8_data = distributed_collect(di) -undistribute(di) +pbmc8_data = gather_array(di) +unscatter(di) @testset "load-time columns normalization" begin di = loadFCSSet( @@ -82,6 +82,6 @@ undistribute(di) ) dtransform_asinh(di, cols, 5) dscale(di, cols) - @test pbmc8_data == distributed_collect(di) - undistribute(di) + @test pbmc8_data == gather_array(di) + unscatter(di) end From 56b63b881cbd5bfa3d57ce3da58a36adc58c8043 Mon Sep 17 00:00:00 2001 From: Mirek Kratochvil Date: Sat, 19 Mar 2022 14:20:50 +0100 Subject: [PATCH 3/6] drop visualization code (it was commented out anyway) --- src/GigaSOM.jl | 6 ---- src/visualization/Plotting.jl | 62 ----------------------------------- 2 files changed, 68 deletions(-) delete mode 100644 src/visualization/Plotting.jl diff --git a/src/GigaSOM.jl b/src/GigaSOM.jl index 7b5bd7a1..ad9466fe 100644 --- a/src/GigaSOM.jl +++ b/src/GigaSOM.jl @@ -31,9 +31,6 @@ include("io/input.jl") include("io/process.jl") include("io/splitting.jl") -# include visualization files -# include("visualization/plotting.jl") - #core export initGigaSOM, trainGigaSOM, mapToGigaSOM @@ -68,9 +65,6 @@ export slicesof, vcollectSlice, collectSlice #io/process export cleanNames!, getMetaData, getMarkerNames -# plotting -export plotCounts, plotPCA - #dataops (higher-level operations on data) export dtransform_asinh diff --git a/src/visualization/Plotting.jl b/src/visualization/Plotting.jl deleted file mode 100644 index 874252c1..00000000 --- a/src/visualization/Plotting.jl +++ /dev/null @@ -1,62 +0,0 @@ -""" - plotCounts(fcsRaw, md, group_by = "condition") - -Barplot showing the number of cells per sample, used as a guide to identify samples where not enough cells were assayed - -# Arguments: -- `fcsRaw`: raw FCS data -- `md`: Metadata table -""" - -function plotCounts(fcsRaw, md, group_by = "condition") - - df_barplot = DataFrame(filename = String[], size = Int[], condition = String[]) - - for (k, v) in fcsRaw - sid = md.sample_id[k.==md.file_name] - # println(sid[1]) - condition = md.condition[k.==md.file_name] - push!(df_barplot, (string(sid[1]), size(v)[1], condition[1])) - end - sort!(df_barplot) - bar( - df_barplot.filename, - df_barplot.size, - title = "Numer of Cells", - group = df_barplot.condition, - xrotation = 60, - ) -end - - -""" - plotPCA(daFrame) - -Plotting the PCA of all median marker expression - -# Arguments: -- `daFrame`: daFrame containing the fcs data, metadata and panel -""" - -function plotPCA(daf, md) - dfall_median = aggregate(daf.fcstable, :sample_id, Statistics.median) - - T = convert(Matrix, dfall_median) - samples_ids = T[:, 1] - T_reshaped = permutedims(convert(Matrix{Float64}, T[:, 2:10]), [2, 1]) - - my_pca = StatsBase.fit(MultivariateStats.PCA, T_reshaped) - - yte = MultivariateStats.transform(my_pca, T_reshaped) - - df_pca = DataFrame(yte') - df_pca[:sample_id] = samples_ids - - # get the condition per sample id and add in DF - v1 = df_pca.sample_id - v2 = md.sample_id - idxs = indexin(v1, v2) - df_pca[:condition] = md.condition[idxs] - - StatsPlots.@df df_pca scatter(:x1, :x2, group = :condition) -end From 55321d7430efaaa0d9fc2a9c6ee713765ea98c90 Mon Sep 17 00:00:00 2001 From: Mirek Kratochvil Date: Sat, 19 Mar 2022 14:22:59 +0100 Subject: [PATCH 4/6] clean up docs --- docs/src/functions.md | 7 ------- docs/src/tutorials/processingFCSData.md | 6 +++--- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/docs/src/functions.md b/docs/src/functions.md index efd6dcad..95e64b05 100644 --- a/docs/src/functions.md +++ b/docs/src/functions.md @@ -27,10 +27,3 @@ Pages = ["core.jl", "trainutils.jl"] Modules = [GigaSOM] Pages = ["embedding.jl"] ``` - -## Distributed processing tools - -```@autodocs -Modules = [GigaSOM] -Pages = ["distributed.jl", "dio.jl"] -``` diff --git a/docs/src/tutorials/processingFCSData.md b/docs/src/tutorials/processingFCSData.md index 0f96f4bd..7c891fc7 100644 --- a/docs/src/tutorials/processingFCSData.md +++ b/docs/src/tutorials/processingFCSData.md @@ -108,7 +108,7 @@ If you are sure you have enough RAM, you can collect the data to the master node. (In case of the relatively small Levine13 dataset, you very probably have the required 2.5MB of RAM, but there are many larger datasets.) ```julia -e = distributed_collect(e) +e = gather_array(e) ``` ``` @@ -260,13 +260,13 @@ The `metaClusters` represent membership of the SOM codes in cluster; these can be expanded to membership of all cells using [`mapToGigaSOM`](@ref): ```julia -mapping = distributed_collect(mapToGigaSOM(som, di), free=true) +mapping = gather_array(mapToGigaSOM(som, di), free=true) clusters = metaClusters[mapping] ``` `clusters` now contain an integer from `1` to `10` with a classification of each cell in the dataset. -(The argument `free=true` of `distributed_collect` automatically removes the +(The argument `free=true` of `gather_array` automatically removes the distributed data from workers after collecting, which saves their memory for other datasets.) From 6431b9df08fe2e3470ca2bf17fe97a0234387d11 Mon Sep 17 00:00:00 2001 From: Mirek Kratochvil Date: Sat, 19 Mar 2022 14:26:58 +0100 Subject: [PATCH 5/6] improve compat with DiDa --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 2d791240..c030bab8 100644 --- a/Project.toml +++ b/Project.toml @@ -22,7 +22,7 @@ CSV = "^0.5.12, 0.6, 0.7, 0.8, 0.10" DataFrames = "0.20, 0.21, 0.22, 1.0" Distances = "^0.8.2, 0.9, 0.10" DistributedArrays = "^0.6.4" -DistributedData = "0.2" +DistributedData = "0.1.4, 0.2" Distributions = "^0.21.1, 0.22, 0.23, 0.24, 0.25" FCSFiles = "^0.1.1" FileIO = "^1.0.7" From 567ceff7cc1f1b8828ee5f212688833181737312 Mon Sep 17 00:00:00 2001 From: Mirek Kratochvil Date: Sat, 19 Mar 2022 14:33:06 +0100 Subject: [PATCH 6/6] complimentary juliaformatter --- src/analysis/core.jl | 6 +----- src/base/dataops.jl | 1 - src/base/trainutils.jl | 4 +++- src/io/input.jl | 16 +++++++--------- src/io/splitting.jl | 36 ++++++++++++++++++++---------------- 5 files changed, 31 insertions(+), 32 deletions(-) diff --git a/src/analysis/core.jl b/src/analysis/core.jl index 13c06b41..83f40cda 100644 --- a/src/analysis/core.jl +++ b/src/analysis/core.jl @@ -256,11 +256,7 @@ function mapToGigaSOM( tree = knnTreeFun(Array{Float64,2}(transpose(som.codes)), metric) - return dtransform( - dInfo, - (d) -> (vcat(knn(tree, transpose(d), 1)[1]...)), - output, - ) + return dtransform(dInfo, (d) -> (vcat(knn(tree, transpose(d), 1)[1]...)), output) end """ diff --git a/src/base/dataops.jl b/src/base/dataops.jl index 2919eb71..e9d8fee3 100644 --- a/src/base/dataops.jl +++ b/src/base/dataops.jl @@ -6,4 +6,3 @@ Transform columns of the dataset by asinh transformation with `cofactor`. function dtransform_asinh(dInfo::Dinfo, columns::Vector{Int}, cofactor = 5) dapply_cols(dInfo, (v, _) -> asinh.(v ./ cofactor), columns) end - diff --git a/src/base/trainutils.jl b/src/base/trainutils.jl index 4de316df..589d7ecf 100644 --- a/src/base/trainutils.jl +++ b/src/base/trainutils.jl @@ -50,7 +50,9 @@ function expRadius(steepness::Float64 = 0.0) adjust = finalRadius * (1 - 1.1^(-steepness)) if initRadius <= 0 || (initRadius - adjust) <= 0 || finalRadius <= 0 - error("Radii must be positive. (Possible alternative cause: steepness is too high.)") + error( + "Radii must be positive. (Possible alternative cause: steepness is too high.)", + ) end initRadius -= adjust diff --git a/src/io/input.jl b/src/io/input.jl index a198d396..e5d4be32 100644 --- a/src/io/input.jl +++ b/src/io/input.jl @@ -130,10 +130,12 @@ function loadFCSSet( :( begin $name = vcollectSlice( - (i) -> last($postLoad( - loadFCS($fns[i]; applyCompensation = $applyCompensation), - i, - )), + (i) -> last( + $postLoad( + loadFCS($fns[i]; applyCompensation = $applyCompensation), + i, + ), + ), $slice, ) nothing @@ -174,11 +176,7 @@ from `fns` the cell comes from. Useful for producing per-file statistics. The vector is saved on workers specified by `pids` as a distributed variable `name`. """ -function distributeFCSFileVector( - name::Symbol, - fns::Vector{String}, - pids = workers(), -)::Dinfo +function distributeFCSFileVector(name::Symbol, fns::Vector{String}, pids = workers())::Dinfo sizes = loadFCSSizes(fns) slices = slicesof(sizes, length(pids)) return distributeFileVector(name, sizes, slices, pids) diff --git a/src/io/splitting.jl b/src/io/splitting.jl index 62c1acdf..98dbdc22 100644 --- a/src/io/splitting.jl +++ b/src/io/splitting.jl @@ -79,14 +79,16 @@ function vcollectSlice( loadMtx, (startFile, startOff, finalFile, finalOff)::Tuple{Int,Int,Int,Int}, )::Matrix - vcat([ - begin - m = loadMtx(i) - beginIdx = i == startFile ? startOff : 1 - endIdx = i == finalFile ? finalOff : size(m, 1) - m[beginIdx:endIdx, :] - end for i = startFile:finalFile - ]...) + vcat( + [ + begin + m = loadMtx(i) + beginIdx = i == startFile ? startOff : 1 + endIdx = i == finalFile ? finalOff : size(m, 1) + m[beginIdx:endIdx, :] + end for i = startFile:finalFile + ]..., + ) end """ @@ -98,12 +100,14 @@ function collectSlice( loadVec, (startFile, startOff, finalFile, finalOff)::Tuple{Int,Int,Int,Int}, )::Vector - vcat([ - begin - v = loadVec(i) - beginIdx = i == startFile ? startOff : 1 - endIdx = i == finalFile ? finalOff : length(v) - v[beginIdx:endIdx] - end for i = startFile:finalFile - ]...) + vcat( + [ + begin + v = loadVec(i) + beginIdx = i == startFile ? startOff : 1 + endIdx = i == finalFile ? finalOff : length(v) + v[beginIdx:endIdx] + end for i = startFile:finalFile + ]..., + ) end