Skip to content

Commit

Permalink
LogtailHandler: implement logging.Handler.flush (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
PetrHeinz authored Jun 19, 2024
1 parent 95ab74b commit 51e2c2f
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 25 deletions.
19 changes: 15 additions & 4 deletions logtail/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@


class FlushWorker(threading.Thread):
def __init__(self, upload, pipe, buffer_capacity, flush_interval):
def __init__(self, upload, pipe, buffer_capacity, flush_interval, check_interval):
threading.Thread.__init__(self)
self.parent_thread = threading.current_thread()
self.upload = upload
self.pipe = pipe
self.buffer_capacity = buffer_capacity
self.flush_interval = flush_interval
self.check_interval = check_interval
self.should_run = True
self._flushing = False
self._clean = True

def run(self):
while self.should_run:
Expand All @@ -27,6 +30,7 @@ def step(self):
last_flush = time.time()
time_remaining = _initial_time_remaining(self.flush_interval)
frame = []
self._clean = True

# If the parent thread has exited but there are still outstanding
# events, attempt to send them before exiting.
Expand All @@ -38,16 +42,17 @@ def step(self):
# `flush_interval` seconds have passed without sending any events.
while len(frame) < self.buffer_capacity and time_remaining > 0:
try:
# Blocks for up to 1.0 seconds for each item to prevent
# Blocks for up to `check_interval` seconds for each item to prevent
# spinning and burning CPU unnecessarily. Could block for the
# entire amount of `time_remaining` but then in the case that
# the parent thread has exited, that entire amount of time
# would be waited before this child worker thread exits.
entry = self.pipe.get(block=(not shutdown), timeout=1.0)
entry = self.pipe.get(block=(not shutdown), timeout=self.check_interval)
self._clean = False
frame.append(entry)
self.pipe.task_done()
except queue.Empty:
if shutdown:
if shutdown or self._flushing:
break
shutdown = not self.parent_thread.is_alive()
time_remaining = _calculate_time_remaining(last_flush, self.flush_interval)
Expand All @@ -68,9 +73,15 @@ def step(self):
if response.status_code == 500 and getattr(response, "exception") != None:
print('Failed to send logs to Better Stack after {} retries: {}'.format(len(RETRY_SCHEDULE), response.exception))

self._clean = True
if shutdown and self.pipe.empty():
self.should_run = False

def flush(self):
    """Block until every queued entry has been handed to the uploader.

    While `_flushing` is set, the worker loop stops waiting for more
    entries as soon as the queue is found empty, so the current frame is
    sent without waiting out the rest of `flush_interval`. This method
    then polls every `check_interval` seconds until the worker reports a
    clean state (`_clean`) and the queue is empty.
    """
    self._flushing = True
    try:
        while not self._clean or not self.pipe.empty():
            time.sleep(self.check_interval)
    finally:
        # Always leave flushing mode, even if the wait is interrupted;
        # otherwise the worker would keep cutting its batching short.
        self._flushing = False

def _initial_time_remaining(flush_interval):
return flush_interval
Expand Down
10 changes: 9 additions & 1 deletion logtail/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DEFAULT_HOST = 'https://in.logs.betterstack.com'
DEFAULT_BUFFER_CAPACITY = 1000
DEFAULT_FLUSH_INTERVAL = 1
DEFAULT_CHECK_INTERVAL = 0.1
DEFAULT_RAISE_EXCEPTIONS = False
DEFAULT_DROP_EXTRA_EVENTS = True
DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True
Expand All @@ -23,6 +24,7 @@ def __init__(self,
host=DEFAULT_HOST,
buffer_capacity=DEFAULT_BUFFER_CAPACITY,
flush_interval=DEFAULT_FLUSH_INTERVAL,
check_interval=DEFAULT_CHECK_INTERVAL,
raise_exceptions=DEFAULT_RAISE_EXCEPTIONS,
drop_extra_events=DEFAULT_DROP_EXTRA_EVENTS,
include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES,
Expand All @@ -38,6 +40,7 @@ def __init__(self,
self.include_extra_attributes = include_extra_attributes
self.buffer_capacity = buffer_capacity
self.flush_interval = flush_interval
self.check_interval = check_interval
self.raise_exceptions = raise_exceptions
self.dropcount = 0
# Do not initialize the flush thread yet because it causes issues on Render.
Expand All @@ -51,7 +54,8 @@ def ensure_flush_thread_alive(self):
self.uploader,
self.pipe,
self.buffer_capacity,
self.flush_interval
self.flush_interval,
self.check_interval,
)
self.flush_thread.start()

Expand All @@ -71,3 +75,7 @@ def emit(self, record):
except Exception as e:
if self.raise_exceptions:
raise e

def flush(self):
    """Ask the background flush worker to send all pending records.

    Does nothing when no worker thread has been created yet or when the
    worker is no longer alive.
    """
    worker = self.flush_thread
    if not worker or not worker.is_alive():
        return
    worker.flush()
13 changes: 8 additions & 5 deletions tests/test_flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import threading
import unittest

from unittest.mock import patch

from logtail.compat import queue
from logtail.flusher import RETRY_SCHEDULE
from logtail.flusher import FlushWorker
Expand All @@ -17,11 +19,12 @@ class TestFlushWorker(unittest.TestCase):
source_token = 'dummy_source_token'
buffer_capacity = 5
flush_interval = 2
check_interval = 0.01

def _setup_worker(self, uploader=None):
pipe = queue.Queue(maxsize=self.buffer_capacity)
uploader = uploader or Uploader(self.source_token, self.host)
fw = FlushWorker(uploader, pipe, self.buffer_capacity, self.flush_interval)
fw = FlushWorker(uploader, pipe, self.buffer_capacity, self.flush_interval, self.check_interval)
return pipe, uploader, fw

def test_is_thread(self):
Expand Down Expand Up @@ -50,7 +53,7 @@ def uploader(frame):

self.assertEqual(self.calls, 1)

@mock.patch('logtail.flusher._calculate_time_remaining')
@patch('logtail.flusher._calculate_time_remaining')
def test_flushes_after_interval(self, calculate_time_remaining):
self.buffer_capacity = 10
num_items = 2
Expand Down Expand Up @@ -82,8 +85,8 @@ def timeout(last_flush, interval):
self.assertEqual(self.upload_calls, 1)
self.assertEqual(self.timeout_calls, 2)

@mock.patch('logtail.flusher._calculate_time_remaining')
@mock.patch('logtail.flusher._initial_time_remaining')
@patch('logtail.flusher._calculate_time_remaining')
@patch('logtail.flusher._initial_time_remaining')
def test_does_nothing_without_any_items(self, initial_time_remaining, calculate_time_remaining):
calculate_time_remaining.side_effect = lambda a,b: 0.0
initial_time_remaining.side_effect = lambda a: 0.0001
Expand All @@ -95,7 +98,7 @@ def test_does_nothing_without_any_items(self, initial_time_remaining, calculate_
fw.step()
self.assertFalse(uploader.called)

@mock.patch('logtail.flusher.time.sleep')
@patch('logtail.flusher.time.sleep')
def test_retries_according_to_schedule(self, mock_sleep):
first_frame = list(range(self.buffer_capacity))

Expand Down
32 changes: 18 additions & 14 deletions tests/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
import unittest
import logging

from logtail import LogtailHandler, context
from unittest.mock import patch

from logtail import LogtailHandler, context
from logtail.handler import FlushWorker

class TestLogtailHandler(unittest.TestCase):
source_token = 'dummy_source_token'
host = 'dummy_host'

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_handler_creates_uploader_from_args(self, MockWorker):
handler = LogtailHandler(source_token=self.source_token, host=self.host)
self.assertEqual(handler.uploader.source_token, self.source_token)
self.assertEqual(handler.uploader.host, self.host)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_handler_creates_pipe_from_args(self, MockWorker):
buffer_capacity = 9
flush_interval = 1
Expand All @@ -30,11 +32,12 @@ def test_handler_creates_pipe_from_args(self, MockWorker):
)
self.assertTrue(handler.pipe.empty())

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockWorker):
buffer_capacity = 9
flush_interval = 9
handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval)
check_interval = 4
handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval, check_interval=check_interval)

