Skip to content

Commit

Permalink
feat: also log programs to S3 (hedyorg#469)
Browse files Browse the repository at this point in the history
To get rid of speed limits of jsonbin (and to stop having to pay for
it), also log parse log files to S3.

Factor out the `LogQueue` class and reuse it for this logging.

JSON Bin logging has not been turned off yet: it's safer to log to
both destinations in parallel until we're convinced that everythings
work the way we want it to.

Add a script called `tools/download-logs` which can be used to download
all programs from the S3 bucket (and add the AWS CLI dependency to
`requirements.txt`).
  • Loading branch information
rix0rrr authored Jun 12, 2021
1 parent 52a999c commit 59beb5b
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 181 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,6 @@ logs.txt
*.swo
.local
node_modules

# Ignore things that start with underscores
_*
10 changes: 6 additions & 4 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ def before_request_https():

Compress(app)
Commonmark(app)
logger = jsonbin.JsonBinLogger.from_env_vars()
querylog.LOG_QUEUE.set_transmitter(aws_helpers.s3_transmitter_from_env())
parse_logger = jsonbin.MultiParseLogger(
jsonbin.JsonBinLogger.from_env_vars(),
jsonbin.S3ParseLogger.from_env_vars())
querylog.LOG_QUEUE.set_transmitter(aws_helpers.s3_querylog_transmitter_from_env())

# Check that requested language is supported, otherwise return 404
@app.before_request
Expand Down Expand Up @@ -289,7 +291,7 @@ def parse():
print(f"error transpiling {code}")
response["Error"] = str(E)
querylog.log_value(server_error=response.get('Error'))
logger.log ({
parse_logger.log ({
'session': session_id(),
'date': str(datetime.datetime.now()),
'level': level,
Expand All @@ -308,7 +310,7 @@ def parse():
def report_error():
post_body = request.json

logger.log ({
parse_logger.log ({
'session': session_id(),
'date': str(datetime.datetime.now()),
'level': post_body.get('level'),
Expand Down
7 changes: 7 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,11 @@
'postfix': ('-' + dyno if dyno else '') + '-' + str(os.getpid()),
'region': 'eu-west-1'
},
's3-parse-logs': {
'bucket': 'hedy-parse-logs',
'prefix': app_name + '/',
# Make logs from different instances/processes unique
'postfix': ('-' + dyno if dyno else '') + '-' + str(os.getpid()),
'region': 'eu-west-1'
},
}
3 changes: 2 additions & 1 deletion gunicorn.conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
def worker_exit(server, worker):
# When the worker is being exited (perhaps because of a timeout),
# give the query_log handler a chance to flush to disk.
from website import querylog
from website import querylog, jsonbin
querylog.emergency_shutdown()
jsonbin.emergency_shutdown()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ requests==2.23.0
attrs==19.3.0
Flask-Commonmark==0.8
bcrypt==3.2.0
boto3==1.16.50
boto3>=1.16.50
ruamel.yaml==0.17.4
pylint==2.8.2
awscli>=1.19.88
4 changes: 2 additions & 2 deletions tests/test_querylog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from website import querylog
from website import querylog, log_queue
import unittest

class TestQueryLog(unittest.TestCase):
Expand All @@ -25,7 +25,7 @@ def test_emergency_recovery(self):

querylog.emergency_shutdown()

recovered_queue = querylog.LogQueue(batch_window_s=300)
recovered_queue = log_queue.LogQueue('querylog', batch_window_s=300)
recovered_queue.try_load_emergency_saves()
recovered_queue.set_transmitter(self._fake_transmitter)

Expand Down
2 changes: 1 addition & 1 deletion tools/view-logs → tools/download-logs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ sed -e '$a\' $cache_dir/* > $finalfile

echo "Now run a command like:"
echo ""
echo " cat $finalfile | recs grep '{{duration_ms}} > 10000' | recs totable -k start_time,duration_ms,method,path,'!_ms\$!' | less -S"
echo " cat $finalfile | recs grep '{{duration_ms}} > 1000' | tee _lastquery.jsonl | recs totable -k start_time,duration_ms,method,path,'!_ms\$!' | less -S"
60 changes: 60 additions & 0 deletions tools/download-programs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash
set -eu
scriptdir=$(cd $(dirname $0) && pwd)

if ! type aws > /dev/null; then
echo "Install the AWS CLI before running this script." >&2
exit 1
fi

if ! grep '\[hedy-logs-viewer\]' ~/.aws/credentials > /dev/null; then
echo "Add the following block to your ~/.aws/credentials file:" >&2
echo "" >&2
echo "[hedy-logs-viewer]" >&2
echo "aws_access_key_id = AKIA***********" >&2
echo "aws_secret_access_key = **************" >&2
echo ""
echo "(Ask someone from the team for the actual keys)" >&2
exit 1
fi

#----------------------------------------------------------------------
hedy_env=""
prefix=""

usage() {
echo "download-programs -e <ENVIRONMENT> [-d <YYYY-MM-DD>] <DIRECTORY>" >&2
}

while getopts "he:d:" OPTION
do
case $OPTION in
e)
hedy_env="$OPTARG"
;;
d)
prefix="/$OPTARG"
;;
h)
usage
exit 0
;;
esac
done
shift $((OPTIND -1))

dir="${1:-}"
if [[ "${dir}" == "" ]]; then
usage
exit 1
fi

#----------------------------------------------------------------------

mkdir -p "$dir"

export AWS_DEFAULT_REGION=eu-west-1
export AWS_PROFILE=hedy-logs-viewer
bucket=hedy-parse-logs

aws s3 sync s3://${bucket}/${hedy_env}/${prefix} $dir
50 changes: 32 additions & 18 deletions website/aws_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,48 @@
import config
import utils

def s3_transmitter_from_env():

def s3_querylog_transmitter_from_env():
"""Return an S3 transmitter, or return None."""
have_aws_creds = os.getenv('AWS_ACCESS_KEY_ID') and os.getenv('AWS_SECRET_ACCESS_KEY')

if not have_aws_creds:
logging.warning('Unable to initialize S3 querylogger (missing AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY)')
return None

return transmit_to_s3
return make_s3_transmitter(config.config['s3-query-logs'])


def transmit_to_s3(timestamp, records):
"""Transmit logfiles to S3 with default config."""
s3config = config.config['s3-query-logs']
def s3_parselog_transmitter_from_env():
"""Return an S3 transmitter, or return None."""
have_aws_creds = os.getenv('AWS_ACCESS_KEY_ID') and os.getenv('AWS_SECRET_ACCESS_KEY')

# No need to configure credentials, we've already confirmed they are in the environment.
s3 = boto3.client('s3', region_name=s3config['region'])
if not have_aws_creds:
logging.warning('Unable to initialize S3 parse logger (missing AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY)')
return None

# Grouping in the key is important, we need this to zoom into an interesting
# log period.
key = s3config['prefix'] + utils.isoformat(timestamp).replace('T', '/') + s3config['postfix'] + '.jsonl'
return make_s3_transmitter(config.config['s3-parse-logs'])

# Store as json-lines format
body = '\n'.join(json.dumps(r) for r in records)

s3.put_object(
Bucket=s3config['bucket'],
Key=key,
StorageClass='STANDARD_IA', # Cheaper, applicable for logs
Body=body)
logging.debug(f'Wrote {len(records)} query logs to s3://{s3config["bucket"]}/{key}')
def make_s3_transmitter(s3config):
"""Make a transmitter function (for use with a LogQueue) which will save records to S3."""
def transmit_to_s3(timestamp, records):
"""Transmit logfiles to S3 with default config."""

# No need to configure credentials, we've already confirmed they are in the environment.
s3 = boto3.client('s3', region_name=s3config['region'])

# Grouping in the key is important, we need this to zoom into an interesting
# log period.
key = s3config.get('prefix', '') + utils.isoformat(timestamp).replace('T', '/') + s3config.get('postfix', '') + '.jsonl'

# Store as json-lines format
body = '\n'.join(json.dumps(r) for r in records)

s3.put_object(
Bucket=s3config['bucket'],
Key=key,
StorageClass='STANDARD_IA', # Cheaper, applicable for logs
Body=body)
logging.debug(f'Wrote {len(records)} query logs to s3://{s3config["bucket"]}/{key}')
return transmit_to_s3
44 changes: 43 additions & 1 deletion website/jsonbin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import requests
import logging

from . import log_queue
from . import aws_helpers

logger = logging.getLogger('jsonbin')

class JsonBinLogger:
Expand Down Expand Up @@ -69,7 +72,46 @@ def _run(self):
except Exception:
logger.exception(f'Error posting to jsonbin.')


class NullJsonbinLogger():
"""A jsonbin logger that doesn't actually do anything."""
def log(self, obj):
pass
pass


class MultiParseLogger():
"""A logger that forwards to other loggers."""
def __init__(self, *loggers):
self.loggers = loggers

def log(self, obj):
for logger in self.loggers:
logger.log(obj)


class S3ParseLogger():
"""A logger that logs to S3.
- Well then why is it in a file called 'jsonbin.py'?
- Legacy, young grasshopper. Legacy.
"""
@staticmethod
def from_env_vars():
transmitter = aws_helpers.s3_parselog_transmitter_from_env()
if not transmitter:
return NullJsonbinLogger()

S3_LOG_QUEUE.set_transmitter(transmitter)
return S3ParseLogger()

def log(self, obj):
S3_LOG_QUEUE.add(obj)


S3_LOG_QUEUE = log_queue.LogQueue('parse', batch_window_s=300)
S3_LOG_QUEUE.try_load_emergency_saves()

def emergency_shutdown():
"""The process is being killed. Do whatever needs to be done to save the logs."""
S3_LOG_QUEUE.emergency_save_to_disk()
Loading

0 comments on commit 59beb5b

Please sign in to comment.