Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace _Sequence by _FakeSequence #1101

Merged
merged 10 commits into from
Aug 14, 2024
2 changes: 2 additions & 0 deletions docs/whatsnew/v0-5-4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Bug fixes
Other changes
#############

* Refined internal sequence generation. (For e.g. constraint formulations,
Scalars are internally mapped to fake sequences without a defined length.)

Known issues
############
Expand Down
90 changes: 53 additions & 37 deletions src/oemof/solph/_plumbing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@

"""

from collections import UserList
from collections import abc
from itertools import repeat

import numpy as np


def sequence(iterable_or_scalar):
"""Tests if an object is iterable (except string) or scalar and returns
"""Checks if an object is iterable (except string) or scalar and returns
the original sequence if object is an iterable and an 'emulated'
sequence object of class _Sequence if object is a scalar or string.

Expand Down Expand Up @@ -47,64 +46,81 @@ def sequence(iterable_or_scalar):

>>> x[10]
10
>>> print(x)
[10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]

"""
if isinstance(iterable_or_scalar, abc.Iterable) and not isinstance(
iterable_or_scalar, str
):
if isinstance(iterable_or_scalar, str):
return iterable_or_scalar
elif isinstance(iterable_or_scalar, abc.Iterable):
return np.array(iterable_or_scalar)
else:
return _Sequence(default=iterable_or_scalar)
return _FakeSequence(value=iterable_or_scalar)


class _Sequence(UserList):
class _FakeSequence:
"""Emulates a list whose length is not known in advance.

Parameters
----------
source:
default:
value : scalar
length : integer


Examples
--------
>>> s = _Sequence(default=42)
>>> len(s)
1
>>> s[1]
42
>>> s[2]
42
>>> len(s)
3
>>> s = _FakeSequence(value=42, length=5)
>>> s
[42, 42, 42]
>>> s[8]
[42, 42, 42, 42, 42]
>>> s = _FakeSequence(value=42)
>>> # undefined lenght, access still works
>>> s[1337]
42


"""

def __init__(self, *args, **kwargs):
self.default = kwargs["default"]
self.default_changed = False
self.highest_index = 0
super().__init__(*args)
def __init__(self, value, length=None):
self._value = value
self._length = length

@property
def size(self):
return self._length

def __getitem__(self, key):
self.highest_index = max(self.highest_index, key)
return self.default
@size.setter
def size(self, value):
self._length = value

def __init_list(self):
self.data = [self.default] * (self.highest_index + 1)
def __getitem__(self, _):
return self._value

def __repr__(self):
return str([i for i in self])
if self._length is not None:
return str([i for i in self])
else:
return f"[{self._value}, {self._value}, ..., {self._value}]"

def __len__(self):
return max(len(self.data), self.highest_index + 1)
return self._length

def __iter__(self):
return repeat(self.default, self.highest_index + 1)
return repeat(self._value, self._length)

def max(self):
return self._value

def min(self):
return self._value

def sum(self):
if self._length is None:
return np.inf
else:
return self._length * self._value

def to_numpy(self, length=None):
if length is not None:
return np.full(length, self._value)
else:
return np.full(len(self), self._value)