self.assertFalse(MockWorker.called)

Expand All @@ -47,11 +50,12 @@ def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockW
handler.uploader,
handler.pipe,
buffer_capacity,
flush_interval
flush_interval,
check_interval,
)
self.assertEqual(handler.flush_thread.start.call_count, 1)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_emit_starts_thread_if_not_alive(self, MockWorker):
handler = LogtailHandler(source_token=self.source_token)

Expand All @@ -67,7 +71,7 @@ def test_emit_starts_thread_if_not_alive(self, MockWorker):

self.assertEqual(handler.flush_thread.start.call_count, 2)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_emit_drops_records_if_configured(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -87,7 +91,7 @@ def test_emit_drops_records_if_configured(self, MockWorker):
self.assertTrue(handler.pipe.empty())
self.assertEqual(handler.dropcount, 1)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_emit_does_not_drop_records_if_configured(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand Down Expand Up @@ -118,7 +122,7 @@ def consumer(q):

self.assertEqual(handler.dropcount, 0)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_error_suppression(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -139,7 +143,7 @@ def test_error_suppression(self, MockWorker):
handler.raise_exceptions = False
logger.critical('hello')

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_unserializable_extra_data(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -158,7 +162,7 @@ def test_can_send_unserializable_extra_data(self, MockWorker):
self.assertRegex(log_entry['data']['unserializable'], r'^<tests\.test_handler\.UnserializableObject object at 0x[0-f]+>$')
self.assertTrue(handler.pipe.empty())

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_unserializable_context(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -178,7 +182,7 @@ def test_can_send_unserializable_context(self, MockWorker):
self.assertRegex(log_entry['context']['data']['unserializable'], r'^<tests\.test_handler\.UnserializableObject object at 0x[0-f]+>$')
self.assertTrue(handler.pipe.empty())

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_circular_dependency_in_extra_data(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -200,7 +204,7 @@ def test_can_send_circular_dependency_in_extra_data(self, MockWorker):
self.assertTrue(handler.pipe.empty())


@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_circular_dependency_in_context(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand Down
4 changes: 3 additions & 1 deletion tests/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import mock
import unittest

from unittest.mock import patch

from logtail.uploader import Uploader


Expand All @@ -12,7 +14,7 @@ class TestUploader(unittest.TestCase):
source_token = 'dummy_source_token'
frame = [1, 2, 3]

@mock.patch('logtail.uploader.requests.Session.post')
@patch('logtail.uploader.requests.Session.post')
def test_call(self, post):
def mock_post(endpoint, data=None, headers=None):
# Check that the data is sent to the correct endpoint
Expand Down

0 comments on commit 51e2c2f

Please sign in to comment.