Skip to content

Commit

Permalink
When writing scalars with parallel I/O, only write from root process
Browse files Browse the repository at this point in the history
This avoids multiple processes trying to write at the same time and
having to wait for each other, and also avoids having which process
finally wrote the value be random.
  • Loading branch information
johnomotani committed Nov 14, 2023
1 parent 5009e80 commit 9df65d0
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 73 deletions.
163 changes: 91 additions & 72 deletions src/file_io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1009,12 +1009,13 @@ function reopen_dfns_io(file_info)
end

"""
append_to_dynamic_var(io_var, data, t_idx, coords...)
append_to_dynamic_var(io_var, data, t_idx, parallel_io, coords...)
Append `data` to the dynamic variable `io_var`. The time-index of the data being appended
is `t_idx`. `coords...` is used to get the ranges to write from/to (needed for parallel
I/O) - the entries in the `coords` tuple can be either `coordinate` instances or integers
(for an integer `n` the range is `1:n`).
is `t_idx`. `parallel_io` indicates whether parallel I/O is being used. `coords...` is
used to get the ranges to write from/to (needed for parallel I/O) - the entries in the
`coords` tuple can be either `coordinate` instances or integers (for an integer `n` the
range is `1:n`).
"""
function append_to_dynamic_var() end

Expand All @@ -1035,86 +1036,94 @@ function write_moments_data_to_binary(moments, fields, t, n_ion_species,
closefile = true
end

parallel_io = io_moments.parallel_io

# add the time for this time slice to the hdf5 file
append_to_dynamic_var(io_moments.time, t, t_idx)
append_to_dynamic_var(io_moments.time, t, t_idx, parallel_io)

# add the electrostatic potential and electric field components at this time slice to the hdf5 file
append_to_dynamic_var(io_moments.phi, fields.phi, t_idx, z, r)
append_to_dynamic_var(io_moments.Er, fields.Er, t_idx, z, r)
append_to_dynamic_var(io_moments.Ez, fields.Ez, t_idx, z, r)
append_to_dynamic_var(io_moments.phi, fields.phi, t_idx, parallel_io, z, r)
append_to_dynamic_var(io_moments.Er, fields.Er, t_idx, parallel_io, z, r)
append_to_dynamic_var(io_moments.Ez, fields.Ez, t_idx, parallel_io, z, r)

# add the density data at this time slice to the output file
append_to_dynamic_var(io_moments.density, moments.charged.dens, t_idx, z, r,
n_ion_species)
append_to_dynamic_var(io_moments.parallel_flow, moments.charged.upar, t_idx, z, r,
n_ion_species)
append_to_dynamic_var(io_moments.density, moments.charged.dens, t_idx,
parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.parallel_flow, moments.charged.upar, t_idx,
parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.parallel_pressure, moments.charged.ppar, t_idx,
z, r, n_ion_species)
parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.perpendicular_pressure, moments.charged.pperp, t_idx,
z, r, n_ion_species)
parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.parallel_heat_flux, moments.charged.qpar, t_idx,
z, r, n_ion_species)
append_to_dynamic_var(io_moments.thermal_speed, moments.charged.vth, t_idx, z, r,
n_ion_species)
parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.thermal_speed, moments.charged.vth, t_idx,
parallel_io, z, r, n_ion_species)
if z.irank == 0 # lower wall
append_to_dynamic_var(io_moments.chodura_integral_lower, moments.charged.chodura_integral_lower, t_idx, r,
n_ion_species)
append_to_dynamic_var(io_moments.chodura_integral_lower,
moments.charged.chodura_integral_lower, t_idx,
parallel_io, r, n_ion_species)
elseif io_moments.chodura_integral_lower !== nothing
append_to_dynamic_var(io_moments.chodura_integral_lower, moments.charged.chodura_integral_lower, t_idx, 0,
n_ion_species)
append_to_dynamic_var(io_moments.chodura_integral_lower,
moments.charged.chodura_integral_lower, t_idx,
parallel_io, 0, n_ion_species)
end
if z.irank == z.nrank - 1 # upper wall
append_to_dynamic_var(io_moments.chodura_integral_upper, moments.charged.chodura_integral_upper, t_idx, r,
n_ion_species)
append_to_dynamic_var(io_moments.chodura_integral_upper,
moments.charged.chodura_integral_upper, t_idx,
parallel_io, r, n_ion_species)
elseif io_moments.chodura_integral_upper !== nothing
append_to_dynamic_var(io_moments.chodura_integral_upper, moments.charged.chodura_integral_upper, t_idx, 0,
n_ion_species)
append_to_dynamic_var(io_moments.chodura_integral_upper,
moments.charged.chodura_integral_upper, t_idx,
parallel_io, 0, n_ion_species)
end
if io_moments.external_source_amplitude !== nothing
append_to_dynamic_var(io_moments.external_source_amplitude,
moments.charged.external_source_amplitude, t_idx, z, r)
moments.charged.external_source_amplitude, t_idx,
parallel_io, z, r)
end
if io_moments.external_source_controller_integral !== nothing
if size(moments.charged.external_source_controller_integral) == (1,1)
append_to_dynamic_var(io_moments.external_source_controller_integral,
moments.charged.external_source_controller_integral[1,1],
t_idx)
t_idx, parallel_io)
else
append_to_dynamic_var(io_moments.external_source_controller_integral,
moments.charged.external_source_controller_integral,
t_idx, z, r)
t_idx, parallel_io, z, r)
end
end
if n_neutral_species > 0
append_to_dynamic_var(io_moments.density_neutral, moments.neutral.dens, t_idx,
z, r, n_neutral_species)
append_to_dynamic_var(io_moments.uz_neutral, moments.neutral.uz, t_idx, z, r,
n_neutral_species)
append_to_dynamic_var(io_moments.pz_neutral, moments.neutral.pz, t_idx, z, r,
n_neutral_species)
append_to_dynamic_var(io_moments.qz_neutral, moments.neutral.qz, t_idx, z, r,
n_neutral_species)
parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.uz_neutral, moments.neutral.uz, t_idx,
parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.pz_neutral, moments.neutral.pz, t_idx,
parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.qz_neutral, moments.neutral.qz, t_idx,
parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.thermal_speed_neutral, moments.neutral.vth,
t_idx, z, r, n_neutral_species)
t_idx, parallel_io, z, r, n_neutral_species)

