Skip to content

Commit

Permalink
[resotolib] Allow ISO 8601 durations (#1846)
Browse files Browse the repository at this point in the history
* [resotolib] Allow ISO 8601 durations

* fix return type
  • Loading branch information
aquamatthias authored Dec 8, 2023
1 parent 2f894ab commit 26538fb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
26 changes: 22 additions & 4 deletions resotolib/resotolib/durations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from functools import reduce
import re
from itertools import chain
from typing import Union, List, Optional
from typing import Union, List, Optional, cast

import isodate
import parsy
from isodate.isoduration import ISO8601_PERIOD_REGEX
from parsy import string, Parser

from resotolib.parse_util import lexeme, float_p, integer_p
Expand All @@ -26,15 +28,21 @@

time_unit_combines = [",", "and"]

# Check if a string is a valid
DurationRe = re.compile(
# Regexp that matches a duration string
DurationRegexp = re.compile(
"^[+-]?([\\d.]+\\s*("
+ "|".join(chain.from_iterable(names for unit, names, _ in time_units))
+ ")\\s*("
+ "|".join(time_unit_combines)
+ ")?\\s*)+$"
)

# Matches the proprietary format as well as ISO8601 durations
DurationRe = re.compile(f"({DurationRegexp.pattern})|({ISO8601_PERIOD_REGEX.pattern})")

# Simple check to distinguish between ISO8601 and proprietary format
__ISO8601_PERIOD_PREFIX = re.compile("^([+-])?P")


def combine_durations(elems: List[Union[int, float]]) -> Union[int, float]:
result = 0.0
Expand All @@ -53,7 +61,17 @@ def combine_durations(elems: List[Union[int, float]]) -> Union[int, float]:


def parse_duration(ds: str) -> timedelta:
return timedelta(seconds=duration_parser.parse(ds))
if __ISO8601_PERIOD_PREFIX.match(ds):
dr = isodate.parse_duration(ds)
if isinstance(dr, isodate.Duration):
td = dr.tdelta
return timedelta(
days=float(td.days + 31 * dr.months + 365 * dr.years), seconds=td.seconds, microseconds=td.microseconds
)
else:
return cast(timedelta, dr)
else:
return timedelta(seconds=duration_parser.parse(ds))


def parse_optional_duration(ds: str) -> Optional[timedelta]:
Expand Down
41 changes: 40 additions & 1 deletion resotolib/test/durations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Callable, Any, cast, Optional, TypeVar

from hypothesis import given
from hypothesis.strategies import sampled_from, tuples, integers, composite, lists, SearchStrategy, just
from hypothesis.strategies import sampled_from, tuples, integers, composite, lists, SearchStrategy, just, booleans

from resotolib.durations import time_unit_parser, time_units, parse_duration, DurationRe, duration_str

Expand Down Expand Up @@ -48,12 +48,42 @@ def durations_gen(ud: UD) -> str:
return result


@composite
def iso8601_durations_gen(ud: UD) -> str:
d = Drawer(ud)
result = d.draw(sampled_from(["", "+", "-"]))
result += "P"
with_value = False
for dr in ["Y", "M", "W", "D"]:
if d.draw(booleans()):
with_value = True
result += str(d.draw(integers(1, 1000))) + dr
if d.draw(booleans()):
result += "T"
for dr in ["H", "M", "S"]:
if d.draw(booleans()):
with_value = True
result += str(d.draw(integers(1, 1000))) + dr
# safe guard for empty durations
if not with_value:
units = ["H", "M", "S"] if result.endswith("T") else ["Y", "M", "W", "D"]
result += str(d.draw(integers(1, 1000))) + d.draw(sampled_from(units))
return result


@given(durations_gen())
def test_arbitrary_durations(duration_str: str) -> None:
assert DurationRe.fullmatch(duration_str)
parse_duration(duration_str)


@given(iso8601_durations_gen())
def test_iso8601_durations(duration_str: str) -> None:
print(duration_str)
assert DurationRe.fullmatch(duration_str)
parse_duration(duration_str)


def test_parse_duration() -> None:
for short, names, seconds in time_units:
for name in names:
Expand All @@ -67,6 +97,15 @@ def test_parse_duration() -> None:
assert parse_duration("3d4h6m5s") == timedelta(days=3, hours=4, minutes=6, seconds=5)


def test_parse_is08601_duration() -> None:
assert parse_duration("P1Y") == timedelta(days=365)
assert parse_duration("PT1S") == timedelta(seconds=1)
assert parse_duration("P4D") == timedelta(days=4)
assert parse_duration("PT1H") == timedelta(hours=1)
assert parse_duration("P32DT4H3M3S") == timedelta(days=32, hours=4, minutes=3, seconds=3)
assert parse_duration("-P32DT4H3M3S") == timedelta(days=-32, hours=-4, minutes=-3, seconds=-3)


def test_duration_string() -> None:
duration = timedelta(days=1, hours=2, minutes=3, seconds=4)
assert duration_str(duration) == "1d2h3min4s"
Expand Down

0 comments on commit 26538fb

Please sign in to comment.