Update parallel compression feature to support multi-dataset I/O #3591

Merged: 11 commits, Oct 10, 2023
doc/parallel-compression.md: 25 changes (23 additions, 2 deletions)

@@ -61,8 +61,8 @@ H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);
H5Dwrite(..., dxpl_id, ...);
```

The following are two simple examples of using the parallel
compression feature:

[ph5_filtered_writes.c](https://github.com/HDFGroup/hdf5/blob/develop/examples/ph5_filtered_writes.c)

@@ -76,6 +76,27 @@ Remember that the feature requires these writes to use collective
I/O, so the MPI ranks which have nothing to contribute must still
participate in the collective write call.
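
A minimal sketch of that pattern (assuming `dset_id`, `mem_space_id`, `file_space_id`, `dxpl_id`, and `buf` are already set up by the application, that `have_data` stands in for the application's own logic, and that the dataset holds native integers):

```c
/* Ranks with nothing to write clear their selections but still
 * participate in the collective H5Dwrite() call. */
if (!have_data) {
    H5Sselect_none(file_space_id);
    H5Sselect_none(mem_space_id);
}

H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);
H5Dwrite(dset_id, H5T_NATIVE_INT, mem_space_id, file_space_id, dxpl_id, buf);
```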

## Multi-dataset I/O support

The parallel compression feature is supported when using the
multi-dataset I/O API routines ([H5Dwrite_multi](https://hdfgroup.github.io/hdf5/group___h5_d.html#gaf6213bf3a876c1741810037ff2bb85d8)/[H5Dread_multi](https://hdfgroup.github.io/hdf5/group___h5_d.html#ga8eb1c838aff79a17de385d0707709915)), but the
following should be kept in mind:

- Parallel writes to filtered datasets **must** still be collective,
even when using the multi-dataset I/O API routines

- When the multi-dataset I/O API routines are passed a mixture of
filtered and unfiltered datasets, the library currently has to
perform I/O on them separately in two phases. Because of the extra
complexity this involves, it may be best (depending on the number
of datasets, the number of selected chunks, the ratio of filtered
to unfiltered datasets, etc.) to make two separate multi-dataset
I/O calls instead, one for the filtered datasets and one for the
unfiltered datasets, as sketched below. When performing writes,
this also allows independent write access to the unfiltered
datasets if desired, while still performing collective writes to
the filtered datasets.
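
A rough illustration of that split (a sketch only; the `filt_*` / `unfilt_*` arrays and counts are hypothetical names an application would have to build itself, not part of the HDF5 API):

```c
/* Collective transfer property list for the filtered datasets */
hid_t coll_dxpl = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(coll_dxpl, H5FD_MPIO_COLLECTIVE);

/* First call: all filtered datasets, written collectively */
H5Dwrite_multi(n_filtered, filt_dset_ids, filt_mem_type_ids,
               filt_mem_space_ids, filt_file_space_ids, coll_dxpl, filt_bufs);

/* Second call: the unfiltered datasets, which may optionally use
 * independent I/O instead */
hid_t indep_dxpl = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(indep_dxpl, H5FD_MPIO_INDEPENDENT);
H5Dwrite_multi(n_unfiltered, unfilt_dset_ids, unfilt_mem_type_ids,
               unfilt_mem_space_ids, unfilt_file_space_ids, indep_dxpl, unfilt_bufs);

H5Pclose(indep_dxpl);
H5Pclose(coll_dxpl);
```

Note that the collective call must still be made by every rank, even ranks with nothing to contribute.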

## Incremental file space allocation support

HDF5's [file space allocation time](https://portal.hdfgroup.org/display/HDF5/H5P_SET_ALLOC_TIME)
release_docs/RELEASE.txt: 11 changes (10 additions, 1 deletion)

@@ -235,7 +235,16 @@ New Features

Parallel Library:
-----------------
-
- Added optimized support for the parallel compression feature when
using the multi-dataset I/O API routines collectively

Previously, calling H5Dwrite_multi/H5Dread_multi collectively in parallel
with a list containing one or more filtered datasets would cause HDF5 to
break out of the optimized multi-dataset I/O mode and instead perform I/O
by looping over each dataset in the I/O request. The library has now been
updated to perform I/O in a more optimized manner in this case by first
performing I/O on all the filtered datasets at once and then performing
I/O on all the unfiltered datasets at once.
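
As a rough illustration (a sketch only, not code from this change; all identifiers are placeholders assumed to be created elsewhere), a single collective H5Dwrite_multi call over a mixed list now stays on the optimized path:

```c
/* dset_ids[] may mix filtered (e.g. deflate-compressed) and unfiltered
 * datasets; internally the library now performs I/O on all the filtered
 * datasets first and then on all the unfiltered ones. */
hid_t dxpl_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);

H5Dwrite_multi(count, dset_ids, mem_type_ids, mem_space_ids,
               file_space_ids, dxpl_id, bufs);

H5Pclose(dxpl_id);
```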


Fortran Library:
src/H5Dchunk.c: 26 changes (26 additions, 0 deletions)

@@ -1556,6 +1556,9 @@ H5D__create_piece_map_single(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
piece_info->in_place_tconv = false;
piece_info->buf_off = 0;

/* Check if chunk is in a dataset with filters applied */
piece_info->filtered_dset = di->dset->shared->dcpl_cache.pline.nused > 0;

/* make connection to related dset info from this piece_info */
piece_info->dset_info = di;

@@ -1591,6 +1594,7 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
hsize_t curr_partial_clip[H5S_MAX_RANK]; /* Current partial dimension sizes to clip against */
hsize_t partial_dim_size[H5S_MAX_RANK]; /* Size of a partial dimension */
bool is_partial_dim[H5S_MAX_RANK]; /* Whether a dimension is currently a partial chunk */
bool filtered_dataset; /* Whether the dataset in question has filters applied */
unsigned num_partial_dims; /* Current number of partial dimensions */
unsigned u; /* Local index variable */
herr_t ret_value = SUCCEED; /* Return value */
@@ -1640,6 +1644,9 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
/* Set the index of this chunk */
chunk_index = 0;

/* Check whether dataset has filters applied */
filtered_dataset = di->dset->shared->dcpl_cache.pline.nused > 0;

/* Create "temporary" chunk for selection operations (copy file space) */
if (NULL == (tmp_fchunk = H5S_create_simple(fm->f_ndims, fm->chunk_dim, NULL)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTCREATE, FAIL, "unable to create dataspace for chunk");
@@ -1686,6 +1693,8 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
new_piece_info->in_place_tconv = false;
new_piece_info->buf_off = 0;

new_piece_info->filtered_dset = filtered_dataset;

/* Insert the new chunk into the skip list */
if (H5SL_insert(fm->dset_sel_pieces, new_piece_info, &new_piece_info->index) < 0) {
H5D__free_piece_info(new_piece_info, NULL, NULL);
@@ -1798,6 +1807,7 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in
hsize_t chunk_index; /* Index of chunk */
hsize_t start_scaled[H5S_MAX_RANK]; /* Starting scaled coordinates of selection */
hsize_t scaled[H5S_MAX_RANK]; /* Scaled coordinates for this chunk */
bool filtered_dataset; /* Whether the dataset in question has filters applied */
int curr_dim; /* Current dimension to increment */
unsigned u; /* Local index variable */
herr_t ret_value = SUCCEED; /* Return value */
@@ -1831,6 +1841,9 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in
/* Calculate the index of this chunk */
chunk_index = H5VM_array_offset_pre(fm->f_ndims, dinfo->layout->u.chunk.down_chunks, scaled);

/* Check whether dataset has filters applied */
filtered_dataset = dinfo->dset->shared->dcpl_cache.pline.nused > 0;

/* Iterate through each chunk in the dataset */
while (sel_points) {
/* Check for intersection of current chunk and file selection */
@@ -1885,6 +1898,8 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in
new_piece_info->in_place_tconv = false;
new_piece_info->buf_off = 0;

new_piece_info->filtered_dset = filtered_dataset;

/* Add piece to global piece_count */
io_info->piece_count++;

@@ -2257,6 +2272,8 @@ H5D__piece_file_cb(void H5_ATTR_UNUSED *elem, const H5T_t H5_ATTR_UNUSED *type,
piece_info->in_place_tconv = false;
piece_info->buf_off = 0;

piece_info->filtered_dset = dinfo->dset->shared->dcpl_cache.pline.nused > 0;

/* Make connection to related dset info from this piece_info */
piece_info->dset_info = dinfo;

@@ -2417,6 +2434,9 @@ H5D__chunk_mdio_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo)

/* Add to sel_pieces and update pieces_added */
io_info->sel_pieces[io_info->pieces_added++] = piece_info;

if (piece_info->filtered_dset)
io_info->filtered_pieces_added++;
}

/* Advance to next skip list node */
@@ -2728,6 +2748,9 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
if (io_info->sel_pieces)
io_info->sel_pieces[io_info->pieces_added] = chunk_info;
io_info->pieces_added++;

if (io_info->sel_pieces && chunk_info->filtered_dset)
io_info->filtered_pieces_added++;
}
} /* end if */
else if (!skip_missing_chunks) {
@@ -3142,6 +3165,9 @@ H5D__chunk_write(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
if (io_info->sel_pieces)
io_info->sel_pieces[io_info->pieces_added] = chunk_info;
io_info->pieces_added++;

if (io_info->sel_pieces && chunk_info->filtered_dset)
io_info->filtered_pieces_added++;
}
} /* end else */

src/H5Dcontig.c: 2 changes (2 additions, 0 deletions)

@@ -644,6 +644,8 @@ H5D__contig_io_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo)
new_piece_info->in_place_tconv = false;
new_piece_info->buf_off = 0;

new_piece_info->filtered_dset = dinfo->dset->shared->dcpl_cache.pline.nused > 0;

/* Calculate type conversion buffer size and check for in-place conversion if necessary. Currently
* only implemented for selection I/O. */
if (io_info->use_select_io != H5D_SELECTION_IO_MODE_OFF &&
src/H5Dio.c: 52 changes (39 additions, 13 deletions)

@@ -107,6 +107,17 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)

FUNC_ENTER_NOAPI(FAIL)

#ifdef H5_HAVE_PARALLEL
/* Reset the actual io mode properties to the default values in case
* the DXPL (if it's non-default) was previously used in a collective
* I/O operation.
*/
if (!H5CX_is_def_dxpl()) {
Review comment (Collaborator Author): Here I needed to move the setting of the optimization method higher up so if something fails in this function the returned value is sane.

Review comment (Member): Maybe put this in H5D__ioinfo_init() instead?

Review comment (Collaborator Author): Thinking about this more, I feel like this may even belong higher up in the library since H5D__read is called in a few other places. It also seems like good compartmentalization to just have H5D__ioinfo_init init the io_info structure without doing much else.

H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
} /* end if */
#endif

/* Init io_info */
if (H5D__ioinfo_init(count, H5D_IO_OP_READ, dset_info, &io_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
@@ -222,6 +233,10 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
dset_info[i].buf.vp = (void *)(((uint8_t *)dset_info[i].buf.vp) + buf_adj);
} /* end if */

/* Check if any filters are applied to the dataset */
if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0)
io_info.filtered_count++;

/* If space hasn't been allocated and not using external storage,
* return fill value to buffer if fill time is upon allocation, or
* do nothing if fill time is never. If the dataset is compact and
@@ -323,7 +338,11 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
}

/* MDIO-specific second phase initialization */
for (i = 0; i < count; i++)
for (i = 0; i < count; i++) {
/* Check for skipped I/O */
if (dset_info[i].skip_io)
Review comment (jhendersonHDF, Collaborator Author, Sep 25, 2023): Make sure we don't call the mdio_init callback for datasets where we're skipping I/O, because the callbacks may try to use fields that aren't initialized for the datasets.

continue;

if (dset_info[i].layout_ops.mdio_init) {
haddr_t prev_tag = HADDR_UNDEF;

@@ -337,6 +356,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
/* Reset metadata tagging */
H5AC_tag(prev_tag, NULL);
}
}

/* Invoke correct "high level" I/O routine */
if ((*io_info.md_io_ops.multi_read_md)(&io_info) < 0)
@@ -429,7 +449,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)

done:
/* Shut down the I/O op information */
for (i = 0; i < io_op_init; i++)
for (i = 0; i < count; i++)
Review comment (Collaborator Author): Here, we need to step through the full count number of dataset entries and call their io_term callbacks, instead of just the first io_op_init of them. Otherwise, if the first io_op_init entries were skipped datasets, the latter entries won't get terminated and certain things are left around in memory, such as the cached dataset sel_pieces array. Unintended reuse of that cached field when doing an H5Dread_multi followed by H5Dwrite_multi gave me errors like:

HDF5-DIAG: Error detected in HDF5 (1.15.0) MPI-process 0:
  #000: /home/jhenderson/git/hdf5/src/H5D.c line 1446 in H5Dwrite_multi(): can't synchronously write data
    major: Dataset
    minor: Write failed
  #001: /home/jhenderson/git/hdf5/src/H5D.c line 1317 in H5D__write_api_common(): can't write data
    major: Dataset
    minor: Write failed
  #002: /home/jhenderson/git/hdf5/src/H5VLcallback.c line 2282 in H5VL_dataset_write_direct(): dataset write failed
    major: Virtual Object Layer
    minor: Write failed
  #003: /home/jhenderson/git/hdf5/src/H5VLcallback.c line 2237 in H5VL__dataset_write(): dataset write failed
    major: Virtual Object Layer
    minor: Write failed
  #004: /home/jhenderson/git/hdf5/src/H5VLnative_dataset.c line 420 in H5VL__native_dataset_write(): can't write data
    major: Dataset
    minor: Write failed
  #005: /home/jhenderson/git/hdf5/src/H5Dio.c line 732 in H5D__write(): can't initialize I/O info
    major: Dataset
    minor: Unable to initialize object
  #006: /home/jhenderson/git/hdf5/src/H5Dchunk.c line 1091 in H5D__chunk_io_init(): unable to create file and memory chunk selections
    major: Dataset
    minor: Unable to initialize object
  #007: /home/jhenderson/git/hdf5/src/H5Dchunk.c line 1241 in H5D__chunk_io_init_selections(): unable to create file chunk selections
    major: Dataset
    minor: Unable to initialize object
  #008: /home/jhenderson/git/hdf5/src/H5Dchunk.c line 1944 in H5D__create_piece_file_map_hyper(): can't insert piece into skip list
    major: Dataspace
    minor: Unable to insert object
  #009: /home/jhenderson/git/hdf5/src/H5SL.c line 1036 in H5SL_insert(): can't create new skip list node
    major: Skip Lists
    minor: Unable to insert object
  #010: /home/jhenderson/git/hdf5/src/H5SL.c line 709 in H5SL__insert_common(): can't insert duplicate key
    major: Skip Lists
    minor: Unable to insert object

if (!dset_info[i].skip_io && dset_info[i].layout_ops.io_term &&
(*dset_info[i].layout_ops.io_term)(&io_info, &(dset_info[i])) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTCLOSEOBJ, FAIL, "unable to shut down I/O op info");
@@ -512,6 +532,17 @@

FUNC_ENTER_NOAPI(FAIL)

#ifdef H5_HAVE_PARALLEL
/* Reset the actual io mode properties to the default values in case
* the DXPL (if it's non-default) was previously used in a collective
* I/O operation.
*/
if (!H5CX_is_def_dxpl()) {
H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
} /* end if */
#endif

/* Init io_info */
if (H5D__ioinfo_init(count, H5D_IO_OP_WRITE, dset_info, &io_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
@@ -586,7 +617,7 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
if (NULL == dset_info[i].buf.cvp) {
/* Check for any elements selected (which is invalid) */
if (dset_info[i].nelmts > 0)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no output buffer");
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no input buffer");

/* If the buffer is nil, and 0 element is selected, make a fake buffer.
* This is for some MPI package like ChaMPIon on NCSA's tungsten which
@@ -655,6 +686,10 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to set up I/O operation");

/* Check if any filters are applied to the dataset */
if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0)
io_info.filtered_count++;

/* Allocate dataspace and initialize it if it hasn't been. */
should_alloc_space = dset_info[i].dset->shared->dcpl_cache.efl.nused == 0 &&
!(*dset_info[i].dset->shared->layout.ops->is_space_alloc)(
@@ -840,7 +875,7 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)

done:
/* Shut down the I/O op information */
for (i = 0; i < io_op_init; i++) {
for (i = 0; i < count; i++) {
assert(!dset_info[i].skip_io);
if (dset_info[i].layout_ops.io_term &&
(*dset_info[i].layout_ops.io_term)(&io_info, &(dset_info[i])) < 0)
@@ -1225,15 +1260,6 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info)
dset0 = io_info->dsets_info[0].dset;
assert(dset0->oloc.file);

/* Reset the actual io mode properties to the default values in case
* the DXPL (if it's non-default) was previously used in a collective
* I/O operation.
*/
if (!H5CX_is_def_dxpl()) {
H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
} /* end if */

/* Make any parallel I/O adjustments */
if (io_info->using_mpi_vfd) {
H5FD_mpio_xfer_t xfer_mode; /* Parallel transfer for this request */