if io_moments.external_source_neutral_amplitude !== nothing
append_to_dynamic_var(io_moments.external_source_neutral_amplitude,
moments.neutral.external_source_amplitude, t_idx, z, r)
moments.neutral.external_source_amplitude, t_idx,
parallel_io, z, r)
end
if io_moments.external_source_neutral_controller_integral !== nothing
if size(moments.neutral.external_source_neutral_controller_integral) == (1,1)
append_to_dynamic_var(io_moments.external_source_neutral_controller_integral,
moments.neutral.external_source_controller_integral[1,1],
t_idx)
t_idx, parallel_io)
else
append_to_dynamic_var(io_moments.external_source_neutral_controller_integral,
moments.neutral.external_source_controller_integral,
t_idx, z, r)
t_idx, parallel_io, z, r)
end
end
end

append_to_dynamic_var(io_moments.time_for_run, time_for_run, t_idx)
append_to_dynamic_var(io_moments.time_for_run, time_for_run, t_idx, parallel_io)

closefile && close(io_moments.fid)
end
Expand Down Expand Up @@ -1143,11 +1152,14 @@ function write_dfns_data_to_binary(ff, ff_neutral, moments, fields, t, n_ion_spe
write_moments_data_to_binary(moments, fields, t, n_ion_species, n_neutral_species,
io_dfns.io_moments, t_idx, time_for_run, r, z)

parallel_io = io_dfns.parallel_io

# add the distribution function data at this time slice to the output file
append_to_dynamic_var(io_dfns.f, ff, t_idx, vpa, vperp, z, r, n_ion_species)
append_to_dynamic_var(io_dfns.f, ff, t_idx, parallel_io, vpa, vperp, z, r,
n_ion_species)
if n_neutral_species > 0
append_to_dynamic_var(io_dfns.f_neutral, ff_neutral, t_idx, vz, vr, vzeta, z,
r, n_neutral_species)
append_to_dynamic_var(io_dfns.f_neutral, ff_neutral, t_idx, parallel_io, vz,
vr, vzeta, z, r, n_neutral_species)
end

closefile && close(io_dfns.fid)
Expand All @@ -1172,80 +1184,85 @@ end
closefile = true
end

parallel_io = io_moments.parallel_io

# add the time for this time slice to the hdf5 file
append_to_dynamic_var(io_moments.time, t, t_idx)
append_to_dynamic_var(io_moments.time, t, t_idx, parallel_io)

# add the electrostatic potential and electric field components at this time slice to the hdf5 file
append_to_dynamic_var(io_moments.phi, fields.phi.data, t_idx, z, r)
append_to_dynamic_var(io_moments.Er, fields.Er.data, t_idx, z, r)
append_to_dynamic_var(io_moments.Ez, fields.Ez.data, t_idx, z, r)
append_to_dynamic_var(io_moments.phi, fields.phi.data, t_idx, parallel_io, z,
r)
append_to_dynamic_var(io_moments.Er, fields.Er.data, t_idx, parallel_io, z, r)
append_to_dynamic_var(io_moments.Ez, fields.Ez.data, t_idx, parallel_io, z, r)

# add the density data at this time slice to the output file
append_to_dynamic_var(io_moments.density, moments.charged.dens.data, t_idx, z,
r, n_ion_species)
append_to_dynamic_var(io_moments.density, moments.charged.dens.data, t_idx,
parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.parallel_flow, moments.charged.upar.data,
t_idx, z, r, n_ion_species)
t_idx, parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.parallel_pressure, moments.charged.ppar.data,
t_idx, z, r, n_ion_species)
t_idx, parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.perpendicular_pressure, moments.charged.pperp.data,
t_idx, z, r, n_ion_species)
t_idx, parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.parallel_heat_flux,
moments.charged.qpar.data, t_idx, z, r, n_ion_species)
moments.charged.qpar.data, t_idx, parallel_io, z, r,
n_ion_species)
append_to_dynamic_var(io_moments.thermal_speed, moments.charged.vth.data,
t_idx, z, r, n_ion_species)
t_idx, parallel_io, z, r, n_ion_species)
append_to_dynamic_var(io_moments.chodura_integral_lower, moments.charged.chodura_integral_lower.data,
t_idx, r, n_ion_species)
t_idx, parallel_io, r, n_ion_species)
append_to_dynamic_var(io_moments.chodura_integral_upper, moments.charged.chodura_integral_upper.data,
t_idx, r, n_ion_species)
t_idx, parallel_io, r, n_ion_species)
if io_moments.external_source_amplitude !== nothing
append_to_dynamic_var(io_moments.external_source_amplitude,
moments.charged.external_source_amplitude.data,
t_idx, z, r)
t_idx, parallel_io, z, r)
end
if io_moments.external_source_controller_integral !== nothing
if size(moments.charged.external_source_controller_integral) == (1,1)
append_to_dynamic_var(io_moments.external_source_controller_integral,
moments.charged.external_source_controller_integral[1,1],
t_idx)
t_idx, parallel_io)
else
append_to_dynamic_var(io_moments.external_source_controller_integral,
moments.charged.external_source_controller_integral,data,
t_idx, z, r)
t_idx, parallel_io, z, r)
end
end
if n_neutral_species > 0
append_to_dynamic_var(io_moments.density_neutral,
moments.neutral.dens.data, t_idx, z, r,
moments.neutral.dens.data, t_idx, parallel_io, z, r,
n_neutral_species)
append_to_dynamic_var(io_moments.uz_neutral, moments.neutral.uz.data,
t_idx, z, r, n_neutral_species)
t_idx, parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.pz_neutral, moments.neutral.pz.data,
t_idx, z, r, n_neutral_species)
t_idx, parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.qz_neutral, moments.neutral.qz.data,
t_idx, z, r, n_neutral_species)
t_idx, parallel_io, z, r, n_neutral_species)
append_to_dynamic_var(io_moments.thermal_speed_neutral,
moments.neutral.vth.data, t_idx, z, r,
moments.neutral.vth.data, t_idx, parallel_io, z, r,
n_neutral_species)

