Commit
Manually run gc after chunk processing (#265)
* Manually run gc after chunk processing

* Add gc kwarg and CachingPool

* Fix EDL integration tests

* remove pmap_with_data

* Fix cube conversion

* Allow to manually set prom_type
meggart authored and lazarusA committed Jun 29, 2023
1 parent b1a621d commit 56b904d
Showing 6 changed files with 33 additions and 32 deletions.
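
For orientation before the diffs: the "gc kwarg" bullet above lands as a `do_gc` keyword on `mapCube` (default `true`, see the DAT.jl hunks below). A minimal usage sketch; the cube `c`, the "time" axis, and the `apply_mean` helper are illustrative, not part of this commit:

```julia
using YAXArrays, Statistics

# Illustrative inner function: reduce the "time" axis to a scalar mean.
function apply_mean(xout, xin)
    xout .= mean(skipmissing(xin))
end

# do_gc = true (the new default) runs GC.gc() after every processed chunk,
# bounding memory growth on Julia 1.9; pass do_gc = false to opt out.
r = mapCube(apply_mean, c; indims = InDims("time"), outdims = OutDims(), do_gc = false)
```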
1 change: 1 addition & 0 deletions src/Cubes/Rechunker.jl
@@ -99,6 +99,7 @@ Internal function which copies the data from the input `inar` into the output `outar`
function copydata(outar,inar,copybuf)
@showprogress for ii in copybuf
outar[ii...] = inar[ii...]
GC.gc()
end
end

35 changes: 13 additions & 22 deletions src/DAT/DAT.jl
@@ -13,7 +13,8 @@ using Distributed:
remotecall,
@spawn,
AbstractWorkerPool,
default_worker_pool
default_worker_pool,
CachingPool
import ..Cubes: cubechunks, iscompressed, chunkoffset, CubeAxis, YAXArray, caxes, YAXSlice
import ..Cubes.Axes:
AxisDescriptor, axname, ByInference, axsym, getOutAxis, getAxis, findAxis, match_axis
@@ -239,6 +240,8 @@ mutable struct DATConfig{NIN,NOUT}
include_loopvars::Bool
""
ntr::Any
"Flag if GC should be called explicitly. Probably necessary for many runs in Julia 1.9"
do_gc::Bool
"Additional arguments for the inner function"
addargs::Any
"Additional keyword arguments for the inner function"
@@ -255,6 +258,7 @@ function DATConfig(
include_loopvars,
allow_irregular,
nthreads,
do_gc,
addargs,
kwargs,
)
@@ -285,6 +289,7 @@ function DATConfig(
inplace, # inplace
include_loopvars,
nthreads,
do_gc,
addargs, # addargs
kwargs,
)
@@ -423,6 +428,7 @@ function mapCube(
nthreads = ispar ? Dict(i => remotecall_fetch(Threads.nthreads, i) for i in workers()) :
[Threads.nthreads()],
loopchunksize = Dict(),
do_gc = true,
kwargs...,
)

@@ -460,6 +466,7 @@ function mapCube(
include_loopvars,
irregular_loopranges,
nthreads,
do_gc,
addargs,
kwargs,
)
@@ -657,22 +664,6 @@ _writedata(d::Array{<:Any,0},cache::Array{<:Any,0},::Tuple{}) = d[] = cache[]
updateinars(dc, r, incaches) = updatears(dc.incubes, r, :read, incaches)
writeoutars(dc, r, outcaches) = updatears(dc.outcubes, r, :write, outcaches)

function pmap_with_data(f, p::AbstractWorkerPool, c...; initfunc, progress=nothing, kwargs...)
d = Dict(ip=>remotecall(initfunc, ip) for ip in workers(p))
allrefs = @spawn d
function fnew(args...,)
refdict = fetch(allrefs)
myargs = fetch(refdict[myid()])
f(args..., myargs)
end
if progress !==nothing
progress_pmap(fnew,p,c...;progress=progress,kwargs...)
else
pmap(fnew,p,c...;kwargs...)
end
end
pmap_with_data(f,c...;initfunc,kwargs...) = pmap_with_data(f,default_worker_pool(),c...;initfunc,kwargs...)

function moduleloadedeverywhere()
try
isloaded = map(workers()) do w
@@ -693,14 +684,13 @@ function runLoop(dc::DATConfig, showprog)
moduleloadedeverywhere() || error(
"YAXArrays is not loaded on all workers. Please run `@everywhere using YAXArrays` to fix.",
)
dcref = @spawn dc
prepfunc = ()->getallargs(fetch(dcref))
prog = showprog ? Progress(length(allRanges)) : nothing
pmap_with_data(allRanges, initfunc=prepfunc, progress=prog) do r, prep
incaches, outcaches, args = prep
mapfun = showprog ? progress_pmap : pmap
mapfun(CachingPool(workers()),allRanges, on_error=identity) do r
incaches, outcaches, args = getallargs(dc)
updateinars(dc, r, incaches)
innerLoop(r, args...)
writeoutars(dc, r, outcaches)
dc.do_gc && GC.gc()
end
else
incaches, outcaches, args = getallargs(dc)
@@ -709,6 +699,7 @@ function runLoop(dc::DATConfig, showprog)
updateinars(dc, r, incaches)
innerLoop(r, args...)
writeoutars(dc, r, outcaches)
dc.do_gc && GC.gc()
end
end
dc.outcubes
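
The `runLoop` hunk above replaces the deleted `pmap_with_data` helper with a plain `pmap` over a `CachingPool`, which ships each closure (and the `DATConfig` it captures) to a worker once and reuses it, instead of reserializing it per task. A minimal sketch of the worker setup that the `moduleloadedeverywhere` check expects; the worker count is illustrative:

```julia
using Distributed
addprocs(4)                    # illustrative worker count
@everywhere using YAXArrays    # satisfies the moduleloadedeverywhere() check

# A CachingPool caches closures on the workers, so captured data crosses
# the wire once per worker rather than once per task.
pool = CachingPool(workers())
squares = pmap(x -> x^2, pool, 1:16)
```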
7 changes: 6 additions & 1 deletion src/DAT/dciterators.jl
@@ -102,7 +102,12 @@ function YAXColumn(t::YAXTableChunk,ivar)
allax = Base.setindex(allax,true,il)
end
for il in ic.icolon
allax = Base.setindex(allax,Colon(),il)
i_insert = findfirst(==(il),cumsum(allax))
allax = if i_insert === nothing
(allax...,Colon())
else
(allax[1:i_insert]...,Colon(),allax[i_insert+1:end]...)
end
end
inarbc = if ic.colonperm === nothing
pa = PickAxisArray(buf, allax)
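
The `YAXColumn` change above splices a `Colon()` into the axis tuple at a computed position instead of overwriting an existing slot. The splice idiom in isolation, with illustrative values:

```julia
# Tuples are immutable, so "insertion" rebuilds the tuple by splatting
# the two slices around the new element.
allax = (true, false, true)
i_insert = 2
spliced = (allax[1:i_insert]..., Colon(), allax[i_insert+1:end]...)
# spliced == (true, false, Colon(), true)
```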
1 change: 1 addition & 0 deletions src/DAT/tablestats.jl
@@ -252,6 +252,7 @@ function fittable(tab::CubeIterator, o, fitsym; by = (), weight = nothing, showprog
func(merge!,tab) do t
agg = TableAggregator(t, o, fitsym, by = by, weight = weight)
foreach(i -> fitrow!(agg, i), Tables.rows(t))
GC.gc()
agg
end
end
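
The `fittable` hunk above adds an explicit collection after each table chunk is folded into the online statistic. A usage sketch, assuming `fittable` and `CubeTable` are in scope and given a hypothetical cube `c`:

```julia
using YAXArrays, OnlineStats

# Stream the cube through OnlineStats' Mean, chunk by chunk; the new
# GC.gc() call above frees each chunk's buffers as soon as it is folded in.
t = CubeTable(var = c)   # hypothetical cube `c` exposed as a table
m = fittable(t, Mean, :var)
```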
19 changes: 11 additions & 8 deletions src/DatasetAPI/Datasets.jl
@@ -313,20 +313,23 @@ to_array(ds::Dataset; joinname = "Variable") = Cube(ds;joinname)
Construct a single YAXArray from the dataset `ds`
by concatenating the cubes in the datset on the `joinname` dimension.
"""
function Cube(ds::Dataset; joinname = "Variable")
function Cube(ds::Dataset; joinname = "Variable", target_type = nothing)

dl = collect(keys(ds.axes))
dls = string.(dl)
length(ds.cubes) == 1 && return first(values(ds.cubes))
# TODO This is an ugly workaround to merge cubes with different element types,
# There should be a more generic solution
eltypes = map(eltype, values(ds.cubes))
prom_type = first(eltypes)
for i in 2:length(eltypes)
prom_type = promote_type(prom_type,eltypes[i])
if !isconcretetype(prom_type)
wrongvar = collect(keys(ds.cubes))[i]
throw(ArgumentError("Could not promote element types of cubes in dataset to a common concrete type, because of Variable $wrongvar"))
prom_type = target_type
if prom_type === nothing
prom_type = first(eltypes)
for i in 2:length(eltypes)
prom_type = promote_type(prom_type,eltypes[i])
if !isconcretetype(Base.nonmissingtype(prom_type))
wrongvar = collect(keys(ds.cubes))[i]
throw(ArgumentError("Could not promote element types of cubes in dataset to a common concrete type, because of Variable $wrongvar"))
end
end
end
newkeys = Symbol[]
Expand All @@ -344,7 +347,7 @@ function Cube(ds::Dataset; joinname = "Variable")
if eltype(ds.cubes[k]) <: prom_type
ds.cubes[k]
else
map(prom_type,ds.cubes[k])
map(Base.Fix1(convert,prom_type),ds.cubes[k])
end
end
foreach(
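
A sketch of the new `target_type` escape hatch for the case where the `promote_type` loop above cannot reach a concrete common element type; the dataset path is hypothetical:

```julia
using YAXArrays, Zarr

ds = open_dataset("example.zarr")  # hypothetical store with mixed eltypes

# Skip the automatic promotion and convert every variable to Float64
# while concatenating along the "Variable" axis:
c = Cube(ds; joinname = "Variable", target_type = Float64)
```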
2 changes: 1 addition & 1 deletion test/dimarray.jl
@@ -89,7 +89,7 @@ end

# I am not sure whether this is an actual use case
# and whether we would like to support the mix of symbol and string axisnames.
@test_broken r = mapslices(sum, yax, dims="axis1")
@test_broken mapslices(sum, yax, dims="axis1")
end

@testitem "Moving Window DimArray" begin
