From bc4dc712186ae81c53c96e8183961aa9f501f31a Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Tue, 12 May 2020 22:50:58 +0200 Subject: [PATCH] Support multithreading in groupreduce Keep the default to a single thread until we find a reliable way of predicting a reasonably optimal number of threads. --- .travis.yml | 3 + src/DataFrames.jl | 3 + src/groupeddataframe/splitapplycombine.jl | 168 +++++++++++++++------- test/grouping.jl | 44 ++++-- 4 files changed, 155 insertions(+), 63 deletions(-) diff --git a/.travis.yml b/.travis.yml index 17b55b739b..7f06bed3c8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,6 +17,9 @@ arch: notifications: email: false +env: + - JULIA_NUM_THREADS=2 + after_success: - julia -e 'using Pkg; Pkg.add("Coverage"); using Coverage; Coveralls.submit(Coveralls.process_folder())' diff --git a/src/DataFrames.jl b/src/DataFrames.jl index 6a8b7db008..47b71ddb9b 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -7,6 +7,7 @@ using Base.Sort, Base.Order, Base.Iterators using TableTraits, IteratorInterfaceExtensions import LinearAlgebra: norm using Markdown +using Base.Threads import DataAPI, DataAPI.All, @@ -87,6 +88,8 @@ else export only end +const NTHREADS = 1 + include("other/utils.jl") include("other/index.jl") diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index d7b1c23d86..c774c817ba 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -453,18 +453,22 @@ julia> combine(gd, :, AsTable(Not(:a)) => sum, renamecols=false) ``` """ function combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=NTHREADS) return combine_helper(f, gd, keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols) + copycols=true, keeprows=false, renamecols=renamecols, + nthreads=nthreads) end combine(f::typeof(nrow), gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=NTHREADS) = combine(gd, [nrow => :nrow], keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols) function combine(p::Pair, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=NTHREADS) # move handling of aggregate to specialized combine p_from, p_to = p @@ -484,20 +488,24 @@ function combine(p::Pair, gd::GroupedDataFrame; cs = p_from end return combine_helper(cs => p_to, gd, keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols) + copycols=true, keeprows=false, renamecols=renamecols, + nthreads=nthreads) end combine(gd::GroupedDataFrame, cs::Union{Pair, typeof(nrow), ColumnIndex, MultiColumnIndex}...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=NTHREADS) = _combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols) + copycols=true, keeprows=false, renamecols=renamecols, + nthreads=nthreads) function _combine_prepare(gd::GroupedDataFrame, @nospecialize(cs::Union{Pair, typeof(nrow), ColumnIndex, MultiColumnIndex}...); keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool) + keeprows::Bool, renamecols::Bool, + nthreads::Int) cs_vec = [] for p in cs if p === nrow @@ -570,7 +578,8 @@ function _combine_prepare(gd::GroupedDataFrame, f = Pair[first(x) => first(last(x)) for x in cs_norm] nms = Symbol[last(last(x)) for x in cs_norm] return combine_helper(f, gd, nms, keepkeys=keepkeys, ungroup=ungroup, - copycols=copycols, keeprows=keeprows, renamecols=renamecols) + copycols=copycols, keeprows=keeprows, renamecols=renamecols, + nthreads=nthreads) end function gen_groups(idx::Vector{Int}) @@ -590,11 +599,12 @@ end function combine_helper(f, gd::GroupedDataFrame, nms::Union{AbstractVector{Symbol},Nothing}=nothing; keepkeys::Bool, ungroup::Bool, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, + nthreads::Int) if !ungroup && !keepkeys throw(ArgumentError("keepkeys=false when ungroup=false is not allowed")) end - idx, valscat = _combine(f, gd, nms, copycols, keeprows, renamecols) + idx, valscat = _combine(f, gd, nms, copycols, keeprows, renamecols, nthreads) !keepkeys && ungroup && return valscat keys = groupcols(gd) for key in keys @@ -985,24 +995,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T end function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) + incol::AbstractVector, gd::GroupedDataFrame; nthreads::Int) n = length(gd) + groups = gd.groups if adjust !== nothing || checkempty counts = zeros(Int, n) end - groups = gd.groups - @inbounds for i in eachindex(incol, groups) - gix = groups[i] - x = incol[i] - if gix > 0 && (condf === nothing || condf(x)) - # this check should be optimized out if U is not Any - if eltype(res) === Any && !isassigned(res, gix) - res[gix] = f(x, gix) - else - res[gix] = op(res[gix], f(x, gix)) + nt = min(nthreads, Threads.nthreads()) + if nt <= 1 || axes(incol) != axes(groups) + @inbounds for i in eachindex(incol, groups) + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if U is not Any + if eltype(res) === Any && !isassigned(res, gix) + res[gix] = f(x, gix) + else + res[gix] = op(res[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts[gix] += 1 + end end + end + else + res_vec = Vector{typeof(res)}(undef, nt) + # needs to be always allocated to fix type instability with @threads + counts_vec = Vector{Vector{Int}}(undef, nt) + res_vec[1] = res + if adjust !== nothing || checkempty + counts_vec[1] = counts + end + for i in 2:nt + res_vec[i] = copy(res) if adjust !== nothing || checkempty - counts[gix] += 1 + counts_vec[i] = zeros(Int, n) + end + end + Threads.@threads for tid in 1:nt + res′ = res_vec[tid] + if adjust !== nothing || checkempty + counts′ = counts_vec[tid] + end + start = 1 + ((tid - 1) * length(groups)) ÷ nt + stop = (tid * length(groups)) ÷ nt + @inbounds for i in start:stop + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if U is not Any + if eltype(res′) === Any && !isassigned(res′, gix) + res′[gix] = f(x, gix) + else + res′[gix] = op(res′[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts′[gix] += 1 + end + end + end + end + for i in 2:length(res_vec) + res .= op.(res, res_vec[i]) + end + if adjust !== nothing || checkempty + for i in 2:length(counts_vec) + counts .+= counts_vec[i] end end end @@ -1042,17 +1100,20 @@ end # function barrier works around type instability of groupreduce_init due to applicable groupreduce(f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) = + incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Int) = groupreduce!(groupreduce_init(op, condf, adjust, incol, gd), - f, op, condf, adjust, checkempty, incol, gd) + f, op, condf, adjust, checkempty, incol, gd, nthreads=nthreads) # Avoids the overhead due to Missing when computing reduction groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) = + incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Int) = groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)), - f, op, condf, adjust, checkempty, incol, gd) + f, op, condf, adjust, checkempty, incol, gd, nthreads=nthreads) -(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) = - groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd) +(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; nthreads::Int=NTHREADS) = + groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, + nthreads=nthreads) # this definition is missing in Julia 1.0 LTS and is required by aggregation for var # TODO: remove this when we drop 1.0 support @@ -1060,8 +1121,10 @@ if VERSION < v"1.1" Base.zero(::Type{Missing}) = missing end -function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame) - means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd) +function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Int=NTHREADS) + means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd, + nthreads=nthreads) # !ismissing check is purely an optimization to avoid a copy later if eltype(means) >: Missing && agg.condf !== !ismissing T = Union{Missing, real(eltype(means))} @@ -1071,10 +1134,11 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra res = zeros(T, length(gd)) return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf, (x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1), - false, incol, gd) + false, incol, gd, nthreads=nthreads) end -function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame) +function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Int=NTHREADS) outcol = Aggregate(var, agg.condf)(incol, gd) if eltype(outcol) <: Union{Missing, Rational} return sqrt.(outcol) @@ -1083,20 +1147,25 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra end end -for f in (first, last) - function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame) - n = length(gd) - outcol = similar(incol, n) - fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) - if isconcretetype(eltype(outcol)) - return outcol - else - return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) +for f in (:first, :last) + # Without using @eval the presence of a keyword argument triggers a Julia bug + @eval begin + function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Int=NTHREADS) + n = length(gd) + outcol = similar(incol, n) + fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) + if isconcretetype(eltype(outcol)) + return outcol + else + return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) + end end end end -function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame) +function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Int=NTHREADS) if getfield(gd, :idx) === nothing lens = zeros(Int, length(gd)) @inbounds for gix in gd.groups @@ -1143,7 +1212,7 @@ end function _combine(f::AbstractVector{<:Pair}, gd::GroupedDataFrame, nms::AbstractVector{Symbol}, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int) # here f should be normalized and in a form of source_cols => fun @assert all(x -> first(x) isa Union{Int, AbstractVector{Int}, AsTable}, f) @assert all(x -> last(x) isa Base.Callable, f) @@ -1185,7 +1254,7 @@ function _combine(f::AbstractVector{<:Pair}, if length(gd) > 0 && isagg(p, gd) incol = parentdf[!, source_cols] agg = check_aggregate(last(p), incol) - outcol = agg(incol, gd) + outcol = agg(incol, gd, nthreads=nthreads) res[i] = idx_agg, outcol elseif keeprows && fun === identity && !(source_cols isa AsTable) @assert source_cols isa Union{Int, AbstractVector{Int}} @@ -1283,7 +1352,7 @@ function _combine(f::AbstractVector{<:Pair}, end function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int) @assert copycols && !keeprows # use `similar` as `gd` might have been subsetted firstres = length(gd) > 0 ? fun(gd[1]) : fun(similar(parent(gd), 0)) @@ -1293,7 +1362,7 @@ function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing, end function _combine(p::Pair, gd::GroupedDataFrame, ::Nothing, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int) # here p should not be normalized as we allow tabular return value from fun # map and combine should not dispatch here if p is isagg @assert copycols && !keeprows @@ -1708,9 +1777,10 @@ julia> select(gd, :, AsTable(Not(:a)) => sum, renamecols=false) ``` """ select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true) = + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=NTHREADS) = _combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, keeprows=true, renamecols=renamecols) + ungroup=ungroup, keeprows=true, renamecols=renamecols, + nthreads=NTHREADS) """ transform(gd::GroupedDataFrame, args...; diff --git a/test/grouping.jl b/test/grouping.jl index c8839c9fa9..313d9ad218 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -98,6 +98,22 @@ function groupby_checked(df::AbstractDataFrame, keys, args...; kwargs...) return ogd end +function combine_checked(gd::GroupedDataFrame, args...; kwargs...) + res1 = combine(gd, args...; nthreads=1, kwargs...) + res2 = combine(gd, args...; nthreads=2, kwargs...) + @test names(res1) == names(res2) + for (c1, c2) in zip(eachcol(res1), eachcol(res2)) + if eltype(c1) <: Union{AbstractFloat, Missing} + @test ismissing.(c1) == ismissing.(c2) + @test isapprox(collect(skipmissing(c1)), collect(skipmissing(c2)), + nans=true) + else + @test c1 ≅ c2 + end + end + res1 +end + @testset "parent" begin df = DataFrame(a = [1, 1, 2, 2], b = [5, 6, 7, 8]) gd = groupby_checked(df, :a) @@ -805,7 +821,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x1 => f => :y) + res = combine_checked(gd, :x1 => f => :y) expected = combine(gd, :x1 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -815,7 +831,7 @@ Base.isless(::TestType, ::TestType) = false df.x3 = Vector{T}(df.x1) gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -827,11 +843,11 @@ Base.isless(::TestType, ::TestType) = false df.x3[1] = missing gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -843,7 +859,7 @@ Base.isless(::TestType, ::TestType) = false if f in (maximum, minimum, first, last) @test_throws ArgumentError combine(gd, :x3 => f∘skipmissing => :y) else - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -854,7 +870,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x2 => f => :y) + res = combine_checked(gd, :x2 => f => :y) expected = combine(gd, :x2 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -867,14 +883,14 @@ Base.isless(::TestType, ::TestType) = false m && (df.x3[1] = missing) gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) f === length && continue - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -883,7 +899,7 @@ Base.isless(::TestType, ::TestType) = false @test_throws ArgumentError combine(gd, :x3 => f∘skipmissing => :y) end end - @test combine(gd, :x1 => maximum => :y, :x2 => sum => :z) ≅ + @test combine_checked(gd, :x1 => maximum => :y, :x2 => sum => :z) ≅ combine(gd, :x1 => (x -> maximum(x)) => :y, :x2 => (x -> sum(x)) => :z) # Test floating point corner cases @@ -894,7 +910,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x1 => f => :y) + res = combine_checked(gd, :x1 => f => :y) expected = combine(gd, :x1 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -905,18 +921,18 @@ Base.isless(::TestType, ::TestType) = false df.x3[1] = missing gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) end df = DataFrame(x = [1, 1, 2, 2], y = Any[1, 2.0, 3.0, 4.0]) - res = combine(groupby_checked(df, :x), :y => maximum => :z) + res = combine_checked(groupby_checked(df, :x), :y => maximum => :z) @test res.z isa Vector{Float64} @test res.z == combine(groupby_checked(df, :x), :y => (x -> maximum(x)) => :z).z @@ -925,7 +941,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :x, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices for f in (maximum, minimum) - res = combine(gd, :y => maximum => :z) + res = combine_checked(gd, :y => maximum => :z) @test res.z isa Vector{Any} @test res.z == combine(gd, :y => (x -> maximum(x)) => :z).z end