if io_moments.external_source_neutral_amplitude !== nothing
append_to_dynamic_var(io_moments.external_source_neutral_amplitude,
moments.neutral.external_source_amplitude,
t_idx, z, r)
t_idx, parallel_io, z, r)
end
if io_moments.external_source_neutral_controller_integral !== nothing
if size(moments.neutral.external_source_neutral_controller_integral) == (1,1)
append_to_dynamic_var(io_moments.external_source_neutral_controller_integral,
moments.neutral.external_source_controller_integral[1,1],
t_idx)
t_idx, parallel_io)
else
append_to_dynamic_var(io_moments.external_source_neutral_controller_integral,
moments.neutral.external_source_controller_integral,
t_idx, z, r)
t_idx, parallel_io, z, r)
end
end
end

append_to_dynamic_var(io_moments.time_for_run, time_for_run, t_idx)
append_to_dynamic_var(io_moments.time_for_run, time_for_run, t_idx,
parallel_io)

closefile && close(io_moments.fid)
end
Expand Down Expand Up @@ -1276,12 +1293,14 @@ end
n_neutral_species, io_dfns.io_moments, t_idx, r,
z)

parallel_io = io_dfns.parallel_io

# add the distribution function data at this time slice to the output file
append_to_dynamic_var(io_dfns.f, ff.data, t_idx, vpa, vperp, z, r,
n_ion_species)
append_to_dynamic_var(io_dfns.f, ff.data, t_idx, parallel_io, vpa, vperp, z,
r, n_ion_species)
if n_neutral_species > 0
append_to_dynamic_var(io_dfns.f_neutral, ff_neutral.data, t_idx, vz, vr,
vzeta, z, r, n_neutral_species)
append_to_dynamic_var(io_dfns.f_neutral, ff_neutral.data, t_idx,
parallel_io, vz, vr, vzeta, z, r, n_neutral_species)
end

