From 56b904d08384b7ba1e176dc136daadd4ab51d24e Mon Sep 17 00:00:00 2001 From: Fabian Gans Date: Thu, 29 Jun 2023 11:21:17 +0200 Subject: [PATCH] Manually run gc after chunk processing (#265) * Manually run gc after chunk processing * Add gc kwarg and CachingPool * Fix EDL integrarion Tests * remove pmap_with_data * Fix cube conversion * Allow to manually set prom_type --- src/Cubes/Rechunker.jl | 1 + src/DAT/DAT.jl | 35 +++++++++++++---------------------- src/DAT/dciterators.jl | 7 ++++++- src/DAT/tablestats.jl | 1 + src/DatasetAPI/Datasets.jl | 19 +++++++++++-------- test/dimarray.jl | 2 +- 6 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/Cubes/Rechunker.jl b/src/Cubes/Rechunker.jl index 3660c8a0..f8017b45 100644 --- a/src/Cubes/Rechunker.jl +++ b/src/Cubes/Rechunker.jl @@ -99,6 +99,7 @@ Internal function which copies the data from the input `inar` into the output `o function copydata(outar,inar,copybuf) @showprogress for ii in copybuf outar[ii...] = inar[ii...] + GC.gc() end end diff --git a/src/DAT/DAT.jl b/src/DAT/DAT.jl index 05f0a2ae..533424b1 100644 --- a/src/DAT/DAT.jl +++ b/src/DAT/DAT.jl @@ -13,7 +13,8 @@ using Distributed: remotecall, @spawn, AbstractWorkerPool, - default_worker_pool + default_worker_pool, + CachingPool import ..Cubes: cubechunks, iscompressed, chunkoffset, CubeAxis, YAXArray, caxes, YAXSlice import ..Cubes.Axes: AxisDescriptor, axname, ByInference, axsym, getOutAxis, getAxis, findAxis, match_axis @@ -239,6 +240,8 @@ mutable struct DATConfig{NIN,NOUT} include_loopvars::Bool "" ntr::Any + "Flag if GC should be called explicitly. Probably necessary for many runs in Julia 1.9" + do_gc::Bool "Additional arguments for the inner function" addargs::Any "Additional keyword arguments for the inner function" @@ -255,6 +258,7 @@ function DATConfig( include_loopvars, allow_irregular, nthreads, + do_gc, addargs, kwargs, ) @@ -285,6 +289,7 @@ function DATConfig( inplace, # inplace include_loopvars, nthreads, + do_gc, addargs, # addargs kwargs, ) @@ -423,6 +428,7 @@ function mapCube( nthreads = ispar ? Dict(i => remotecall_fetch(Threads.nthreads, i) for i in workers()) : [Threads.nthreads()], loopchunksize = Dict(), + do_gc = true, kwargs..., ) @@ -460,6 +466,7 @@ function mapCube( include_loopvars, irregular_loopranges, nthreads, + do_gc, addargs, kwargs, ) @@ -657,22 +664,6 @@ _writedata(d::Array{<:Any,0},cache::Array{<:Any,0},::Tuple{}) = d[] = cache[] updateinars(dc, r, incaches) = updatears(dc.incubes, r, :read, incaches) writeoutars(dc, r, outcaches) = updatears(dc.outcubes, r, :write, outcaches) -function pmap_with_data(f, p::AbstractWorkerPool, c...; initfunc, progress=nothing, kwargs...) - d = Dict(ip=>remotecall(initfunc, ip) for ip in workers(p)) - allrefs = @spawn d - function fnew(args...,) - refdict = fetch(allrefs) - myargs = fetch(refdict[myid()]) - f(args..., myargs) - end - if progress !==nothing - progress_pmap(fnew,p,c...;progress=progress,kwargs...) - else - pmap(fnew,p,c...;kwargs...) - end -end -pmap_with_data(f,c...;initfunc,kwargs...) = pmap_with_data(f,default_worker_pool(),c...;initfunc,kwargs...) - function moduleloadedeverywhere() try isloaded = map(workers()) do w @@ -693,14 +684,13 @@ function runLoop(dc::DATConfig, showprog) moduleloadedeverywhere() || error( "YAXArrays is not loaded on all workers. Please run `@everywhere using YAXArrays` to fix.", ) - dcref = @spawn dc - prepfunc = ()->getallargs(fetch(dcref)) - prog = showprog ? Progress(length(allRanges)) : nothing - pmap_with_data(allRanges, initfunc=prepfunc, progress=prog) do r, prep - incaches, outcaches, args = prep + mapfun = showprog ? progress_pmap : pmap + mapfun(CachingPool(workers()),allRanges, on_error=identity) do r + incaches, outcaches, args = getallargs(dc) updateinars(dc, r, incaches) innerLoop(r, args...) writeoutars(dc, r, outcaches) + dc.do_gc && GC.gc() end else incaches, outcaches, args = getallargs(dc) @@ -709,6 +699,7 @@ function runLoop(dc::DATConfig, showprog) updateinars(dc, r, incaches) innerLoop(r, args...) writeoutars(dc, r, outcaches) + dc.do_gc && GC.gc() end end dc.outcubes diff --git a/src/DAT/dciterators.jl b/src/DAT/dciterators.jl index 53e46f46..1fabc69a 100644 --- a/src/DAT/dciterators.jl +++ b/src/DAT/dciterators.jl @@ -102,7 +102,12 @@ function YAXColumn(t::YAXTableChunk,ivar) allax = Base.setindex(allax,true,il) end for il in ic.icolon - allax = Base.setindex(allax,Colon(),il) + i_insert = findfirst(==(il),cumsum(allax)) + allax = if i_insert === nothing + (allax...,Colon()) + else + (allax[1:i_insert]...,Colon(),allax[i_insert+1:end]...) + end end inarbc = if ic.colonperm === nothing pa = PickAxisArray(buf, allax) diff --git a/src/DAT/tablestats.jl b/src/DAT/tablestats.jl index 19599256..542cf879 100644 --- a/src/DAT/tablestats.jl +++ b/src/DAT/tablestats.jl @@ -252,6 +252,7 @@ function fittable(tab::CubeIterator, o, fitsym; by = (), weight = nothing, showp func(merge!,tab) do t agg = TableAggregator(t, o, fitsym, by = by, weight = weight) foreach(i -> fitrow!(agg, i), Tables.rows(t)) + GC.gc() agg end end diff --git a/src/DatasetAPI/Datasets.jl b/src/DatasetAPI/Datasets.jl index 967e9b6d..22bf60b1 100644 --- a/src/DatasetAPI/Datasets.jl +++ b/src/DatasetAPI/Datasets.jl @@ -313,7 +313,7 @@ to_array(ds::Dataset; joinname = "Variable") = Cube(ds;joinname) Construct a single YAXArray from the dataset `ds` by concatenating the cubes in the datset on the `joinname` dimension. """ -function Cube(ds::Dataset; joinname = "Variable") +function Cube(ds::Dataset; joinname = "Variable", target_type = nothing) dl = collect(keys(ds.axes)) dls = string.(dl) @@ -321,12 +321,15 @@ function Cube(ds::Dataset; joinname = "Variable") # TODO This is an ugly workaround to merge cubes with different element types, # There should bde a more generic solution eltypes = map(eltype, values(ds.cubes)) - prom_type = first(eltypes) - for i in 2:length(eltypes) - prom_type = promote_type(prom_type,eltypes[i]) - if !isconcretetype(prom_type) - wrongvar = collect(keys(ds.cubes))[i] - throw(ArgumentError("Could not promote element types of cubes in dataset to a common concrete type, because of Variable $wrongvar")) + prom_type = target_type + if prom_type === nothing + prom_type = first(eltypes) + for i in 2:length(eltypes) + prom_type = promote_type(prom_type,eltypes[i]) + if !isconcretetype(Base.nonmissingtype(prom_type)) + wrongvar = collect(keys(ds.cubes))[i] + throw(ArgumentError("Could not promote element types of cubes in dataset to a common concrete type, because of Variable $wrongvar")) + end end end newkeys = Symbol[] @@ -344,7 +347,7 @@ function Cube(ds::Dataset; joinname = "Variable") if eltype(ds.cubes[k]) <: prom_type ds.cubes[k] else - map(prom_type,ds.cubes[k]) + map(Base.Fix1(convert,prom_type),ds.cubes[k]) end end foreach( diff --git a/test/dimarray.jl b/test/dimarray.jl index d9ee7670..861a80ae 100644 --- a/test/dimarray.jl +++ b/test/dimarray.jl @@ -89,7 +89,7 @@ end #I am not sure, whether this is an actual use case # and whether we would like to support the mix of symbol and string axisnames. - @test_broken r = mapslices(sum, yax, dims="axis1") + @test_broken mapslices(sum, yax, dims="axis1") end @testitem "Moving Window DimArray" begin