Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create generic pipeline_manager.start() #54

Open
andrebco opened this issue Mar 8, 2013 · 5 comments
Open

Create generic pipeline_manager.start() #54

andrebco opened this issue Mar 8, 2013 · 5 comments

Comments

@andrebco
Copy link
Contributor

andrebco commented Mar 8, 2013

Pipeliner Manager should have a method that starts all the pipelines from an iterable (or generator). This method should be done on pipeline manager in order to:

  • simplify the complexity of sendpipelines script
  • make testing easier
  • manage all the processes that should be started and finished, so we may use less memory on this script
@israelst
Copy link

israelst commented Mar 9, 2013

I'm working on a feature at MBJ, which can be the base to solve this issue. Code comes soon.

@andrebco
Copy link
Contributor Author

This issue is also related to the reduction of memory usage on
sendpipelines. This generic pipelinemanager.start() may consider having a
memory usage strategy to avoid the high usage of memory while running many
pipelines.

@israelst
Copy link

This is the runner, this is the sendpipeline

[UPDATE: This repo is private, sorry]

@turicas
Copy link
Contributor

turicas commented Mar 12, 2013

@israelst, this repo is private. :)

@israelst
Copy link

These files are under agressive refactoring, but...

Here comes the runner:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import os.path
import time
import sys
from importlib import import_module


def main():
    """Run the sendpipelines."""
    parser, args, rest = _parse_args()
    script = args.script
    if os.path.exists(script):
        sys.argv = [script] + rest
        sys.path.insert(0, os.path.dirname(script))
        module = import_module(os.path.basename(script)[:-3])
        send_pipelines(*module.main())
    else:
        parser.error('could not find script at "{}"'.format(script))

def _parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('script')
    args, rest = parser.parse_known_args()
    return parser, args, rest

def send_pipelines(pipeline_generator, manager, total):
    print 'Sending pipelines...'
    STATS_INTERVAL = 10
    POOL = 1000 # Workers per core X Brokers X Cores (What it means?)
    start_time = time.time()
    started = lambda: manager.started_pipelines
    finished = lambda: manager.finished_pipelines
    rate = lambda x: x / (time.time() - start_time)
    pipelines = pipeline_generator()
    def print_stats(interval):
        if started() % interval == 0 or\
           finished() % interval == 0:
            print '\rSent: {}/{} ({:10.5f}/s) Finished: {}/{}({:10.5f}/s)'\
                  .format(started(), total,
                          rate(started()), finished(),
                          total, rate(finished())),
            sys.stdout.flush()

    while manager.finished_pipelines < total:
        active_pipelines = started() - finished()
        while active_pipelines < POOL and started() < total:
            manager.start(pipelines.next())
            print_stats(STATS_INTERVAL)
        manager.update(timeout=1)
        print_stats(STATS_INTERVAL)

    end_time = time.time()
    total_time = end_time - start_time
    pipelines_per_second = total / (end_time - start_time)
    print 'Total time: {:10.5f} seconds'.format(total_time)
    print 'Pipelines per second: {:10.5f}'.format(pipelines_per_second)


main()

Here comes the sendpipelines:

#!/usr/bin/env python
# coding: utf-8

import argparse

from pypelinin import Job, Pipeline, PipelineManager
import pymongo

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--host', default='127.0.0.1', help='Router host')
parser.add_argument('--mongo', default='127.0.0.1', help='MongoDB host')
parser.add_argument('--last', default=9999999, type=int, help='Upper bound')

def main():
    args = parser.parse_args()

    coll = pymongo.Connection(host=args.mongo)['my_db']['my_coll']

    _range = xrange(0 , args.last)

    def pipelines():
        pipeline_definition = {Job('Job1'): Job('Job2'),
                               Job('Job2'): Job('Job3')}
        for _id in _range:
            coll.insert({'_id': _id})
            yield Pipeline(pipeline_definition, data={'_id': _id})


    total = len(_range)
    manager = PipelineManager(api='tcp://{}:5550'.format(args.host),
                                       broadcast='tcp://{}:5551'.format(args.host))
    return pipelines, manager, total

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants