Skip to content

Commit

Permalink
Merge branch 'release/1.0.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Dec 28, 2014
2 parents 18f19be + cf90051 commit f04b56e
Show file tree
Hide file tree
Showing 23 changed files with 169 additions and 104 deletions.
46 changes: 44 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,54 @@ res = Observable.timer(5000, Scheduler.timeout) # No, this is an error
Thus when an operator like `Observable.timeout` has multiple optional arguments
you should name your arguments. At least the arguments marked as optional.

## Python Alignment

Disposables implements a context manager so you may use them in `with`
statements.

Observable sequences may be concatenated using `+`, so you can write:

```python
xs = Observable.from_([1,2,3])
ys = Observable.from_([4,5,6])
zs = xs + ys # Concatenate observables
```

Observable sequences may be repeated using `*=`, so you can write:

```python
xs = Observable.from_([1,2,3])
ys = xs * 4
```

Observable sequences may be sliced using `[start:stop:step]`, so you can write:

```python
xs = Observable.from_([1,2,3,4,5,6])
ys = xs[1:-1]
```

Observable sequences may be turned into an iterator so you can use generator
expressions, or iterate over them (uses queueing and blocking).

```python
xs = Observable.from_([1,2,3,4,5,6])
ys = xs.to_blocking()
zs = (x*x for x in ys if x > 3)
for x in zs:
print(x)
```

## Schedulers

In RxPY you can choose to run fully asynchronously or you may decide to schedule
work and timeouts using threads.

