Skip to content

Commit

Permalink
magics fixed and a lot cooler
Browse files Browse the repository at this point in the history
  • Loading branch information
alexdremov committed Mar 22, 2023
1 parent e97baac commit fb2979e
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 106 deletions.
37 changes: 36 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,36 @@
# igogo
# igogo 🐎🏎️

---

Execute several jupyter cells at the same time

> Have you ever just sited and watched a long-running jupyter cell?
> **Now, you can continue to work in the same notebook freely**
## Wait, isn't it just a background job? No.

- No multithreading, no data races, no locks.
You can freely operate with your notebook variables without the risk of corrupting them.
- Beautiful output. When several cells execute in parallel,
all printed data is displayed in corresponding cell's output. No more twisted and messed out concurrent outputs.
- Easily cancel jobs, wait for completion, and start the new ones.

## Use cases
1) You have a long-running cell, and you need to check something.
You can just start the second cell without interrupting a long-running cell.
> **Example:** you run a machine learning train loop and want to immediately save model's weights or check metrics.
> With `igogo` you can do so without interrupting the training.
2) If you need to compare score of some function with different parameters, you can run several
functions at the same time and monitor results.
> **Example:** you have several sets of hyperparameters and want to compare them.
> You can start training two models, monitoring two loss graphs at the same time.
## Install

Igogo is available through PyPi:

```bash
pip install igogo
```

## Usage
1 change: 0 additions & 1 deletion bld.bat

This file was deleted.

2 changes: 0 additions & 2 deletions build.sh

This file was deleted.

3 changes: 2 additions & 1 deletion igogo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import IPython
from .yielder import Yielder as yielder_async
from .core import job, stop, run
from .core import job, stop, sleep, display, clear_output, stop_all
from .core import stop_by_cell_id, stop_latest, get_running_igogo_cells
from .loaders import register_hooks as _register_hooks
from .loaders import IpythonWatcher
from .loaders import load_ipython_extension
Expand Down
1 change: 0 additions & 1 deletion igogo/awaiters.py

This file was deleted.

17 changes: 13 additions & 4 deletions igogo/context.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
from .output import OutputStream
from typing import List
import asyncio
import contextvars

from .output import OutputStreamsSetter, OutputObject
from .exceptions import IgogoInvalidContext


class IgogoContext(object):
out_stream: OutputStream
out_stream: OutputStreamsSetter
task: asyncio.Task
additional_outputs: List[OutputObject]

def __init__(self, task: asyncio.Task, out_stream: OutputStream):
def __init__(self, task: asyncio.Task, out_stream: OutputStreamsSetter, additional_outputs: List[OutputObject]):
self.out_stream = out_stream
self.task = task
self.additional_outputs = additional_outputs


_context: contextvars.ContextVar[IgogoContext | None] = contextvars.ContextVar("igogo_context", default=None)


def get_context_or_none() -> IgogoContext:
return _context.get()


def get_context_or_fail() -> IgogoContext:
value = _context.get()
value = get_context_or_none()
if value is None:
raise IgogoInvalidContext()
return value


def set_context(context: IgogoContext):
_context.set(context)
122 changes: 104 additions & 18 deletions igogo/core.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,137 @@
import asyncio
import functools
import inspect
import sys
from typing import Dict, List

import IPython
import greenback

from .context import IgogoContext, get_context_or_fail, set_context
from .output import Output, OutputStream
from .exceptions import IgogoInvalidContext
from .output import OutputText, OutputStreamsSetter, OutputObject, OutputTextStyled
from .exceptions import IgogoInvalidContext, IgogoAdditionalOutputsExhausted

_igogo_run_loop = asyncio.get_running_loop()
_all_tasks: Dict[int, List[asyncio.Task]] = dict()
_igogo_count = 0


def _get_currently_running_cells_info():
global _all_tasks
keys = map(str, _all_tasks.keys())
keys = '], ['.join(list(keys))
if len(keys) > 0:
keys = '[' + keys + ']'
return keys


def _log_error(*argc, **kwargs):
if not 'file' in kwargs:
kwargs['file'] = sys.stderr
print('[ IGOGO ]', *argc, **kwargs)
running_s = _get_currently_running_cells_info()
if len(running_s) == 0:
running_s = '<none>'
print(f'[ IGOGO ] Currently running IGOGO cells: {running_s}', file=kwargs['file'])


def stop():
value = get_context_or_fail()
value.task.cancel()


def get_running_igogo_cells():
global _all_tasks
_update_all_tasks()
return list(_all_tasks.keys())


def sleep(delay, result=None):
if not greenback.has_portal():
raise IgogoInvalidContext()
greenback.await_(asyncio.sleep(delay, result))


def run():
def get_pending():
return list(filter(lambda x: 'igogo' in x.get_name(), asyncio.all_tasks(loop=_igogo_run_loop)))
def display(object):
value = get_context_or_fail()
if len(value.additional_outputs) == 0:
raise IgogoAdditionalOutputsExhausted()
out = value.additional_outputs.pop()
out.add_object(object)
value.additional_outputs.insert(0, out)


def clear_output(including_text=True):
value = get_context_or_fail()
if including_text:
value.out_stream.stdout.clear()
for out in value.additional_outputs:
out.clear()


