Update parallel compression feature to support multi-dataset I/O
jhendersonHDF committed Sep 25, 2023
1 parent 44a00ef commit 0be2d39
Showing 12 changed files with 6,135 additions and 4,334 deletions.
25 changes: 23 additions & 2 deletions doc/parallel-compression.md
@@ -61,8 +61,8 @@ H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);
H5Dwrite(..., dxpl_id, ...);
```

The following are two simple examples of using the parallel
compression feature:

[ph5_filtered_writes.c](https://github.com/HDFGroup/hdf5/blob/develop/examples/ph5_filtered_writes.c)

@@ -76,6 +76,27 @@ Remember that the feature requires these writes to use collective
I/O, so the MPI ranks which have nothing to contribute must still
participate in the collective write call.
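
A minimal sketch of this pattern is shown below (the identifiers
`dset_id`, `file_space`, `mem_space`, `start`, `count`, `local_buf`,
and the `have_data` flag are assumed for illustration, along with an
integer dataset): a rank with nothing to contribute selects no
elements but still makes the collective `H5Dwrite()` call.

```c
hid_t dxpl_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);

if (have_data) {
    /* Select this rank's portion of the dataset as usual */
    H5Sselect_hyperslab(file_space, H5S_SELECT_SET, start, NULL, count, NULL);
}
else {
    /* Nothing to contribute: select no elements in both dataspaces,
     * but do not skip the collective write call below
     */
    H5Sselect_none(file_space);
    H5Sselect_none(mem_space);
}

/* Every rank participates in the collective write */
H5Dwrite(dset_id, H5T_NATIVE_INT, mem_space, file_space, dxpl_id, local_buf);

H5Pclose(dxpl_id);
```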

## Multi-dataset I/O support

The parallel compression feature is supported when using the
multi-dataset I/O API routines ([H5Dwrite_multi](https://hdfgroup.github.io/hdf5/group___h5_d.html#gaf6213bf3a876c1741810037ff2bb85d8)/[H5Dread_multi](https://hdfgroup.github.io/hdf5/group___h5_d.html#ga8eb1c838aff79a17de385d0707709915)), but the
following should be kept in mind:

- Parallel writes to filtered datasets **must** still be collective,
even when using the multi-dataset I/O API routines

- When the multi-dataset I/O API routines are passed a mixture of
filtered and unfiltered datasets, the library currently has to
perform I/O on them separately in two phases. Because this adds
some complexity, it may be best (depending on the number of
datasets, the number of selected chunks, the number of filtered
vs. unfiltered datasets, etc.) to make two separate multi-dataset
I/O calls: one for the filtered datasets and one for the
unfiltered datasets, as in the sketch after this list. When
performing writes, this would also allow independent write access
to the unfiltered datasets if desired, while still performing
collective writes to the filtered datasets.
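
As a rough sketch of this split (the dataset, type, dataspace, and
buffer arrays `filtered_dsets`, `filtered_mem_types`,
`filtered_mem_spaces`, `filtered_file_spaces`, `filtered_bufs`, their
`unfiltered_*` counterparts, and the counts `n_filtered` and
`n_unfiltered` are assumed for illustration):

```c
/* Phase 1: filtered datasets - writes must be collective */
hid_t coll_dxpl = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(coll_dxpl, H5FD_MPIO_COLLECTIVE);

H5Dwrite_multi(n_filtered, filtered_dsets, filtered_mem_types,
               filtered_mem_spaces, filtered_file_spaces,
               coll_dxpl, filtered_bufs);

/* Phase 2: unfiltered datasets - independent access is allowed
 * here if desired
 */
hid_t indep_dxpl = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(indep_dxpl, H5FD_MPIO_INDEPENDENT);

H5Dwrite_multi(n_unfiltered, unfiltered_dsets, unfiltered_mem_types,
               unfiltered_mem_spaces, unfiltered_file_spaces,
               indep_dxpl, unfiltered_bufs);

H5Pclose(indep_dxpl);
H5Pclose(coll_dxpl);
```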

## Incremental file space allocation support

HDF5's [file space allocation time](https://portal.hdfgroup.org/display/HDF5/H5P_SET_ALLOC_TIME)
11 changes: 10 additions & 1 deletion release_docs/RELEASE.txt
@@ -235,7 +235,16 @@ New Features

Parallel Library:
-----------------
- Added optimized support for the parallel compression feature when
using the multi-dataset I/O API routines collectively

Previously, calling H5Dwrite_multi/H5Dread_multi collectively in parallel
with a list containing one or more filtered datasets would cause HDF5 to
break out of the optimized multi-dataset I/O mode and instead perform I/O
by looping over each dataset in the I/O request. The library has now been
updated to perform I/O in a more optimized manner in this case by first
performing I/O on all the filtered datasets at once and then performing
I/O on all the unfiltered datasets at once.


Fortran Library:
26 changes: 26 additions & 0 deletions src/H5Dchunk.c
@@ -1556,6 +1556,9 @@ H5D__create_piece_map_single(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
piece_info->in_place_tconv = false;
piece_info->buf_off = 0;

/* Check if chunk is in a dataset with filters applied */
piece_info->filtered_dset = di->dset->shared->dcpl_cache.pline.nused > 0;

/* make connection to related dset info from this piece_info */
piece_info->dset_info = di;

@@ -1591,6 +1594,7 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
hsize_t curr_partial_clip[H5S_MAX_RANK]; /* Current partial dimension sizes to clip against */
hsize_t partial_dim_size[H5S_MAX_RANK]; /* Size of a partial dimension */
bool is_partial_dim[H5S_MAX_RANK]; /* Whether a dimension is currently a partial chunk */
bool filtered_dataset; /* Whether the dataset in question has filters applied */
unsigned num_partial_dims; /* Current number of partial dimensions */
unsigned u; /* Local index variable */
herr_t ret_value = SUCCEED; /* Return value */
@@ -1640,6 +1644,9 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
/* Set the index of this chunk */
chunk_index = 0;

/* Check whether dataset has filters applied */
filtered_dataset = di->dset->shared->dcpl_cache.pline.nused > 0;

/* Create "temporary" chunk for selection operations (copy file space) */
if (NULL == (tmp_fchunk = H5S_create_simple(fm->f_ndims, fm->chunk_dim, NULL)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTCREATE, FAIL, "unable to create dataspace for chunk");
@@ -1686,6 +1693,8 @@ H5D__create_piece_file_map_all(H5D_dset_io_info_t *di, H5D_io_info_t *io_info)
new_piece_info->in_place_tconv = false;
new_piece_info->buf_off = 0;

new_piece_info->filtered_dset = filtered_dataset;

/* Insert the new chunk into the skip list */
if (H5SL_insert(fm->dset_sel_pieces, new_piece_info, &new_piece_info->index) < 0) {
H5D__free_piece_info(new_piece_info, NULL, NULL);
@@ -1798,6 +1807,7 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in
hsize_t chunk_index; /* Index of chunk */
hsize_t start_scaled[H5S_MAX_RANK]; /* Starting scaled coordinates of selection */
hsize_t scaled[H5S_MAX_RANK]; /* Scaled coordinates for this chunk */
bool filtered_dataset; /* Whether the dataset in question has filters applied */
int curr_dim; /* Current dimension to increment */
unsigned u; /* Local index variable */
herr_t ret_value = SUCCEED; /* Return value */
@@ -1831,6 +1841,9 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in
/* Calculate the index of this chunk */
chunk_index = H5VM_array_offset_pre(fm->f_ndims, dinfo->layout->u.chunk.down_chunks, scaled);

/* Check whether dataset has filters applied */
filtered_dataset = dinfo->dset->shared->dcpl_cache.pline.nused > 0;

/* Iterate through each chunk in the dataset */
while (sel_points) {
/* Check for intersection of current chunk and file selection */
@@ -1885,6 +1898,8 @@ H5D__create_piece_file_map_hyper(H5D_dset_io_info_t *dinfo, H5D_io_info_t *io_in
new_piece_info->in_place_tconv = false;
new_piece_info->buf_off = 0;

new_piece_info->filtered_dset = filtered_dataset;

/* Add piece to global piece_count */
io_info->piece_count++;

@@ -2257,6 +2272,8 @@ H5D__piece_file_cb(void H5_ATTR_UNUSED *elem, const H5T_t H5_ATTR_UNUSED *type,
piece_info->in_place_tconv = false;
piece_info->buf_off = 0;

piece_info->filtered_dset = dinfo->dset->shared->dcpl_cache.pline.nused > 0;

/* Make connection to related dset info from this piece_info */
piece_info->dset_info = dinfo;

@@ -2417,6 +2434,9 @@ H5D__chunk_mdio_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo)

/* Add to sel_pieces and update pieces_added */
io_info->sel_pieces[io_info->pieces_added++] = piece_info;

if (piece_info->filtered_dset)
io_info->filtered_pieces_added++;
}

/* Advance to next skip list node */
@@ -2728,6 +2748,9 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
if (io_info->sel_pieces)
io_info->sel_pieces[io_info->pieces_added] = chunk_info;
io_info->pieces_added++;

if (io_info->sel_pieces && chunk_info->filtered_dset)
io_info->filtered_pieces_added++;
}
} /* end if */
else if (!skip_missing_chunks) {
@@ -3142,6 +3165,9 @@ H5D__chunk_write(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
if (io_info->sel_pieces)
io_info->sel_pieces[io_info->pieces_added] = chunk_info;
io_info->pieces_added++;

if (io_info->sel_pieces && chunk_info->filtered_dset)
io_info->filtered_pieces_added++;
}
} /* end else */

2 changes: 2 additions & 0 deletions src/H5Dcontig.c
@@ -644,6 +644,8 @@ H5D__contig_io_init(H5D_io_info_t *io_info, H5D_dset_io_info_t *dinfo)
new_piece_info->in_place_tconv = false;
new_piece_info->buf_off = 0;

new_piece_info->filtered_dset = dinfo->dset->shared->dcpl_cache.pline.nused > 0;

/* Calculate type conversion buffer size and check for in-place conversion if necessary. Currently
* only implemented for selection I/O. */
if (io_info->use_select_io != H5D_SELECTION_IO_MODE_OFF &&
52 changes: 39 additions & 13 deletions src/H5Dio.c
@@ -107,6 +107,17 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)

FUNC_ENTER_NOAPI(FAIL)

#ifdef H5_HAVE_PARALLEL
/* Reset the actual io mode properties to the default values in case
* the DXPL (if it's non-default) was previously used in a collective
* I/O operation.
*/
if (!H5CX_is_def_dxpl()) {
H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
} /* end if */
#endif

/* Init io_info */
if (H5D__ioinfo_init(count, H5D_IO_OP_READ, dset_info, &io_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
@@ -222,6 +233,10 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
dset_info[i].buf.vp = (void *)(((uint8_t *)dset_info[i].buf.vp) + buf_adj);
} /* end if */

/* Check if any filters are applied to the dataset */
if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0)
io_info.filtered_count++;

/* If space hasn't been allocated and not using external storage,
* return fill value to buffer if fill time is upon allocation, or
* do nothing if fill time is never. If the dataset is compact and
@@ -323,7 +338,11 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
}

/* MDIO-specific second phase initialization */
for (i = 0; i < count; i++)
for (i = 0; i < count; i++) {
/* Check for skipped I/O */
if (dset_info[i].skip_io)
continue;

if (dset_info[i].layout_ops.mdio_init) {
haddr_t prev_tag = HADDR_UNDEF;

Expand All @@ -337,6 +356,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
/* Reset metadata tagging */
H5AC_tag(prev_tag, NULL);
}
}

/* Invoke correct "high level" I/O routine */
if ((*io_info.md_io_ops.multi_read_md)(&io_info) < 0)
@@ -429,7 +449,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)

done:
/* Shut down the I/O op information */
for (i = 0; i < io_op_init; i++)
for (i = 0; i < count; i++)
if (!dset_info[i].skip_io && dset_info[i].layout_ops.io_term &&
(*dset_info[i].layout_ops.io_term)(&io_info, &(dset_info[i])) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTCLOSEOBJ, FAIL, "unable to shut down I/O op info");
@@ -512,6 +532,17 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)

FUNC_ENTER_NOAPI(FAIL)

#ifdef H5_HAVE_PARALLEL
/* Reset the actual io mode properties to the default values in case
* the DXPL (if it's non-default) was previously used in a collective
* I/O operation.
*/
if (!H5CX_is_def_dxpl()) {
H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
} /* end if */
#endif

/* Init io_info */
if (H5D__ioinfo_init(count, H5D_IO_OP_WRITE, dset_info, &io_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
@@ -586,7 +617,7 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
if (NULL == dset_info[i].buf.cvp) {
/* Check for any elements selected (which is invalid) */
if (dset_info[i].nelmts > 0)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no output buffer");
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no input buffer");

/* If the buffer is nil, and 0 element is selected, make a fake buffer.
* This is for some MPI package like ChaMPIon on NCSA's tungsten which
@@ -655,6 +686,10 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to set up I/O operation");

/* Check if any filters are applied to the dataset */
if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0)
io_info.filtered_count++;

/* Allocate dataspace and initialize it if it hasn't been. */
should_alloc_space = dset_info[i].dset->shared->dcpl_cache.efl.nused == 0 &&
!(*dset_info[i].dset->shared->layout.ops->is_space_alloc)(
@@ -840,7 +875,7 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)

done:
/* Shut down the I/O op information */
for (i = 0; i < io_op_init; i++) {
for (i = 0; i < count; i++) {
assert(!dset_info[i].skip_io);
if (dset_info[i].layout_ops.io_term &&
(*dset_info[i].layout_ops.io_term)(&io_info, &(dset_info[i])) < 0)
@@ -1225,15 +1260,6 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info)
dset0 = io_info->dsets_info[0].dset;
assert(dset0->oloc.file);

/* Reset the actual io mode properties to the default values in case
* the DXPL (if it's non-default) was previously used in a collective
* I/O operation.
*/
if (!H5CX_is_def_dxpl()) {
H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
} /* end if */

/* Make any parallel I/O adjustments */
if (io_info->using_mpi_vfd) {
H5FD_mpio_xfer_t xfer_mode; /* Parallel transfer for this request */