For time and scheduler handing you will need to supply
[datetime](https://docs.python.org/2/library/datetime.html) for absolute time
values and
[timedelta](https://docs.python.org/2/library/datetime.html#timedelta-objects)
for relative time. For relative time values you may also use `int` to represent
milliseconds, or `float` to represent seconds.
for relative time. You may also use `int` to represent milliseconds.

RxPY also comes with batteries included, and has a number of Python specific
mainloop schedulers to make it easier for you to use RxPY with your favorite
Expand Down
24 changes: 6 additions & 18 deletions changes.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,17 @@
# Changes

## 1.0.0rc6
## 1.0.0

- Fixed bug in ScheduledDisposable#dispose. Only dispose if not disposed
- Fixed typo in `Pattern#_and`. Should be `Pattern#and_`
- Fixed bug. Replaced push with append in controlledsubject.py
- Refeactored `observer_from_notifier` to `Observer.from_notification`

## 1.0.0rc5

- Added missing rx.linq.observable.blocking from setup.py

## 1.0.0rc4

- Added missing rx.joins from setup.py

## 1.0.0rc3

- Removed some non git files files that were added to the package by accident
- Added `Observable#to_iterable()`

## 1.0.0rc2

- Fixed examples. Use `debounce` instead of `throttle`
- Fixed wrong aliases for `select_switch`.

## 1.0.0rc1

- Added join patterns. `Observable.when` and `Observable#and_`
- Added `BlockingObservable`and operators `for_each` and `to_iterable`
- Started adding docstrings as reStructuredText in order for PyCharm to infer
Expand Down Expand Up @@ -53,10 +39,12 @@

- Aligning throttle type operator naming with RxJS and RxJava
- Added `throttle_last()` as alias for `sample()`
- Renamed `throttle()` to `debounce()` and added `throttle_with_timeout()` as alias
- Renamed `throttle()` to `debounce()` and added `throttle_with_timeout()` as
alias
- Renamed `any()` to `some()`
- Simplified `sequence_equal()`
- Bugfix for `take()` when no count given
- Removed internal operator `final_value()` which did exactly the same as `last()`
- Removed internal operator `final_value()` which did exactly the same as
`last()`
- Added `to_iterable()` as alias to `to_list()`
- Added `throttle_first()`
8 changes: 4 additions & 4 deletions rx/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
try:
from threading import Lock
from threading import Lock
except ImportError:
from rx.internal.concurrency import NoLock as Lock
from rx.internal.concurrency import NoLock as Lock

try:
from asyncio import Future
Expand All @@ -10,8 +10,8 @@

# Rx configuration dictionary
config = {
"Future" : Future,
"Lock" : Lock
"Future": Future,
"Lock": Lock
}

from .observable import Observable
Expand Down
4 changes: 2 additions & 2 deletions rx/abstractobserver.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from rx.internal import noop, default_error

class AbstractObserver(object):
"""Abstract base class for implementations of the Observer class. This base
class enforces the grammar of observers where OnError and OnCompleted are
"""Abstract base class for implementations of the Observer class. This base
class enforces the grammar of observers where OnError and OnCompleted are
terminal messages.
"""

Expand Down
1 change: 1 addition & 0 deletions rx/backpressure/controlledsubject.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from rx.subjects import Subject
from rx.internal.utils import check_disposed


class ControlledSubject(Observable):
def __init__(self, enable_queue=True):
super(ControlledSubject, self).__init__(self._subscribe)
Expand Down
3 changes: 2 additions & 1 deletion rx/backpressure/stopandwait.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
def stop_and_wait(self):
"""Attaches a stop and wait observable to the current observable.
Returns a stop and wait observable {Observable}.
:returns: A stop and wait observable.
:rtype: Observable
"""

return StopAndWaitObservable(self)
9 changes: 5 additions & 4 deletions rx/backpressure/windowed.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from rx.concurrency import current_thread_scheduler
from rx.internal import extensionmethod

from .controlledobservable import ControlledObservable
from .windowedobservable import WindowedObservable


@extensionmethod(ControlledObservable)
def windowed(self, window_size):
"""Creates a sliding windowed observable based upon the window size.
Keyword arguments:
:param int window_size: The number of items in the window
window_size -- {Number} The number of items in the window
Returns a windowed observable {Observable} based upon the window size.
:returns: A windowed observable based upon the window size.
:rtype: Observable
"""

return WindowedObservable(self, window_size)
2 changes: 2 additions & 0 deletions rx/backpressure/windowedobservable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

log = logging.getLogger('Rx')


class WindowedObserver(AbstractObserver):
def __init__(self, observer, observable, cancel, scheduler):
self.observer = observer
Expand Down Expand Up @@ -42,6 +43,7 @@ def dispose(self):

super(AbstractObserver, self).dispose()


class WindowedObservable(Observable):
def __init__(self, source, window_size, scheduler=None):
super(WindowedObservable, self).__init__(self._subscribe)
Expand Down
3 changes: 2 additions & 1 deletion rx/concurrency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from .scheduleditem import ScheduledItem

from .immediatescheduler import ImmediateScheduler, immediate_scheduler
from .currentthreadscheduler import CurrentThreadScheduler, current_thread_scheduler
from .currentthreadscheduler import CurrentThreadScheduler, \
current_thread_scheduler
from .virtualtimescheduler import VirtualTimeScheduler
from .timeoutscheduler import TimeoutScheduler, timeout_scheduler
from .newthreadscheduler import NewThreadScheduler, new_thread_scheduler
Expand Down
15 changes: 8 additions & 7 deletions rx/concurrency/catchscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ def schedule_now(self, state, action):

return self._scheduler.scheduleWithState(state, self._wrap(action))

def schedule_relative(self, state, due_time, action):
def schedule_relative(self, duetime, action, state=None):
"""Schedules an action to be executed after duetime."""

return self._scheduler.schedule_relative(due_time, self._wrap(action),
return self._scheduler.schedule_relative(duetime, self._wrap(action),
state=state)

def schedule_absolute(self, state, due_time, action):
def schedule_absolute(self, duetime, action, state=None):
"""Schedules an action to be executed at duetime."""

return self._scheduler.schedule_absolute(due_time, self._wrap(action),
return self._scheduler.schedule_absolute(duetime, self._wrap(action),
state=state)

def _clone(self, scheduler):
Expand Down Expand Up @@ -60,17 +60,18 @@ def schedule_periodic(self, period, action, state=None):
d = SingleAssignmentDisposable()
failed = [False]

def action(state1):
def periodic_action(periodic_state):
if failed[0]:
return None
try:
return action(state1)
return action(periodic_state)
except Exception as ex:
failed[0] = True
if not self._handler(ex):
raise Exception(ex)
d.dispose()
return None

d.disposable = self._scheduler.schedule_periodic(action, period, state)
d.disposable = self._scheduler.schedule_periodic(periodic_action,
period, state)
return d
21 changes: 13 additions & 8 deletions rx/concurrency/mainloopscheduler/asyncioscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ def schedule_relative(self, duetime, action, state=None):
return scheduler.schedule(action, state)

disposable = SingleAssignmentDisposable()

def interval():
disposable.disposable = action(scheduler, state)

log.debug("timeout: %s", seconds)
handle = [self.loop.call_later(seconds, interval)]

def dispose():
Expand All @@ -68,18 +68,23 @@ def schedule_absolute(self, duetime, action, state=None):
"""Schedules an action to be executed at duetime.
Keyword arguments:
duetime -- {datetime} Absolute time after which to execute the action.
action -- {Function} Action to be executed.
:param datetime duetime: Absolute time after which to execute the
action.
:param types.FunctionType action: Action to be executed.
:param T state: Optional state to be given to the action function.
Returns {Disposable} The disposable object used to cancel the scheduled
action (best effort)."""
:returns: The disposable object used to cancel the scheduled action
(best effort).
:rtype: Disposable
"""

duetime = self.to_datetime(duetime)
return self.schedule_relative(duetime - self.now(), action, state)

def now(self):
"""Represents a notion of time for this scheduler. Tasks being scheduled
on a scheduler will adhere to the time denoted by this property."""
"""Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by this
property.
"""

return self.to_datetime(self.loop.time())

4 changes: 3 additions & 1 deletion rx/concurrency/newthreadscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
from threading import Timer
from datetime import timedelta

from rx.disposables import Disposable, SingleAssignmentDisposable, CompositeDisposable
from rx.disposables import Disposable, SingleAssignmentDisposable, \
CompositeDisposable

from .scheduler import Scheduler

log = logging.getLogger('Rx')


class NewThreadScheduler(Scheduler):
"""Creates an object that schedules each unit of work on a separate thread.
"""
Expand Down
2 changes: 2 additions & 0 deletions rx/concurrency/scheduleditem.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from rx.disposables import SingleAssignmentDisposable


def default_sub_comparer(x, y):
return 0 if x == y else 1 if x > y else -1


class ScheduledItem(object):
def __init__(self, scheduler, state, action, duetime, comparer=None):
self.scheduler = scheduler
Expand Down
9 changes: 6 additions & 3 deletions rx/concurrency/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ def schedule_recursive(self, action, state=None):
:param types.FunctionType action: Action to execute recursively.
The parameter passed to the action is used to trigger recursive
scheduling of the action.
:param T state: State to be given to the action function.
:returns: The disposable object used to cancel the scheduled action
(best effort).
(best effort).
:rtype: Disposable
"""

Expand Down Expand Up @@ -200,7 +202,8 @@ def func(dt):
action=action1,
state=action)

def schedule_recursive_with_absolute_and_state(self, duetime, action, state):
def schedule_recursive_with_absolute_and_state(self, duetime, action,
state):
"""Schedules an action to be executed recursively at a specified
absolute due time.
Expand Down Expand Up @@ -242,7 +245,7 @@ def to_relative(cls, timespan):
elif isinstance(timespan, timedelta):
timespan = int(timespan.total_seconds()*1000)
elif isinstance(timespan, float):
timespan = timespan * 1000
timespan *= 1000

return int(timespan)

Expand Down
Loading

0 comments on commit f04b56e

Please sign in to comment.