def _update_all_tasks():
global _all_tasks

def filter_rule(task: asyncio.Task):
return not task.done()

for key in _all_tasks:
_all_tasks[key] = list(filter(filter_rule, _all_tasks[key]))
_all_tasks = {k: v for k, v in _all_tasks.items() if len(v) > 0}


def get_pending_tasks():
return list(filter(lambda x: 'igogo' in x.get_name(), asyncio.all_tasks(loop=_igogo_run_loop)))


pending_all = get_pending()
while len(pending_all):
pending = pending_all[-1]
_igogo_run_loop.run_until_complete(pending)
pending_all = get_pending()
def stop_all():
for task in get_pending_tasks():
task.cancel()


def job(original_function=None, kind='text'):
def stop_latest():
global _all_tasks
_update_all_tasks()
keys = list(_all_tasks.keys())
if len(keys) == 0:
_log_error("No running tasks")
return
latest_key = max(keys)
task = _all_tasks[latest_key].pop()
task.cancel()


def stop_by_cell_id(cell_id):
global _all_tasks
_update_all_tasks()

cell_id = int(cell_id)
if not cell_id in _all_tasks:
_log_error(f"There's no running tasks in cell [{cell_id}]")
return
for task in _all_tasks[cell_id]:
task.cancel()


def job(original_function=None, kind='stdout', displays=10):
global _igogo_count

def _decorate(function):
@functools.wraps(function)
def wrapped_function(*args, **kwargs):
global _igogo_count
output_stream = OutputStream(Output(kind=kind))
output_stream.activate()
global _igogo_count, _all_tasks
output_stream = OutputStreamsSetter(stdout=OutputText(kind=kind), stderr=OutputText(kind='stderr'))
additional_outputs = list(reversed([OutputObject() for _ in range(displays)]))

async def func_context_setter():
await greenback.ensure_portal()
set_context(
IgogoContext(task, output_stream)
IgogoContext(task, output_stream, additional_outputs)
)
output_stream.activate()
return await function(*args, **kwargs)
if inspect.iscoroutinefunction(function):
return await function(*args, **kwargs)
return function(*args, **kwargs)

coro = func_context_setter()

Expand All @@ -70,9 +153,12 @@ def done_callback(t):
task.set_name(f'igogo_{_igogo_count}')
_igogo_count += 1

ip = IPython.get_ipython()
_all_tasks.setdefault(ip.execution_count, [])
_all_tasks[ip.execution_count].append(task)

return dict(
task=task,
output=output_stream.out
task=task
)

def stop_all():
Expand Down
17 changes: 16 additions & 1 deletion igogo/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,17 @@
import io
class IgogoInvalidContext(Exception):
...
def __init__(self):
from .core import _log_error
file = io.StringIO()
_log_error('Igogo command invoked from invalid context', file=file)
super().__init__(file.getvalue())
file.close()


class IgogoAdditionalOutputsExhausted(Exception):
def __init__(self):
from .core import _log_error
file = io.StringIO()
_log_error('Igogo invoked display function, but cell has not enough display capabilities', file=file)
super().__init__(file.getvalue())
file.close()
18 changes: 15 additions & 3 deletions igogo/loaders.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
from ipykernel.zmqshell import ZMQInteractiveShell
from .output import OutputStream, Output
from .output import OutputStreamsSetter, OutputText, OutputTextStyled

import sys


class IpythonWatcher(object):
def __init__(self, ip: ZMQInteractiveShell):
self.shell = ip
self._save_prev_outputs()

def _save_prev_outputs(self):
self.stdout = sys.stdout
self.stderr = sys.stderr

def _activate_prev_outputs(self):
sys.stdout = self.stdout
sys.stderr = self.stderr

def pre_execute(self):
stream = OutputStream(Output())
self._save_prev_outputs()
stream = OutputStreamsSetter(stdout=OutputText(kind='stdout'), stderr=OutputText(kind='stderr'))
stream.activate()

def pre_run_cell(self, info):
...

def post_execute(self):
...
self._activate_prev_outputs()

def post_run_cell(self, result):
...
Expand Down
16 changes: 11 additions & 5 deletions igogo/magic.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import IPython
from IPython.core.magic import (Magics, magics_class, cell_magic)
from .output import Output

@magics_class
class IgogoMagic(Magics):
@cell_magic
def igogo(self, line, cell):
ip = IPython.get_ipython()
args: dict = eval(f"dict({line})")
args.setdefault('update_globals', False)
update_globals = args['update_globals']
args.pop('update_globals')
prefix = "def __igogo_magic_wrapper():\n" \
" import igogo\n" \
f" @igogo.job(**dict({line}))\n" \
" async def execute():\n"
f" @igogo.job(**dict({args}))\n" \
" async def execute():\n"\
" global " + ', '.join(list(IPython.get_ipython().user_ns.keys())) + '\n'
cell = prefix + '\n'.join([' ' + line for line in cell.split('\n')])
cell += "\n" \
cell += "\n" + \
(" globals().update(locals())\n" if update_globals else "") + \
" return execute()\n" \
"__igogo_magic_wrapper()"
ip.ex(cell)
ip.run_cell(cell, silent=True)

Loading

0 comments on commit fb2979e

Please sign in to comment.