Skip to content

Commit

Permalink
feat: Use millisecond precision when resampling and aligning timeseri…
Browse files Browse the repository at this point in the history
…es (#205)

Switch to using milliseconds as the main unit for comparison and resampling. Polars is also updated to v0.40.0
  • Loading branch information
Batch21 authored Jun 24, 2024
1 parent 1cf63d6 commit 4b21179
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ thiserror = "1.0.25"
num = "0.4.0"
float-cmp = "0.9.0"
ndarray = "0.15.3"
polars = { version = "0.39", features = ["lazy", "rows", "ndarray"] }
polars = { version = "0.40", features = ["lazy", "rows", "ndarray"] }
pyo3-polars = "0.14"
pyo3 = { version = "0.21", default-features = false }
pyo3-log = "0.10"
Expand Down
2 changes: 2 additions & 0 deletions pywr-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub enum PywrError {
TimestepRangeGenerationError(String),
#[error("Could not create timesteps for frequency '{0}'")]
TimestepGenerationError(String),
#[error("Pywr does not currently support timesteps of varying duration")]
TimestepDurationMismatch,
#[error("aggregation error: {0}")]
Aggregation(#[from] AggregationError),
}
Expand Down
55 changes: 47 additions & 8 deletions pywr-core/src/timestep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use std::ops::Add;
use crate::PywrError;

const SECS_IN_DAY: i64 = 60 * 60 * 24;
const MILLISECS_IN_DAY: i64 = 1000 * SECS_IN_DAY;
const MILLISECS_IN_HOUR: i64 = 1000 * 60 * 60;
const MILLISECS_IN_MINUTE: i64 = 1000 * 60;
const MILLISECS_IN_SECOND: i64 = 1000;

/// A newtype for `chrono::TimeDelta` that provides a couple of useful convenience methods.
#[pyclass]
Expand Down Expand Up @@ -59,9 +63,37 @@ impl PywrDuration {
self.0.num_seconds() as f64 / SECS_IN_DAY as f64
}

/// Returns the number of nanoseconds in the duration.
pub fn whole_nanoseconds(&self) -> Option<i64> {
self.0.num_nanoseconds()
/// Returns the number of milliseconds in the duration.
pub fn milliseconds(&self) -> i64 {
self.0.num_milliseconds()
}

/// Convert the duration to a string representation that can be parsed by polars.
///
/// The string is a sequence of `<count><unit>` pairs using the units `d`, `h`, `m`, `s` and
/// `ms`, largest unit first, with zero-valued components omitted (e.g. `1d1s` or `23h59m59s`).
/// The value is taken from [`Self::milliseconds`], so anything finer than a millisecond
/// is not represented.
///
/// A zero duration is returned as `0ms` and a negative duration is prefixed with `-`
/// (e.g. `-1d12h`), so the result is never an empty string.
///
/// see: https://docs.rs/polars/latest/polars/prelude/struct.Duration.html#method.parse
pub fn duration_string(&self) -> String {
    let total = self.milliseconds();
    if total == 0 {
        // Without this every component below would be skipped, yielding an empty string.
        return String::from("0ms");
    }

    let mut duration = String::new();
    if total < 0 {
        duration.push('-');
    }

    // Work on the magnitude so that the division/remainder logic is sign independent;
    // `unsigned_abs` is also safe for `i64::MIN`.
    let mut remaining = total.unsigned_abs();
    let units = [
        (MILLISECS_IN_DAY, "d"),
        (MILLISECS_IN_HOUR, "h"),
        (MILLISECS_IN_MINUTE, "m"),
        (MILLISECS_IN_SECOND, "s"),
        (1, "ms"),
    ];
    for (unit_millis, suffix) in units {
        let unit_millis = unit_millis.unsigned_abs();
        let count = remaining / unit_millis;
        if count > 0 {
            duration.push_str(&format!("{}{}", count, suffix));
        }
        remaining %= unit_millis;
    }
    duration
}
}

Expand Down Expand Up @@ -195,15 +227,13 @@ impl Timestepper {
#[derive(Debug)]
pub struct TimeDomain {
timesteps: Vec<Timestep>,
duration: PywrDuration,
}

impl TimeDomain {
/// Return the duration of each time-step.
pub fn step_duration(&self) -> PywrDuration {
// This relies on the assumption that all time-steps are the same length.
// Ideally, this invariant would be refactored to have the duration stored here in `TimeDomain`,
// rather than in `Timestep`.
self.timesteps.first().expect("Not time-steps defined.").duration
self.duration
}

pub fn timesteps(&self) -> &[Timestep] {
Expand Down Expand Up @@ -233,7 +263,11 @@ impl TryFrom<Timestepper> for TimeDomain {

fn try_from(value: Timestepper) -> Result<Self, Self::Error> {
let timesteps = value.timesteps()?;
Ok(Self { timesteps })
let duration = timesteps.first().expect("No time-steps defined.").duration;
match timesteps.iter().all(|t| t.duration == duration) {
true => Ok(Self { timesteps, duration }),
false => Err(PywrError::TimestepDurationMismatch),
}
}
}

Expand Down Expand Up @@ -313,23 +347,28 @@ mod test {
let duration = PywrDuration::days(5);
assert_eq!(duration.whole_days(), Some(5));
assert_eq!(duration.fractional_days(), 5.0);
assert_eq!(duration.duration_string(), String::from("5d"));

let duration: PywrDuration = TimeDelta::hours(12).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), 0.5);
assert_eq!(duration.duration_string(), String::from("12h"));

let duration: PywrDuration = TimeDelta::minutes(30).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), 1.0 / 48.0);
assert_eq!(duration.duration_string(), String::from("30m"));

let duration_secs = SECS_IN_DAY + 1;
let duration: PywrDuration = TimeDelta::seconds(duration_secs).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), duration_secs as f64 / SECS_IN_DAY as f64);
assert_eq!(duration.duration_string(), String::from("1d1s"));

