Skip to content

Commit

Permalink
Small changes needed when dumping to spt3g format (#539)
Browse files Browse the repository at this point in the history
* Small spt3g fixes

* Fix bug in destructor

* Support breaking up observations into many small frames when saving to spt3g.  This ensures that all processes have some data in the case where there are a few long scans in each observation.

* Ensure hdf5 files are closed after load.  Support writing spt3g frame files in parallel.

* Add support for writing gzipped spt3g files

* Simplify check
  • Loading branch information
tskisner authored Mar 16, 2022
1 parent 707250e commit 0303cdd
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 31 deletions.
6 changes: 5 additions & 1 deletion src/toast/_libtoast/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace py = pybind11;

template <typename T>
std::vector <char> align_format() {
return std::vector <char> ({'V'});
return std::vector <char> ({ 'V' });
}

template <>
Expand Down Expand Up @@ -72,6 +72,8 @@ void pybuffer_check_1D(py::buffer data) {
for (auto const & atp : tp) {
if (info.format[0] == atp) {
valid = true;
} else if ((info.format[0] == '<') && (info.format[1] == atp)) {
valid = true;
}
}
if (!valid) {
Expand Down Expand Up @@ -107,6 +109,8 @@ void pybuffer_check(py::buffer data) {
for (auto const & atp : tp) {
if (info.format[0] == atp) {
valid = true;
} else if ((info.format[0] == '<') && (info.format[1] == atp)) {
valid = true;
}
}
if (!valid) {
Expand Down
6 changes: 6 additions & 0 deletions src/toast/io/observation_hdf_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,4 +564,10 @@ def load_hdf5(
timer=timer,
)

# Clean up
del hgroup
if hf is not None:
hf.close()
del hf

return obs
2 changes: 1 addition & 1 deletion src/toast/observation_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def clear(self):
"""
if hasattr(self, "_data"):
del self._data
if not self._is_view:
if hasattr(self, "_is_view") and not self._is_view:
if hasattr(self, "_flatdata"):
del self._flatdata
if hasattr(self, "_raw"):
Expand Down
56 changes: 37 additions & 19 deletions src/toast/ops/save_spt3g.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class SaveSpt3g(Operator):

framefile_mb = Float(100.0, help="Target frame file size in MB")

gzip = Bool(False, help="If True, gzip compress the frame files")

# FIXME: We should add a filtering mechanism here to dump a subset of
# observations and / or detectors.

Expand Down Expand Up @@ -79,13 +81,6 @@ def _exec(self, data, detectors=None, **kwargs):
"You must set the obs_export trait before calling exec()"
)

# Find the process rank which will have all the frames
ex_rank = self.obs_export.export_rank
if ex_rank is None:
raise RuntimeError(
"The obs_export class must be configured to export frames on one process"
)

# One process creates the top directory
if data.comm.world_rank == 0:
os.makedirs(self.directory, exist_ok=True)
Expand All @@ -98,29 +93,52 @@ def _exec(self, data, detectors=None, **kwargs):
raise RuntimeError(
"Observations must have a name in order to save to SPT3G format"
)

# Export observation to frames on one process
frames = self.obs_export(ob)

# One process writes frame files
if ob.comm.group_rank == ex_rank:
ob_dir = os.path.join(self.directory, ob.name)
if ob.comm.group_rank == 0:
# Make observation directory. This should NOT already exist.
ob_dir = os.path.join(self.directory, ob.name)
os.makedirs(ob_dir)
if ob.comm.comm_group is not None:
ob.comm.comm_group.barrier()

emitter = frame_emitter(frames=frames)
# Export observation to frames
frames = self.obs_export(ob)

# If the export rank is set, then frames will be gathered to one
# process and written. Otherwise each process will write to
# sequential, independent frame files.
ex_rank = self.obs_export.export_rank
if ex_rank is None:
# All processes write independently
emitter = frame_emitter(frames=frames)
save_pipe = c3g.G3Pipeline()
save_pipe.Add(emitter)
fname = f"frames-{ob.comm.group_rank:04d}.g3"
if self.gzip:
fname += ".gz"
save_pipe.Add(
c3g.G3MultiFileWriter,
filename=os.path.join(ob_dir, "frames-%05u.g3"),
size_limit=int(self.framefile_mb * 1024**2),
c3g.G3Writer,
filename=os.path.join(ob_dir, fname),
)
save_pipe.Run()

del save_pipe
del emitter
else:
# Gather frames to one process and write
if ob.comm.group_rank == ex_rank:
emitter = frame_emitter(frames=frames)
save_pipe = c3g.G3Pipeline()
save_pipe.Add(emitter)
fpattern = "frames-%04u.g3"
if self.gzip:
fpattern += ".gz"
save_pipe.Add(
c3g.G3MultiFileWriter,
filename=os.path.join(ob_dir, fpattern),
size_limit=int(self.framefile_mb * 1024**2),
)
save_pipe.Run()
del save_pipe
del emitter

if ob.comm.comm_group is not None:
ob.comm.comm_group.barrier()
Expand Down
50 changes: 45 additions & 5 deletions src/toast/spt3g/spt3g_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ def export_detdata(
(tuple): The resulting G3 object, and compression parameters if enabled.
"""
do_compression = False
if isinstance(compress, bool) and compress:
do_compression = True
if isinstance(compress, dict):
do_compression = True
if name not in obs.detdata:
raise KeyError(f"DetectorData object '{name}' does not exist in observation")
if g3t == c3g.G3TimestreamMap and times is None:
Expand Down Expand Up @@ -132,7 +137,7 @@ def export_detdata(
else:
tstart = to_g3_time(obs.view[view_name].shared[times][view_index][0])
tstop = to_g3_time(obs.view[view_name].shared[times][view_index][-1])
if compress is not None:
if do_compression:
compression = dict()

dview = obs.detdata[name]
Expand All @@ -144,7 +149,7 @@ def export_detdata(
out[d] = c3g.G3Timestream(scale * dview[d], gunit)
out[d].start = tstart
out[d].stop = tstop
if compress is not None:
if do_compression:
out[d], comp_gain, comp_offset = compress_timestream(out[d], compress)
compression[d] = dict()
compression[d]["gain"] = comp_gain
Expand Down Expand Up @@ -254,7 +259,8 @@ def __call__(self, obs):
ob["site_weather_time"] = to_g3_time(site.weather.time.timestamp())
m_export = set()
for m_in, m_out in self._meta_arrays:
ob[m_out] = to_g3_array_type(obs[m_in])
out_type = to_g3_array_type(obs[m_in].dtype)
ob[m_out] = out_type(obs[m_in])
m_export.add(m_in)
for m_key, m_val in obs.items():
if m_key in m_export:
Expand Down Expand Up @@ -354,6 +360,10 @@ def __init__(
self._interval_names = interval_names
self._compress = compress

@property
def frame_intervals(self):
return self._frame_intervals

@function_timer
def __call__(self, obs):
log = Logger.get()
Expand Down Expand Up @@ -510,8 +520,38 @@ def __call__(self, obs):
(list): List of local frames.
"""
# Ensure data is distributed by time
obs.redistribute(1, times=self._timestamps)
# Ensure data is distributed by time. If the observation
# exporter defines existing frames to use, override the sample
# sets to match those.
redist_sampsets = False
if self._data_export.frame_intervals is not None:
# Create sample sets that match these frame boundaries
if obs.comm_col_rank == 0:
# First row of process grid gets local chunks
local_sets = list()
offset = 0
for intr in obs.intervals[self._data_export.frame_intervals]:
chunk = intr.last - offset + 1
local_sets.append([chunk,])
offset += chunk
if offset != obs.n_local_samples:
local_sets.append([obs.n_local_samples - offset])
# Gather across the row
all_sets = [local_sets,]
if obs.comm_row is not None:
all_sets = obs.comm_row.gather(local_sets, root=0)
if obs.comm_row_rank == 0:
redist_sampsets = list()
for pset in all_sets:
redist_sampsets.extend(pset)
if obs.comm.comm_group is not None:
redist_sampsets = obs.comm.comm_group.bcast(redist_sampsets, root=0)

obs.redistribute(
1,
times=self._timestamps,
override_sample_sets=redist_sampsets,
)

# Rank within the observation (group) communicator
obs_rank = obs.comm.group_rank
Expand Down
6 changes: 1 addition & 5 deletions src/toast/spt3g/spt3g_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ def compress_timestream(ts, params, rmstarget=2**10, rmsmode="white"):
Args:
ts (G3Timestream) : Input signal
params (bool or dict) : if True, compress with default
parameters. If dict with 'rmstarget' member, override
default `rmstarget`. If dict with `gain` and `offset`
members, use those instead.
params (None, bool or dict) : If None, False or an empty dict,
no compression or casting to integers. If True or
non-empty dictionary, enable compression. Expected fields
Expand All @@ -227,7 +223,7 @@ def compress_timestream(ts, params, rmstarget=2**10, rmsmode="white"):
offset (float) : The removed offset
"""
if not params:
if params is None or not params:
return ts, 1, 0
gain = None
offset = None
Expand Down

0 comments on commit 0303cdd

Please sign in to comment.