diff --git a/collector.py b/collector.py index 5019113..e7bb21e 100644 --- a/collector.py +++ b/collector.py @@ -2,8 +2,11 @@ from sys import path path.insert(0, '..') -from brook.plugins import * from brook import plugin +from brook.plugins import * +from brook.models import Event + +Event.init_table() print 'Loaded plugins: {0}'.format(', '.join(x[0] for x in plugin.loaded)) diff --git a/config.py b/config.py index 82506f3..9cd300b 100644 --- a/config.py +++ b/config.py @@ -1,10 +1,10 @@ # This file serves as both the plugin and web app configuration file - -# This should be any valid SQLAlchemy connection string -# http://docs.sqlalchemy.org/en/rel_0_8/core/engines.html#database-urls -# If using sqlite be sure to use an absolute path -DATABASE = '' +PG_DB = 'brook' +PG_USER = '' +PG_PASS = '' +PG_HOST = 'localhost' +PG_PORT = 5432 # Web configuration diff --git a/models.py b/models.py new file mode 100644 index 0000000..dbf3ead --- /dev/null +++ b/models.py @@ -0,0 +1,109 @@ +import json + +import psycopg2 +from psycopg2.extras import DictCursor + +import config + + +class Event(object): + + conn = psycopg2.connect( + database=config.PG_DB, + user=config.PG_USER, + password=config.PG_PASS, + host=config.PG_HOST, + port=config.PG_PORT + ) + + @classmethod + def _cursor(self): + return self.conn.cursor(cursor_factory=DictCursor) + + @staticmethod + def _prep_data(events): + times = set() + for e in events: + e['info'] = json.loads(e['info']) + times.add(e['time']) + + if len(times) > 0: + last = max(times) + else: + last = None + + return (events, last) + + @classmethod + def init_table(self): + cur = self._cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS events + ( + id serial PRIMARY KEY UNIQUE, + service text, + info text, + time timestamp, + event_id text UNIQUE + ); + """ + ) + self.conn.commit() + cur.close() + + @classmethod + def new(self, service, info, e_time, e_id): + status = { + 'success': True, + 'error': None + } + e_id = '{0}_{1}'.format(service, e_id) + if isinstance(info, dict): + info = json.dumps(info) + elif isinstance(info, str): + # Check if info is valid json + json.loads(info) + + cur = self._cursor() + try: + cur.execute( + """ + INSERT INTO events + (service, info, time, event_id) + VALUES (%s, %s, %s, %s) + """, (service, info, e_time, e_id) + ) + self.conn.commit() + except psycopg2.IntegrityError: + status['success'] = False + status['error'] = 'Event already exists' + self.conn.rollback() + finally: + cur.close() + + return status + + @classmethod + def latest(self, limit=config.EVENT_AMOUNT): + cur = self._cursor() + cur.execute( + 'SELECT * FROM events ORDER BY time DESC LIMIT %s', + (limit,) + ) + + events = cur.fetchall() + return self._prep_data(events) + + @classmethod + def since(self, time): + cur = self._cursor() + cur.execute( + """ + SELECT * FROM events WHERE time > %s + ORDER BY time DESC + """, (time,) + ) + + events = cur.fetchall() + return self._prep_data(events) diff --git a/plugin.py b/plugin.py index 1cc7654..384b717 100644 --- a/plugin.py +++ b/plugin.py @@ -1,36 +1,6 @@ -import json - -from sqlalchemy import create_engine, Column, Integer, String, DateTime -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker - -from brook import config +from brook.models import Event loaded = [] -engine = create_engine(config.DATABASE, echo=False) -Base = declarative_base() -session = sessionmaker(bind=engine) -Session = session() - - -class Brook(Base): - __tablename__ = 'brook' - - id = Column(Integer, primary_key=True) - service = Column(String(64), nullable=False) - info = Column(String(9999), nullable=False) - time = Column(DateTime, nullable=False) - event_id = Column(String(128), nullable=False, unique=True) - - def __init__(self, service, info, time, event_id): - self.service = service - self.info = info - self.time = time - self.event_id = event_id - - -# Create table if it doesn't already exist -Base.metadata.create_all(engine) class Plugin(object): @@ -39,35 +9,19 @@ class Plugin(object): """ - database = config.DATABASE - init = None - def __init__(self, name): loaded.append((name, self)) self.name = name def main(self): """ - This is the entry point for all plugins + This is the entry point for all plugins. + Plugins should gather the appropriate data + and then call self.insert_data """ pass - def _id_exists(self, event_id): - """ - Checks if `event_id` is already in the database - - """ - event_id = '{0}_{1}'.format(self.name, event_id) - - query = Session.query(Brook).filter_by(event_id=event_id).distinct() - items = [i for i in query] - - if items == []: - return False - else: - return True - def insert_data(self, event_id, time, info): """ Inserts data into a database @@ -76,16 +30,5 @@ def insert_data(self, event_id, time, info): info: dict or json that will be used in templates """ - - if isinstance(info, dict): - info = json.dumps(info) - pass - elif isinstance(info, str): - # Check if info is valid json - json.loads(info) - - if not self._id_exists(event_id): - event_id = '{0}_{1}'.format(self.name, event_id) - items = Brook(self.name, info, time, event_id) - Session.add(items) - Session.commit() + status = Event.new(self.name, info, time, event_id) + return status diff --git a/plugins/github.py b/plugins/github.py index 884b788..6110a8d 100644 --- a/plugins/github.py +++ b/plugins/github.py @@ -27,9 +27,8 @@ def main(self): event_info = globals()[event['type']](event) - if not self._id_exists(event_info[0]): - self.insert_data(*event_info) - else: + s = self.insert_data(*event_info) + if not s['success']: # We're only going to find events # we've already got in the DB return diff --git a/requirements.txt b/requirements.txt index 967c226..663c056 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ Flask==0.9 Jinja2==2.6 -SQLAlchemy==0.7.9 Werkzeug==0.8.3 requests==0.14.2 Flask-DebugToolbar==0.7.1 -Flask-SQLAlchemy==0.16 +psycopg2==2.4.6 \ No newline at end of file diff --git a/run.py b/run.py index ffa7ebb..4774a49 100644 --- a/run.py +++ b/run.py @@ -1,3 +1,6 @@ from stream import app +from models import Event +Event.init_table() + app.run('0.0.0.0') diff --git a/stream/__init__.py b/stream/__init__.py index c3c63e0..b64cf26 100644 --- a/stream/__init__.py +++ b/stream/__init__.py @@ -1,30 +1,10 @@ from flask import Flask -from flask.ext.sqlalchemy import SQLAlchemy from flask_debugtoolbar import DebugToolbarExtension app = Flask(__name__) app.config.from_pyfile('../config.py') -app.config['SQLALCHEMY_DATABASE_URI'] = app.config['DATABASE'] -db = SQLAlchemy(app) toolbar = DebugToolbarExtension(app) - -class Brook(db.Model): - __tablename__ = 'brook' - - id = db.Column(db.Integer, primary_key=True) - service = db.Column(db.String(64), nullable=False) - info = db.Column(db.String(9999), nullable=False) - time = db.Column(db.DateTime, nullable=False) - event_id = db.Column(db.String(128), nullable=False, unique=True) - - def __init__(self, service, info, time, event_id): - self.service = service - self.info = info - self.time = time - self.event_id = event_id - - import stream.views import stream.filters diff --git a/stream/views.py b/stream/views.py index 0c65904..7ed041d 100644 --- a/stream/views.py +++ b/stream/views.py @@ -1,24 +1,27 @@ import sys -import json from datetime import datetime sys.path.append('..') -from flask import render_template, jsonify, make_response, request, url_for, redirect +from flask import ( + render_template, + make_response, + redirect, + jsonify, + request, + url_for, +) -from stream import app, Brook as b +from stream import app +from models import Event as Events @app.route('/') def index(): - times = [] - events = b.query.order_by(b.time.desc()).limit(app.config['EVENT_AMOUNT']).all() - - for event in events: - event.info = json.loads(event.info) - times.append(event.time) - - response = make_response(render_template('stream.html', events=events)) - response.set_cookie('last_event', max(times)) + (events, last) = Events.latest() + response = make_response( + render_template('stream.html', events=events) + ) + response.set_cookie('last_event', last) return response @@ -30,27 +33,27 @@ def update(): last_event = datetime.strptime(last_event, '%Y-%m-%d %H:%M:%S') - events = b.query.order_by(b.time.desc()).filter(b.time > last_event).limit( - app.config['EVENT_AMOUNT']).all() + (events, last) = Events.since(last_event) - times = [] - info = {'events': []} + info = { + 'events': [] + } + info['amount'] = len(info['events']) for event in events: - times.append(event.time) - event.info = json.loads(event.info) info['events'].append({ - 'service': event.service, - 'html': render_template('services/{0}.html'.format(event.service), e=event) + 'service': event['service'], + 'html': render_template( + 'services/{0}.html'.format(event['service']), + e=event + ) }) - info['amount'] = len(info['events']) - response = make_response(jsonify(info)) response.headers['Cache-Control'] = 'no-cache' - if events == []: + if events == [] or last is None: return response else: - response.set_cookie('last_event', max(times)) + response.set_cookie('last_event', last) return response