let duration_secs = SECS_IN_DAY - 1;
let duration: PywrDuration = TimeDelta::seconds(duration_secs).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), duration_secs as f64 / SECS_IN_DAY as f64);
assert_eq!(duration.duration_string(), String::from("23h59m59s"));
}
}
28 changes: 15 additions & 13 deletions pywr-schema/src/timeseries/align_and_resample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn align_and_resample(
let df = df
.clone()
.lazy()
.with_columns([col(time_col).cast(DataType::Datetime(TimeUnit::Nanoseconds, None))])
.with_columns([col(time_col).cast(DataType::Datetime(TimeUnit::Milliseconds, None))])
.collect()?
.sort([time_col], sort_options)?;

Expand All @@ -43,13 +43,10 @@ pub fn align_and_resample(
None => return Err(TimeseriesError::TimeseriesDurationNotFound(name.to_string())),
};

let model_duration = domain
.time()
.step_duration()
.whole_nanoseconds()
.ok_or(TimeseriesError::NoDurationNanoSeconds)?;
let model_duration = domain.time().step_duration();
let model_duration_string = model_duration.duration_string();

let df = match model_duration.cmp(&timeseries_duration) {
let df = match model_duration.milliseconds().cmp(&timeseries_duration) {
Ordering::Greater => {
// Downsample
df.clone()
Expand All @@ -58,9 +55,9 @@ pub fn align_and_resample(
col(time_col),
[],
DynamicGroupOptions {
every: Duration::new(model_duration),
period: Duration::new(model_duration),
offset: Duration::new(0),
every: Duration::parse(model_duration_string.as_str()),
period: Duration::parse(model_duration_string.as_str()),
offset: Duration::parse("0d"),
start_by: StartBy::DataPoint,
..Default::default()
},
Expand All @@ -73,7 +70,12 @@ pub fn align_and_resample(
// TODO: this does not extend the dataframe beyond its original end date. Should it do so when using a forward fill strategy?
// The df could be extended by the length of the duration it is being resampled to.
df.clone()
.upsample::<[String; 0]>([], "time", Duration::new(model_duration), Duration::new(0))?
.upsample::<[String; 0]>(
[],
"time",
Duration::parse(model_duration_string.as_str()),
Duration::parse("0d"),
)?
.fill_null(FillNullStrategy::Forward(None))?
}
Ordering::Equal => df,
Expand Down Expand Up @@ -155,7 +157,7 @@ mod tests {
NaiveDateTime::parse_from_str("2021-01-14 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(),
],
)
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
.unwrap();
let resampled_dates = df.column("time").unwrap();
assert!(resampled_dates.equals(&expected_dates));
Expand Down Expand Up @@ -247,7 +249,7 @@ mod tests {
assert!(resampled_values.equals(&expected_values));

let expected_dates = Series::new("time", time)
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
.unwrap();

let resampled_dates = df.column("time").unwrap();
Expand Down
2 changes: 0 additions & 2 deletions pywr-schema/src/timeseries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub enum TimeseriesError {
TimeseriesUnparsableFileFormat { provider: String, path: String },
#[error("A scenario group with name '{0}' was not found")]
ScenarioGroupNotFound(String),
#[error("Duration could not be represented as nanoseconds")]
NoDurationNanoSeconds,
#[error("The length of the resampled timeseries dataframe '{0}' does not match the number of model timesteps.")]
DataFrameTimestepMismatch(String),
#[error("A timeseries dataframe with the name '{0}' already exists.")]
Expand Down
15 changes: 10 additions & 5 deletions pywr-schema/src/timeseries/polars_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ mod core {
Some(ext) => {
let ext = ext.to_str().map(|s| s.to_lowercase());
match ext.as_deref() {
Some("csv") => CsvReader::from_path(fp)?
.infer_schema(None)
.with_try_parse_dates(true)
.has_header(true)
.finish()?,
Some("csv") => {
let parse_options = CsvParseOptions::default().with_try_parse_dates(true);

CsvReadOptions::default()
.with_schema(None)
.with_has_header(true)
.with_parse_options(parse_options)
.try_into_reader_with_file_path(Some(fp))?
.finish()?
}
Some("parquet") => {
todo!()
}
Expand Down

0 comments on commit 4b21179

Please sign in to comment.