Replies: 6 comments 4 replies
-
Hi @juxeii If I understand your question, assuming that you expect an observable of lists, not a list of lists, I think you're looking for The below should work. Not sure it's very readable. Basically you need to make your input hot and shared, then use it as shown inside the def test_issue_706():
scheduler = TestScheduler()
messages = [
on_next(200 + (i + 1) * 10, x)
for i, x in enumerate([1, 2, 3, 2, 4, 5, 0, 2, 1, 3, 4, 5, 2, 2, 1, 4, 5])
]
print(messages)
numbers = scheduler.create_hot_observable(messages)
numbers_shared = numbers.pipe(operators.share())
def create():
return numbers_shared.pipe(
operators.window_toggle(
numbers_shared.pipe(
operators.filter(lambda x: x == 1),
),
lambda _: numbers_shared.pipe(
operators.filter(lambda x: x == 5),
),
),
operators.flat_map(lambda window: window.pipe(operators.to_list())),
)
results = scheduler.start(create=create)
assert results.messages == [
on_next(260, [1, 2, 3, 2, 4, 5]),
on_next(320, [1, 3, 4, 5]),
on_next(370, [1, 4, 5]),
] |
Beta Was this translation helpful? Give feedback.
-
Works like it should! |
Beta Was this translation helpful? Give feedback.
-
Hi, yes please, a custom operator would be ideal. The reason why I detected this behavior is that I need to return a list of lists. But return numbers_shared.pipe(
ops.window_toggle(
numbers_shared.pipe(
ops.filter(lambda x: x == 1),
),
lambda _: numbers_shared.pipe(
ops.filter(lambda x: x == 5),
),
),
ops.flat_map(lambda window: window.pipe(ops.to_list())),
ops.to_list()
).run() returns the wrong values. blocks =[]
numbers_shared.pipe(
ops.window_toggle(
numbers_shared.pipe(
ops.filter(lambda x: x == 1),
),
lambda _: numbers_shared.pipe(
ops.filter(lambda x: x == 5),
),
),
ops.flat_map(lambda window: window.pipe(ops.to_list())),
).subscribe(lambda b: blocks.append(b))
return blocks But having |
Beta Was this translation helpful? Give feedback.
-
Thx for your effort. The example I have given is with numbers, in real code it is with other type(s), so I would need to rewrite the operator for a generic use anyway. BTW: the documentation for |
Beta Was this translation helpful? Give feedback.
-
Generic operator is now def list_from_to(start_compare: Callable[[T], bool],
end_compare: Callable[[T], bool]) -> Observable[list[T]]:
lock = threading.Lock()
current_values = []
is_on = False
def operator(source):
def subscribe(observer, scheduler=None):
def on_next(value):
nonlocal is_on
with lock:
if start_compare(value):
is_on = True
if is_on:
current_values.append(value)
if end_compare(value):
is_on = False
observer.on_next(list(current_values))
current_values.clear()
return source.subscribe(
on_next=on_next,
on_completed=observer.on_completed,
on_error=observer.on_error,
scheduler=scheduler,
)
return Observable(subscribe)
return operator Will be used something like numbers.pipe(
utils.list_from_to(lambda x: x == 1, lambda x: x == 5),
ops.to_list()
).run() |
Beta Was this translation helpful? Give feedback.
-
Yes, the interplay between Thanks for your great support! |
Beta Was this translation helpful? Give feedback.
-
I have
[1,2,3,2,4,5,0,2,1,3,4,5,2,2,1,4,5]
What I want as output is
[[1,2,3,2,4,5],[1,3,4,5],[1,4,5]]
This means I want to extract blocks from the input sequence with some start marker, here 1, and some end marker, here 5. Elements in between these blocks are ignored.
Is there a readable combination of operators for achieving this?
Beta Was this translation helpful? Give feedback.
All reactions