closefile && close(io_dfns.fid)
Expand Down
9 changes: 8 additions & 1 deletion src/file_io_hdf5.jl
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ end

function append_to_dynamic_var(io_var::HDF5.Dataset,
data::Union{Number,AbstractArray{T,N}}, t_idx,
parallel_io::Bool,
coords::Union{coordinate,Integer}...) where {T,N}
# Extend time dimension for this variable
dims = size(io_var)
Expand All @@ -274,7 +275,13 @@ function append_to_dynamic_var(io_var::HDF5.Dataset,
global_ranges = Tuple(isa(c, coordinate) ? c.global_io_range : 1:c for c coords)

if isa(data, Number)
io_var[t_idx] = data
if !parallel_io || global_rank[] == 0
# A scalar value is required to be the same on all processes, so when using
# parallel I/O, only write from one process to avoid overwriting (which would
# mean processes having to wait, and make which process wrote the final value
# random).
io_var[t_idx] = data
end
elseif N == 1
io_var[global_ranges[1], t_idx] = @view data[local_ranges[1]]
elseif N == 2
Expand Down
1 change: 1 addition & 0 deletions src/file_io_netcdf.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ end

function append_to_dynamic_var(io_var::NCDatasets.CFVariable,
data::Union{Number,AbstractArray{T,N}}, t_idx,
parallel_io::Bool,
coords...) where {T,N}

if isa(data, Number)
Expand Down

0 comments on commit 9df65d0

Please sign in to comment.