Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving the assembly #70

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "0.1.4"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Folds = "41a02a25-b8f0-4f67-bc48-60067656b558"
IterativeSolvers = "42fd0dbc-a981-5370-80f2-aaf504508153"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
LowRankApprox = "898213cb-b102-5a47-900c-97e73b919f73"
Expand All @@ -13,6 +14,8 @@ ParallelDataTransfer = "2dcacdae-9679-587a-88bb-8b444fb7085b"
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383"
StaticArrays = "90137ffa-7385-5640-81b9-e52037218182"
ThreadedIterables = "11d239b0-c0b9-11e8-1935-d5cfa53abb03"
ThreadsX = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d"

[weakdeps]
MLJ = "add582a8-e3ab-11e8-2d5e-e98b27df1bc7"
Expand All @@ -26,7 +29,6 @@ ACEfit_MLJLinearModels_ext = [ "MLJ", "MLJLinearModels" ]
ACEfit_MLJScikitLearnInterface_ext = ["MLJScikitLearnInterface", "PythonCall", "MLJ"]

[compat]
julia = "1.9"
IterativeSolvers = "0.9.2"
MLJ = "0.19"
MLJLinearModels = "0.9"
Expand All @@ -37,9 +39,10 @@ ParallelDataTransfer = "0.5.0"
ProgressMeter = "1.7"
PythonCall = "0.9"
StaticArrays = "1.5"
julia = "1.9"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", ]
test = ["Test"]
183 changes: 182 additions & 1 deletion src/assemble.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using Distributed
using Folds
using ParallelDataTransfer
using ProgressMeter
using SharedArrays
using Base.Threads: nthreads, @threads
using ThreadedIterables
using ThreadsX

struct DataPacket{T <: AbstractData}
rows::UnitRange
Expand All @@ -13,7 +17,7 @@ Base.length(d::DataPacket) = count_observations(d.data)
"""
Assemble feature matrix and target vector for given data and basis.
"""
function assemble(data::AbstractVector{<:AbstractData}, basis)
function assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true)
@info "Assembling linear problem."
rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data
rows[1] = 1:count_observations(data[1])
Expand All @@ -30,6 +34,43 @@ function assemble(data::AbstractVector{<:AbstractData}, basis)
@showprogress pmap(packets) do p
A[p.rows, :] .= feature_matrix(p.data, basis)
Y[p.rows] .= target_vector(p.data)
do_gc && GC.gc()
end
@info " - Assembly completed."
return Array(A), Array(Y), assemble_weights(data)
end

"""
Assemble feature matrix and target vector for given data and basis.
"""
function assemble_mixed(data::AbstractVector{<:AbstractData}, basis; mode=:threaded)
@info "Assembling linear problem."
rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data
rows[1] = 1:count_observations(data[1])
for i in 2:length(data)
rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i]))
end
packets = DataPacket.(rows, data)
sort!(packets, by = length, rev = true)
@info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))."
A = SharedArray(zeros(rows[end][end], length(basis)))
Y = SharedArray(zeros(size(A, 1)))
W = SharedArray(zeros(size(A, 1)))
if mode == :serial
@info " - Beginning serial assembly."
elseif mode == :threaded
@info " - Beginning threaded assembly with $(Threads.nthreads()) threads."
map = Folds.map
elseif mode == :distributed
@info " - Beginning distributed assembly with $(nprocs()) processes."
map = pmap
(nprocs() > 1) && sendto(workers(), basis = basis)
end
progress = Progress(length(data))
map(packets) do p
A[p.rows,:] .= feature_matrix(p.data, basis)
Y[p.rows] .= target_vector(p.data)
next!(progress)
GC.gc()
end
@info " - Assembly completed."
Expand All @@ -51,6 +92,146 @@ function assemble_weights(data::AbstractVector{<:AbstractData})
W = SharedArray(zeros(rows[end][end]))
@showprogress pmap(packets) do p
W[p.rows] .= weight_vector(p.data)
GC.gc()
end
return Array(W)
end