@property
def value(self):
return self._value
6 changes: 3 additions & 3 deletions src/oemof/solph/components/_generic_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ def _check_invest_attributes(self):
raise AttributeError(e2)
if (
self.investment
and sum(solph_sequence(self.fixed_losses_absolute)) != 0
and self.fixed_losses_absolute.max() != 0
and self.investment.existing == 0
and self.investment.minimum[0] == 0
and self.investment.minimum.min() == 0
):
e3 = (
"With fixed_losses_absolute > 0, either investment.existing "
Expand Down Expand Up @@ -1127,7 +1127,7 @@ def _create(self, group=None):
"For a multi-period investment model, fixed absolute"
" losses are not supported. Please remove parameter."
)
if n.fixed_losses_absolute.default != 0:
if n.fixed_losses_absolute[0] != 0:
raise ValueError(error_fixed_absolute_losses)
error_initial_storage_level = (
"For a multi-period model, initial_storage_level is"
Expand Down
15 changes: 6 additions & 9 deletions src/oemof/solph/components/_offset_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
SPDX-License-Identifier: MIT

"""

from warnings import warn

from oemof.network import Node
Expand Down Expand Up @@ -280,16 +279,14 @@ def normed_offset_and_conversion_factors_from_coefficients(
input_bus = list(self.inputs.values())[0].input
for flow in self.outputs.values():

max_len = max(
len(flow.max),
len(flow.min),
len(coefficients[0]),
len(coefficients[1]),
)
if flow.max.size is not None:
target_len = flow.max.size
else:
target_len = 1

slope = []
offset = []
for i in range(max_len):
for i in range(target_len):
eta_at_max = (
flow.max[i]
* coefficients[1][i]
Expand All @@ -307,7 +304,7 @@ def normed_offset_and_conversion_factors_from_coefficients(
slope.append(c0)
offset.append(c1)

if max_len == 1:
if target_len == 1:
slope = slope[0]
offset = offset[0]

Expand Down
2 changes: 1 addition & 1 deletion src/oemof/solph/flows/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def __init__(
if (
self.investment
and self.nonconvex
and not np.isfinite(self.investment.maximum)
and not np.isfinite(self.investment.maximum.max())
):
raise AttributeError(
"Investment into a non-convex flows needs a maximum "
Expand Down
6 changes: 1 addition & 5 deletions src/oemof/solph/flows/_investment_flow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ def _create_sets(self, group):
)

self.MIN_INVESTFLOWS = Set(
initialize=[
(g[0], g[1])
for g in group
if (g[2].min[0] != 0 or len(g[2].min) > 1)
]
initialize=[(g[0], g[1]) for g in group if g[2].min.min() != 0]
)

self.EXISTING_INVESTFLOWS = Set(
Expand Down
4 changes: 2 additions & 2 deletions src/oemof/solph/flows/_non_convex_flow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,14 @@ def _sets_for_non_convex_flows(self, group):
initialize=[
(g[0], g[1])
for g in group
if max(g[2].nonconvex.minimum_uptime) > 0
if g[2].nonconvex.minimum_uptime.max() > 0
]
)
self.MINDOWNTIMEFLOWS = Set(
initialize=[
(g[0], g[1])
for g in group
if max(g[2].nonconvex.minimum_downtime) > 0
if g[2].nonconvex.minimum_downtime.max() > 0
]
)
self.NEGATIVE_GRADIENT_FLOWS = Set(
Expand Down
48 changes: 23 additions & 25 deletions src/oemof/solph/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import sys
from collections import abc
from itertools import groupby

import numpy as np
Expand All @@ -25,6 +26,7 @@
from pyomo.core.base.piecewise import IndexedPiecewise
from pyomo.core.base.var import Var

from ._plumbing import _FakeSequence
from .helpers import flatten


Expand Down Expand Up @@ -510,7 +512,8 @@ def __separate_attrs(
"""

def detect_scalars_and_sequences(com):
com_data = {"scalars": {}, "sequences": {}}
scalars = {}
sequences = {}

default_exclusions = [
"__",
Expand Down Expand Up @@ -538,13 +541,13 @@ def detect_scalars_and_sequences(com):
# "investment" prefix to component data:
if attr_value.__class__.__name__ == "Investment":
invest_data = detect_scalars_and_sequences(attr_value)
com_data["scalars"].update(
scalars.update(
{
"investment_" + str(k): v
for k, v in invest_data["scalars"].items()
}
)
com_data["sequences"].update(
sequences.update(
{
"investment_" + str(k): v
for k, v in invest_data["sequences"].items()
Expand All @@ -553,24 +556,27 @@ def detect_scalars_and_sequences(com):
continue

if isinstance(attr_value, str):
com_data["scalars"][a] = attr_value
scalars[a] = attr_value
continue

# If the label is a tuple it is iterable, therefore it should be
# converted to a string. Otherwise, it will be a sequence.
if a == "label":
attr_value = str(attr_value)

# check if attribute is iterable
# see: https://stackoverflow.com/questions/1952464/
# in-python-how-do-i-determine-if-an-object-is-iterable
try:
_ = (e for e in attr_value)
com_data["sequences"][a] = attr_value
except TypeError:
com_data["scalars"][a] = attr_value
if isinstance(attr_value, abc.Iterable):
sequences[a] = attr_value
elif isinstance(attr_value, _FakeSequence):
scalars[a] = attr_value.value
else:
scalars[a] = attr_value

com_data["sequences"] = flatten(com_data["sequences"])
sequences = flatten(sequences)

com_data = {
"scalars": scalars,
"sequences": sequences,
}
move_undetected_scalars(com_data)
if exclude_none:
remove_nones(com_data)
Expand All @@ -586,19 +592,11 @@ def move_undetected_scalars(com):
if isinstance(value, str):
com["scalars"][ckey] = value
del com["sequences"][ckey]
continue
try:
_ = (e for e in value)
except TypeError:
com["scalars"][ckey] = value
elif isinstance(value, _FakeSequence):
com["scalars"][ckey] = value.value
del com["sequences"][ckey]
elif len(value) == 0:
del com["sequences"][ckey]
else:
try:
if not value.default_changed:
com["scalars"][ckey] = value.default
del com["sequences"][ckey]
except AttributeError:
pass

def remove_nones(com):
for ckey, value in list(com["scalars"].items()):
Expand Down
Loading
Loading