Skip to content
This repository has been archived by the owner on Feb 8, 2018. It is now read-only.

Commit

Permalink
Initial reimplementation based on change stream
Browse files Browse the repository at this point in the history
  • Loading branch information
chadwhitacre committed May 3, 2017
1 parent 8f6dac5 commit f7a148d
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 170 deletions.
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Quick Start
Local
-----

Given Python 2.7, Postgres 9.3, and a C/make toolchain:
Given Python 2.7, Postgres 9.6, and a C/make toolchain:

```shell
git clone https://github.com/gratipay/gratipay.com.git
Expand Down Expand Up @@ -116,7 +116,7 @@ On Debian or Ubuntu you will need the following packages:

```shell
sudo apt-get install \
postgresql-9.3 \
postgresql-9.6 \
postgresql-contrib \
libpq-dev \
python-dev \
Expand Down Expand Up @@ -386,7 +386,7 @@ Modifying the Database
======================

We write SQL, specifically the [PostgreSQL
variant](https://www.postgresql.org/docs/9.3/static/). We keep our database
variant](https://www.postgresql.org/docs/9.6/static/). We keep our database
schema in
[`schema.sql`](https://github.com/gratipay/gratipay.com/blob/master/sql/schema.sql),
and we write schema changes for each PR branch in a `sql/branch.sql` file, which
Expand Down Expand Up @@ -436,11 +436,10 @@ database configured in your testing environment.
Local Database Setup
--------------------
For the best development experience, you need a local
installation of [Postgres](https://www.postgresql.org/download/). The best
version of Postgres to use is 9.3.5, because that's what we're using in
production at Heroku. You need at least 9.2, because we depend on being able to
specify a URI to `psql`, and that was added in 9.2.
For the best development experience, you need a local installation of
[Postgres](https://www.postgresql.org/download/). The best version of Postgres
to use is 9.6.2, because that's what we're using in production at Heroku. You
need at least 9.5 to support the features we depend on.
+ Mac: use Homebrew: `brew install postgres`
+ Ubuntu: use Apt: `apt-get install postgresql postgresql-contrib libpq-dev`
Expand Down
74 changes: 54 additions & 20 deletions gratipay/cli/sync_npm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,74 @@
"""
from __future__ import absolute_import, division, print_function, unicode_literals

import sys
import argparse
from couchdb import Database

from gratipay import wireup
from gratipay.sync_npm import serialize, upsert
from gratipay.utils import sentry


def parse_args(argv):
p = argparse.ArgumentParser()
p.add_argument('command', choices=['serialize', 'upsert'])
p.add_argument('path', help='the path to the input file', nargs='?', default='/dev/stdin')
return p.parse_args(argv)
def production_change_stream(seq):
"""Given a sequence number in the npm registry change stream, start streaming!
"""
npm = Database('https://skimdb.npmjs.com/registry')
return npm.changes(feed='continuous', include_docs=True, since=seq)


def process_doc(doc):
"""Return a smoothed-out doc, or None if it's not a package doc, meaning
there's no name key and it's probably a design doc, per:
subcommands = { 'serialize': serialize.main
, 'upsert': upsert.main
}
https://github.com/npm/registry/blob/aef8a275/docs/follower.md#clean-up
"""
if 'name' not in doc:
return None
name = doc['name']
description = doc.get('description', '')
emails = [e for e in [m.get('email') for m in doc.get('maintainers', [])] if e.strip()]
return {'name': name, 'description': description, 'emails': sorted(set(emails))}


def main(argv=sys.argv):
def consume_change_stream(change_stream, db):
"""Given a function similar to :py:func:`production_change_stream` and a
:py:class:`~GratipayDB`, read from the stream and write to the db.
The npm registry is a CouchDB app, which means we get a change stream from
it that allows us to follow registry updates in near-realtime. Our strategy
here is to maintain open connections to both the registry and our own
database, and write as we read.
"""
last_seq = db.one('SELECT npm_last_seq FROM worker_coordination')
with db.get_connection() as conn:
for change in change_stream(last_seq):
processed = process_doc(change['doc'])
if not processed:
continue
cursor = conn.cursor()
cursor.run('''
INSERT INTO packages
(package_manager, name, description, emails)
VALUES ('npm', %(name)s, %(description)s, %(emails)s)
ON CONFLICT (package_manager, name) DO UPDATE
SET description=%(description)s, emails=%(emails)s
''', processed)
cursor.run('UPDATE worker_coordination SET npm_last_seq=%s', (change['seq'],))
cursor.connection.commit()


def main():
"""This function is installed via an entrypoint in ``setup.py`` as
``sync-npm``.
Usage::
sync-npm {serialize,upsert} {<filepath>}
``<filepath>`` defaults to stdin.
.. note:: Sphinx is expanding ``sys.argv`` in the parameter list. Sorry. :-/
sync-npm
"""
env = wireup.env()
args = parse_args(argv[1:])
db = wireup.db(env)

subcommands[args.command](env, args, db)
while 1:
with sentry.teller(env):
consume_change_stream(db, production_change_stream)
57 changes: 0 additions & 57 deletions gratipay/sync_npm/upsert.py

This file was deleted.

26 changes: 26 additions & 0 deletions gratipay/utils/sentry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals

import sys

from gratipay import wireup


class teller(object):
"""This is a context manager to log to Sentry. You have to pass in an
``Environment`` object with a ``sentry_dsn`` attribute.
"""

def __init__(self, env, noop=None):
try:
sys.stdout = sys.stderr # work around aspen.log_dammit limitation; sigh
self.tell_sentry = wireup.make_sentry_teller(env, noop)
finally:
sys.stdout = sys.__stdout__

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.tell_sentry(exc_type, {})
return False
4 changes: 4 additions & 0 deletions sql/branch.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BEGIN;
CREATE TABLE worker_coordination (npm_last_seq bigint not null default -1);
INSERT INTO worker_coordination DEFAULT VALUES;
END;
120 changes: 36 additions & 84 deletions tests/py/test_sync_npm.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,60 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals

from subprocess import Popen, PIPE
from gratipay.testing import Harness

import pytest
from gratipay.cli import sync_npm

from gratipay import sync_npm
from gratipay.testing import Harness

class ProcessDocTests(Harness):

def load(raw):
serialized = Popen( ('env/bin/sync-npm', 'serialize', '/dev/stdin')
, stdin=PIPE, stdout=PIPE
).communicate(raw)[0]
Popen( ('env/bin/sync-npm', 'upsert', '/dev/stdin')
, stdin=PIPE, stdout=PIPE
).communicate(serialized)[0]
def test_returns_None_if_no_name(self):
assert sync_npm.process_doc({}) is None

def test_backfills_missing_keys(self):
actual = sync_npm.process_doc({'name': 'foo'})
assert actual == {'name': 'foo', 'description': '', 'emails': []}

class FailCollector:
def test_extracts_maintainer_emails(self):
doc = {'name': 'foo', 'maintainers': [{'email': '[email protected]'}]}
assert sync_npm.process_doc(doc)['emails'] == ['[email protected]']

def __init__(self):
self.fails = []
def test_skips_empty_emails(self):
doc = {'name': 'foo', 'maintainers': [{'email': ''}, {'email': ' '}]}
assert sync_npm.process_doc(doc)['emails'] == []

def __call__(self, fail, whatever):
self.fails.append(fail)
def test_sorts_emails(self):
doc = {'name': 'foo', 'maintainers': [{'email': 'bob'}, {'email': 'alice'}]}
assert sync_npm.process_doc(doc)['emails'] == ['alice', 'bob']

def test_dedupes_emails(self):
doc = {'name': 'foo', 'maintainers': [{'email': 'alice'}, {'email': 'alice'}]}
assert sync_npm.process_doc(doc)['emails'] == ['alice']

class Heck(Exception):
pass

class ConsumeChangeStreamTests(Harness):

def change_stream(self, docs):
def change_stream(seq):
for i, doc in enumerate(docs):
if i < seq: continue
yield {'seq': i, 'doc': doc}
return change_stream

class Tests(Harness):

def test_packages_starts_empty(self):
assert self.db.all('select * from packages') == []


# sn - sync-npm

def test_sn_inserts_packages(self):
load(br'''
{ "_updated": 1234567890
, "testing-package":
{ "name":"testing-package"
, "description":"A package for testing"
, "maintainers":[{"email":"[email protected]"}]
, "author": {"email":"[email protected]"}
, "time":{"modified":"2015-09-12T03:03:03.135Z"}
}
}
''')

package = self.db.one('select * from packages')
assert package.package_manager == 'npm'
assert package.name == 'testing-package'
assert package.description == 'A package for testing'
assert package.name == 'testing-package'


def test_sn_handles_quoting(self):
load(br'''
{ "_updated": 1234567890
, "testi\\\"ng-pa\\\"ckage":
{ "name":"testi\\\"ng-pa\\\"ckage"
, "description":"A package for \"testing\""
, "maintainers":[{"email":"alice@\"example\".com"}]
, "author": {"email":"\\\\\"bob\\\\\"@example.com"}
, "time":{"modified":"2015-09-12T03:03:03.135Z"}
}
}
''')
def test_consumes_change_stream(self):
docs = [ {'name': 'foo', 'description': 'Foo.'}
, {'name': 'foo', 'description': 'Foo?'}
, {'name': 'foo', 'description': 'Foo!'}
]
sync_npm.consume_change_stream(self.change_stream(docs), self.db)

package = self.db.one('select * from packages')
assert package.package_manager == 'npm'
assert package.name == r'testi\"ng-pa\"ckage'
assert package.description == 'A package for "testing"'
assert package.emails == ['alice@"example".com', r'\\"bob\\"@example.com']


def test_sn_handles_empty_description_and_emails(self):
load(br'''
{ "_updated": 1234567890
, "empty-description":
{ "name":"empty-description"
, "description":""
, "time":{"modified":"2015-09-12T03:03:03.135Z"}
}
}
''')

package = self.db.one('select * from packages')
assert package.package_manager == 'npm'
assert package.name == 'empty-description'
assert package.description == ''
assert package.name == 'foo'
assert package.description == 'Foo!'
assert package.emails == []


# with sentry(env)

def test_with_sentry_logs_to_sentry_and_raises(self):
class env: sentry_dsn = ''
noop = FailCollector()
with pytest.raises(Heck):
with sync_npm.sentry(env, noop):
raise Heck
assert noop.fails == [Heck]
Loading

0 comments on commit f7a148d

Please sign in to comment.