Skip to content

Commit

Permalink
Merge pull request #64 from stackhpc/byte-order
Browse files Browse the repository at this point in the history
Add support for byte order
  • Loading branch information
markgoddard authored Aug 2, 2023
2 parents db30c4a + afd80a4 commit 8e4be4a
Show file tree
Hide file tree
Showing 14 changed files with 369 additions and 153 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "reductionist"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
# Due to AWS SDK.
rust-version = "1.66.1"
Expand All @@ -26,7 +26,7 @@ maligned = "0.2.1"
mime = "0.3"
ndarray = "0.15"
ndarray-stats = "0.5"
num-traits = "0.2.15"
num-traits = "0.2.16"
prometheus = { version = "0.13", features = ["process"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "*"
Expand All @@ -47,6 +47,10 @@ criterion = { version = "0.4", features = ["html_reports"] }
regex = "1"
serde_test = "1.0"

[[bench]]
name = "byte_order"
harness = false

[[bench]]
name = "shuffle"
harness = false
Expand Down
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ with a JSON payload of the form:
// - required
"dtype": "int32|int64|uint32|uint64|float32|float64",
// The byte order (endianness) of the data
// - optional, defaults to native byte order of Reductionist server
"byte_order": "big|little",
// The offset in bytes to use when reading data
// - optional, defaults to zero
"offset": 0,
Expand Down Expand Up @@ -99,7 +103,12 @@ with a JSON payload of the form:

The currently supported reducers are `max`, `min`, `sum`, `select` and `count`. All reducers return the result using the same datatype as specified in the request except for `count` which always returns the result as `int64`.

The proxy adds two custom headers `x-activestorage-dtype` and `x-activestrorage-shape` to the HTTP response to allow the numeric result to be reconstructed from the binary content of the response. An additional `x-activestorage-count` header is also returned which contains the number of array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.
The proxy returns the following headers to the HTTP response:

* `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`.
* `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`.
* `x-activestrorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result.
* `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.

[//]: <> (TODO: No OpenAPI support yet).
[//]: <> (For a running instance of the proxy server, the full OpenAPI specification is browsable as a web page at the `{proxy-address}/redoc/` endpoint or in raw JSON form at `{proxy-address}/openapi.json`.)
Expand Down
45 changes: 45 additions & 0 deletions benches/byte_order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/// Benchmarks for the byte order reversal implementation.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use reductionist::array::{build_array_mut_from_shape, get_shape, reverse_array_byte_order};
use reductionist::models::{DType, RequestData, Slice};
use url::Url;

fn get_test_request_data() -> RequestData {
RequestData {
source: Url::parse("http://example.com").unwrap(),
bucket: "bar".to_string(),
object: "baz".to_string(),
dtype: DType::Int32,
byte_order: None,
offset: None,
size: None,
shape: None,
order: None,
selection: None,
compression: None,
filters: None,
missing: None,
}
}

fn criterion_benchmark(c: &mut Criterion) {
for size_k in [64, 256, 1024] {
let size: isize = size_k * 1024;
let mut data: Vec<u32> = (0_u32..(size as u32)).collect::<Vec<u32>>();
let mut request_data = get_test_request_data();
request_data.dtype = DType::Uint32;
let shape = get_shape(data.len(), &request_data);
let mut array = build_array_mut_from_shape(shape, &mut data).unwrap();
for selection in [None, Some(vec![Slice::new(size / 4, size / 2, 2)])] {
let name = format!("byte_order({}, {:?})", size, selection);
c.bench_function(&name, |b| {
b.iter(|| {
reverse_array_byte_order(black_box(&mut array), &selection);
})
});
}
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
3 changes: 3 additions & 0 deletions scripts/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def get_args() -> argparse.Namespace:
parser.add_argument("--bucket", required=True, type=str)
parser.add_argument("--object", required=True, type=str)
parser.add_argument("--dtype", required=True, type=str) #, choices=DTYPES) allow invalid
parser.add_argument("--byte-order", type=str, choices=["big", "little"])
parser.add_argument("--offset", type=int)
parser.add_argument("--size", type=int)
parser.add_argument("--shape", type=str)
Expand Down Expand Up @@ -66,6 +67,8 @@ def build_request_data(args: argparse.Namespace) -> dict:
'order': args.order,
'compression': args.compression,
}
if args.byte_order:
request_data["byte_order"] = args.byte_order
if args.shape:
request_data["shape"] = json.loads(args.shape)
if args.selection:
Expand Down
23 changes: 21 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::models;
use crate::operation;
use crate::operations;
use crate::s3_client;
use crate::types::{ByteOrder, NATIVE_BYTE_ORDER};
use crate::validated_json::ValidatedJson;

use axum::middleware;
Expand Down Expand Up @@ -34,6 +35,13 @@ static HEADER_DTYPE: header::HeaderName = header::HeaderName::from_static("x-act
static HEADER_SHAPE: header::HeaderName = header::HeaderName::from_static("x-activestorage-shape");
/// `x-activestorage-count` header definition
static HEADER_COUNT: header::HeaderName = header::HeaderName::from_static("x-activestorage-count");
/// `x-activestorage-byte-order` header definition
static HEADER_BYTE_ORDER: header::HeaderName =
header::HeaderName::from_static("x-activestorage-byte-order");
const HEADER_BYTE_ORDER_VALUE: &str = match NATIVE_BYTE_ORDER {
ByteOrder::Big => "big",
ByteOrder::Little => "little",
};

impl IntoResponse for models::Response {
/// Convert a [crate::models::Response] into a [axum::response::Response].
Expand All @@ -47,6 +55,7 @@ impl IntoResponse for models::Response {
(&HEADER_DTYPE, self.dtype.to_string().to_lowercase()),
(&HEADER_SHAPE, serde_json::to_string(&self.shape).unwrap()),
(&HEADER_COUNT, serde_json::to_string(&self.count).unwrap()),
(&HEADER_BYTE_ORDER, HEADER_BYTE_ORDER_VALUE.to_string()),
],
self.body,
)
Expand Down Expand Up @@ -159,12 +168,22 @@ async fn operation_handler<T: operation::Operation>(
ValidatedJson(request_data): ValidatedJson<models::RequestData>,
) -> Result<models::Response, ActiveStorageError> {
let data = download_object(&auth, &request_data).await?;
let data = filter_pipeline::filter_pipeline(&request_data, &data)?;
let ptr = data.as_ptr();
let data = filter_pipeline::filter_pipeline(&request_data, data)?;
if request_data.compression.is_some() || request_data.size.is_none() {
// Validate the raw uncompressed data size now that we know it.
models::validate_raw_size(data.len(), request_data.dtype, &request_data.shape)?;
}
T::execute(&request_data, &data)
if request_data.compression.is_none() && request_data.filters.is_none() {
// Assert that we're using zero-copy.
assert_eq!(ptr, data.as_ptr());
}
// Convert to a mutable vector to allow in-place byte order conversion.
let ptr = data.as_ptr();
let vec: Vec<u8> = data.into();
// Assert that we're using zero-copy.
assert_eq!(ptr, vec.as_ptr());
T::execute(&request_data, vec)
}

/// Handler for unknown operations
Expand Down
Loading

0 comments on commit 8e4be4a

Please sign in to comment.