"""
Assemble feature matrix, target vector, and weight vector for given data and basis.
"""
"""
Assemble feature matrix, target vector, and weight vector for given data and basis.

Multi-threaded: `Threads.nthreads()` worker loops each repeatedly claim the
index of the next unprocessed packet under a shared lock, assemble that packet,
and write the result into the preallocated outputs. Row ranges of distinct
packets are disjoint, so the lock around the writes mainly protects the shared
progress counter and the `failed` list.

Keyword `do_gc` (default `true`) forces a garbage collection after each packet,
which keeps memory bounded for large assemblies at some speed cost.

Returns `(A, Y, W)`. Indices of packets whose assembly threw are collected in
`failed` and reported via `@show` before returning.
"""
function mt_assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true)
    @info "Multi-threaded assembly of linear problem."
    # row ranges for each element of data
    rows = Array{UnitRange}(undef, length(data))
    rows[1] = 1:count_observations(data[1])
    for i in 2:length(data)
        rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i]))
    end
    packets = DataPacket.(rows, data)
    # largest packets first → better load balance across threads
    sort!(packets, by = length, rev = true)
    @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))."
    A = zeros(rows[end][end], length(basis))
    Y = zeros(size(A, 1))
    W = zeros(size(A, 1))
    @info " - Beginning assembly with $(Threads.nthreads()) threads."
    _lock = ReentrantLock()
    _prog = Progress(sum(length, rows))
    _prog_ctr = 0
    next = 1

    failed = Int[]

    Threads.@threads for _i = 1:nthreads()

        while true
            # atomically claim the next packet index; the original read `next`
            # in the loop condition outside the lock, which is a data race
            lock(_lock)
            cur = next
            next += 1
            unlock(_lock)
            cur > length(packets) && break
            p = packets[cur]

            # assemble the corresponding data
            try
                Ap = feature_matrix(p.data, basis)
                Yp = target_vector(p.data)
                Wp = weight_vector(p.data)

                # write into global design matrix; try/finally guarantees the
                # lock is released even if a write throws (e.g. a shape mismatch)
                lock(_lock)
                try
                    A[p.rows, :] .= Ap
                    Y[p.rows] .= Yp
                    W[p.rows] .= Wp
                    _prog_ctr += length(p.rows)
                    ProgressMeter.update!(_prog, _prog_ctr)
                finally
                    unlock(_lock)
                end
                do_gc && GC.gc()
            catch
                @info("failed assembly: cur = $cur")
                # `failed` is shared between threads: guard the push!
                lock(_lock)
                push!(failed, cur)
                unlock(_lock)
            end
        end
        @info("thread $_i done")
    end
    @info " - Assembly completed."
    @show failed
    # A, Y, W are already plain Arrays; no copy needed
    return A, Y, W
end

"""
Assemble feature matrix, target vector and weight vector for the given data
and basis, using `ThreadedIterables.tmap` to process packets in parallel.
Returns `(A, Y, W)`.
"""
function assemble_threadediterables(data::AbstractVector{<:AbstractData}, basis)
    @info "Assembling linear problem."
    # cumulative observation counts give the row range of each data element
    counts = count_observations.(data)
    stops = cumsum(counts)
    rows = [(hi - n + 1):hi for (hi, n) in zip(stops, counts)]
    # largest packets first for better load balance
    packets = sort(DataPacket.(rows, data); by = length, rev = true)
    @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))."
    A = zeros(rows[end][end], length(basis))
    Y = zeros(size(A, 1))
    W = zeros(size(A, 1))
    @info " - Beginning assembly with thread count: $(Threads.nthreads())."
    @time tmap(packets) do pkt
        A[pkt.rows, :] .= feature_matrix(pkt.data, basis)
        Y[pkt.rows] .= target_vector(pkt.data)
        W[pkt.rows] .= weight_vector(pkt.data)
        GC.gc()
    end
    @info " - Assembly completed."
    return A, Y, W
end

"""
Assemble feature matrix, target vector and weight vector for the given data
and basis, parallelising over packets with `ThreadsX.map`.
Returns `(A, Y, W)`.
"""
function assemble_threadsx(data::AbstractVector{<:AbstractData}, basis)
    @info "Assembling linear problem."
    # row range covered by each element of data, derived from cumulative counts
    nobs = count_observations.(data)
    ends = cumsum(nobs)
    rows = [(e - n + 1):e for (e, n) in zip(ends, nobs)]
    # sort packets largest-first so big jobs are scheduled early
    packets = sort(DataPacket.(rows, data); by = length, rev = true)
    @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))."
    A = zeros(rows[end][end], length(basis))
    Y = zeros(size(A, 1))
    W = zeros(size(A, 1))
    @info " - Beginning assembly with thread count: $(Threads.nthreads())."
    @time ThreadsX.map(packets) do pkt
        A[pkt.rows, :] .= feature_matrix(pkt.data, basis)
        Y[pkt.rows] .= target_vector(pkt.data)
        W[pkt.rows] .= weight_vector(pkt.data)
        GC.gc()
    end
    @info " - Assembly completed."
    return A, Y, W
end


"""
Assemble the linear problem for `data` and `basis`.

Weights are computed concurrently on a spawned task while the feature matrix
and target vector are built via `pmap` (distributed over workers, in batches
of `batch_size`). Returns `(A, Y, W)` as vertically concatenated arrays.
"""
function assemble_new(data::AbstractArray, basis; batch_size=1, kwargs...)
    # compute weights in the background while the main assembly runs
    weight_task = Threads.@spawn ACEfit.assemble_weights_new(data; kwargs...)
    pairs = @showprogress desc="Assembly progress:" pmap( data; batch_size=batch_size ) do d
        ( ACEfit.feature_matrix(d, basis; kwargs...),
          ACEfit.target_vector(d; kwargs...) )
    end
    # stack the per-datum blocks into the full design matrix and target vector
    A_final = reduce(vcat, first.(pairs))
    Y_final = reduce(vcat, last.(pairs))
    return A_final, Y_final, fetch(weight_task)
end

"""
Compute the weight vector for `data` by concatenating the per-datum
weight vectors in order.
"""
function assemble_weights_new(data::AbstractArray; kwargs...)
    blocks = [ACEfit.weight_vector(d; kwargs...) for d in data]
    return reduce(vcat, blocks)
end
Loading