diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..f95b238 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,48 @@ +name: Run test + +on: + push: + branches: + - master + pull_request: + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Ruff + run: pipx install ruff + - name: Ruff check + run: ruff check + - name: Ruff format + run: ruff format --diff + + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "pypy3.9", "pypy3.10"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + cache-dependency-path: requirements-dev.txt + - name: Install dependencies + run: python -m pip install -r requirements-dev.txt + - name: Run tests + run: pytest --cov=fluent + + build: + needs: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: pipx run build + - uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/ diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 0e5e0c8..0000000 --- a/.travis.yml +++ /dev/null @@ -1,33 +0,0 @@ -sudo: false -language: python -python: - - "3.5" - - "3.6" - - "3.7" - - "3.8" - - "3.9" - - pypy3 - - nightly -# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors -install: - - "pip install -e ." - - "pip install 'coverage~=4.5.4' coveralls" -script: - - "PYTHONFAULTHANDLER=x timeout -sABRT 30s nosetests -vsd" -after_success: - - coveralls - -deploy: - provider: pypi - user: repeatedly - server: https://upload.pypi.org/legacy/ - password: - secure: CpNaj4F3TZvpP1aSJWidh/XexrWODV2sBdObrYU79Gyh9hFl6WLsA3JM9BfVsy9cGb/P/jP6ly4Z0/6qdIzZ5D6FPOB1B7rn5GZ2LAMOypRCA6W2uJbRjUU373Wut0p0OmQcMPto6XJsMlpvOEq+1uAq+LLAnAGEmmYTeskZebs= - on: - tags: true - condition: '"$TRAVIS_PYTHON_VERSION" = "3.9" || "$TRAVIS_PYTHON_VERSION" = "2.7"' - distributions: "sdist bdist_wheel" - -matrix: - allow_failures: - - python: nightly diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index 3248bb2..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,4 +0,0 @@ -include README.rst -include setup.py -include COPYING -include test/*py diff --git a/README.rst b/README.rst index 5a31463..30b499f 100644 --- a/README.rst +++ b/README.rst @@ -1,20 +1,12 @@ -A Python structured logger for Fluentd -====================================== - -.. image:: https://travis-ci.org/fluent/fluent-logger-python.svg?branch=master - :target: https://travis-ci.org/fluent/fluent-logger-python - :alt: Build Status - -.. image:: https://coveralls.io/repos/fluent/fluent-logger-python/badge.svg - :target: https://coveralls.io/r/fluent/fluent-logger-python - :alt: Coverage Status +A Python structured logger for Fluentd/Fluent Bit +================================================= Many web/mobile applications generate huge amount of event logs (c,f. login, logout, purchase, follow, etc). To analyze these event logs could be really valuable for improving the service. However, the challenge is collecting these logs easily and reliably. -`Fluentd `__ solves that problem by +`Fluentd `__ and `Fluent Bit `__ solves that problem by having: easy installation, small footprint, plugins, reliable buffering, log forwarding, etc. @@ -24,10 +16,11 @@ Python application. Requirements ------------ -- Python 3.5+ +- Python 3.7+ - ``msgpack`` - **IMPORTANT**: Version 0.8.0 is the last version supporting Python 2.6, 3.2 and 3.3 - **IMPORTANT**: Version 0.9.6 is the last version supporting Python 2.7 and 3.4 +- **IMPORTANT**: Version 0.10.0 is the last version supporting Python 3.5 and 3.6 Installation ------------ @@ -366,23 +359,22 @@ that this doesn't happen, or it's acceptable for your application. Testing ------- -Testing can be done using -`nose `__. - -Release -------- - -Need wheel package. +Testing can be done using `pytest `__. .. code:: sh - $ pip install wheel + $ pytest tests -After that, type following command: + +Release +------- .. code:: sh - $ python setup.py clean sdist bdist_wheel upload + $ # Download dist.zip for release from GitHub Action artifact. + $ unzip -d dist dist.zip + $ pipx twine upload dist/* + Contributors ------------ diff --git a/fluent/__about__.py b/fluent/__about__.py new file mode 100644 index 0000000..1cd2317 --- /dev/null +++ b/fluent/__about__.py @@ -0,0 +1 @@ +__version__ = "0.10.1dev1" diff --git a/fluent/asynchandler.py b/fluent/asynchandler.py index bbba4c4..e150383 100644 --- a/fluent/asynchandler.py +++ b/fluent/asynchandler.py @@ -1,13 +1,11 @@ -# -*- coding: utf-8 -*- - from fluent import asyncsender from fluent import handler class FluentHandler(handler.FluentHandler): - ''' + """ Asynchronous Logging Handler for fluent. - ''' + """ def getSenderClass(self): return asyncsender.FluentSender @@ -18,7 +16,7 @@ def close(self): try: self.sender.close() finally: - super(FluentHandler, self).close() + super().close() finally: self.release() diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 24c6924..73a1e61 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - import threading from queue import Queue, Full, Empty @@ -17,8 +15,7 @@ def _set_global_sender(sender): # pragma: no cover - """ [For testing] Function to set global sender directly - """ + """[For testing] Function to set global sender directly""" global _global_sender _global_sender = sender @@ -37,28 +34,37 @@ def close(): # pragma: no cover class FluentSender(sender.FluentSender): - def __init__(self, - tag, - host='localhost', - port=24224, - bufmax=1 * 1024 * 1024, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - nanosecond_precision=False, - msgpack_kwargs=None, - queue_maxsize=DEFAULT_QUEUE_MAXSIZE, - queue_circular=DEFAULT_QUEUE_CIRCULAR, - queue_overflow_handler=None, - **kwargs): + def __init__( + self, + tag, + host="localhost", + port=24224, + bufmax=1 * 1024 * 1024, + timeout=3.0, + verbose=False, + buffer_overflow_handler=None, + nanosecond_precision=False, + msgpack_kwargs=None, + queue_maxsize=DEFAULT_QUEUE_MAXSIZE, + queue_circular=DEFAULT_QUEUE_CIRCULAR, + queue_overflow_handler=None, + **kwargs, + ): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. """ - super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, - verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, - nanosecond_precision=nanosecond_precision, - msgpack_kwargs=msgpack_kwargs, - **kwargs) + super().__init__( + tag=tag, + host=host, + port=port, + bufmax=bufmax, + timeout=timeout, + verbose=verbose, + buffer_overflow_handler=buffer_overflow_handler, + nanosecond_precision=nanosecond_precision, + msgpack_kwargs=msgpack_kwargs, + **kwargs, + ) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular if queue_circular and queue_overflow_handler: @@ -66,12 +72,15 @@ def __init__(self, else: self._queue_overflow_handler = self._queue_overflow_handler_default - self._thread_guard = threading.Event() # This ensures visibility across all variables + self._thread_guard = ( + threading.Event() + ) # This ensures visibility across all variables self._closed = False self._queue = Queue(maxsize=queue_maxsize) - self._send_thread = threading.Thread(target=self._send_loop, - name="AsyncFluentSender %d" % id(self)) + self._send_thread = threading.Thread( + target=self._send_loop, name="AsyncFluentSender %d" % id(self) + ) self._send_thread.daemon = True self._send_thread.start() @@ -121,7 +130,7 @@ def _send(self, bytes_): return True def _send_loop(self): - send_internal = super(FluentSender, self)._send_internal + send_internal = super()._send_internal try: while True: diff --git a/fluent/event.py b/fluent/event.py index 76f27ca..c69e537 100644 --- a/fluent/event.py +++ b/fluent/event.py @@ -1,13 +1,11 @@ -# -*- coding: utf-8 -*- - import time from fluent import sender -class Event(object): +class Event: def __init__(self, label, data, **kwargs): - assert isinstance(data, dict), 'data must be a dict' - sender_ = kwargs.get('sender', sender.get_global_sender()) - timestamp = kwargs.get('time', int(time.time())) + assert isinstance(data, dict), "data must be a dict" + sender_ = kwargs.get("sender", sender.get_global_sender()) + timestamp = kwargs.get("time", int(time.time())) sender_.emit_with_time(label, timestamp, data) diff --git a/fluent/handler.py b/fluent/handler.py index 7aefd8f..2bc42b4 100644 --- a/fluent/handler.py +++ b/fluent/handler.py @@ -1,19 +1,12 @@ -# -*- coding: utf-8 -*- - +import json import logging import socket -import sys - -try: - import simplejson as json -except ImportError: # pragma: no cover - import json from fluent import sender -class FluentRecordFormatter(logging.Formatter, object): - """ A structured formatter for Fluent. +class FluentRecordFormatter(logging.Formatter): + """A structured formatter for Fluent. Best used with server storing data in an ElasticSearch cluster for example. @@ -33,36 +26,49 @@ class FluentRecordFormatter(logging.Formatter, object): Can be an iterable. """ - def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False, format_json=True, - exclude_attrs=None): - super(FluentRecordFormatter, self).__init__(None, datefmt) - - if sys.version_info[0:2] >= (3, 2) and style != '%': + def __init__( + self, + fmt=None, + datefmt=None, + style="%", + fill_missing_fmt_key=False, + format_json=True, + exclude_attrs=None, + ): + super().__init__(None, datefmt) + + if style != "%": self.__style, basic_fmt_dict = { - '{': (logging.StrFormatStyle, { - 'sys_host': '{hostname}', - 'sys_name': '{name}', - 'sys_module': '{module}', - }), - '$': (logging.StringTemplateStyle, { - 'sys_host': '${hostname}', - 'sys_name': '${name}', - 'sys_module': '${module}', - }), + "{": ( + logging.StrFormatStyle, + { + "sys_host": "{hostname}", + "sys_name": "{name}", + "sys_module": "{module}", + }, + ), + "$": ( + logging.StringTemplateStyle, + { + "sys_host": "${hostname}", + "sys_name": "${name}", + "sys_module": "${module}", + }, + ), }[style] else: self.__style = None basic_fmt_dict = { - 'sys_host': '%(hostname)s', - 'sys_name': '%(name)s', - 'sys_module': '%(module)s', + "sys_host": "%(hostname)s", + "sys_name": "%(name)s", + "sys_module": "%(module)s", } if exclude_attrs is not None: self._exc_attrs = set(exclude_attrs) self._fmt_dict = None self._formatter = self._format_by_exclusion - self.usesTime = super(FluentRecordFormatter, self).usesTime + self.usesTime = super().usesTime else: self._exc_attrs = None if not fmt: @@ -89,7 +95,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False def format(self, record): # Compute attributes handled by parent class. - super(FluentRecordFormatter, self).format(record) + super().format(record) # Add ours record.hostname = self.hostname @@ -103,7 +109,7 @@ def usesTime(self): """This method is substituted on construction based on settings for performance reasons""" def _structuring(self, data, record): - """ Melds `msg` into `data`. + """Melds `msg` into `data`. :param data: dictionary to be sent to fluent server :param msg: :class:`LogRecord`'s message to add to `data`. @@ -118,7 +124,7 @@ def _structuring(self, data, record): elif isinstance(msg, str): self._add_dic(data, self._format_msg(record, msg)) else: - self._add_dic(data, {'message': msg}) + self._add_dic(data, {"message": msg}) def _format_msg_json(self, record, msg): try: @@ -131,7 +137,7 @@ def _format_msg_json(self, record, msg): return self._format_msg_default(record, msg) def _format_msg_default(self, record, msg): - return {'message': super(FluentRecordFormatter, self).format(record)} + return {"message": super().format(record)} def _format_by_exclusion(self, record): data = {} @@ -175,17 +181,18 @@ class FluentHandler(logging.Handler): Logging Handler for fluent. """ - def __init__(self, - tag, - host='localhost', - port=24224, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - msgpack_kwargs=None, - nanosecond_precision=False, - **kwargs): - + def __init__( + self, + tag, + host="localhost", + port=24224, + timeout=3.0, + verbose=False, + buffer_overflow_handler=None, + msgpack_kwargs=None, + nanosecond_precision=False, + **kwargs, + ): self.tag = tag self._host = host self._port = port @@ -213,29 +220,45 @@ def sender(self): buffer_overflow_handler=self._buffer_overflow_handler, msgpack_kwargs=self._msgpack_kwargs, nanosecond_precision=self._nanosecond_precision, - **self._kwargs + **self._kwargs, ) return self._sender - def getSenderInstance(self, tag, host, port, timeout, verbose, - buffer_overflow_handler, msgpack_kwargs, - nanosecond_precision, **kwargs): + def getSenderInstance( + self, + tag, + host, + port, + timeout, + verbose, + buffer_overflow_handler, + msgpack_kwargs, + nanosecond_precision, + **kwargs, + ): sender_class = self.getSenderClass() - return sender_class(tag, - host=host, port=port, - timeout=timeout, verbose=verbose, - buffer_overflow_handler=buffer_overflow_handler, - msgpack_kwargs=msgpack_kwargs, - nanosecond_precision=nanosecond_precision, **kwargs) + return sender_class( + tag, + host=host, + port=port, + timeout=timeout, + verbose=verbose, + buffer_overflow_handler=buffer_overflow_handler, + msgpack_kwargs=msgpack_kwargs, + nanosecond_precision=nanosecond_precision, + **kwargs, + ) def emit(self, record): data = self.format(record) _sender = self.sender - return _sender.emit_with_time(None, - sender.EventTime(record.created) - if _sender.nanosecond_precision - else int(record.created), - data) + return _sender.emit_with_time( + None, + sender.EventTime(record.created) + if _sender.nanosecond_precision + else int(record.created), + data, + ) def close(self): self.acquire() @@ -243,7 +266,7 @@ def close(self): try: self.sender.close() finally: - super(FluentHandler, self).close() + super().close() finally: self.release() diff --git a/fluent/sender.py b/fluent/sender.py index dba5907..34a6a81 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - import errno import socket import struct @@ -13,8 +11,7 @@ def _set_global_sender(sender): # pragma: no cover - """ [For testing] Function to set global sender directly - """ + """[For testing] Function to set global sender directly""" global _global_sender _global_sender = sender @@ -35,27 +32,29 @@ def close(): # pragma: no cover class EventTime(msgpack.ExtType): def __new__(cls, timestamp): seconds = int(timestamp) - nanoseconds = int(timestamp % 1 * 10 ** 9) - return super(EventTime, cls).__new__( + nanoseconds = int(timestamp % 1 * 10**9) + return super().__new__( cls, code=0, data=struct.pack(">II", seconds, nanoseconds), ) -class FluentSender(object): - def __init__(self, - tag, - host='localhost', - port=24224, - bufmax=1 * 1024 * 1024, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - nanosecond_precision=False, - forward_packet_error=True, - msgpack_kwargs=None, - **kwargs): +class FluentSender: + def __init__( + self, + tag, + host="localhost", + port=24224, + bufmax=1 * 1024 * 1024, + timeout=3.0, + verbose=False, + buffer_overflow_handler=None, + nanosecond_precision=False, + forward_packet_error=True, + msgpack_kwargs=None, + **kwargs, + ): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. """ @@ -90,23 +89,28 @@ def emit_with_time(self, label, timestamp, data): if not self.forward_packet_error: raise self.last_error = e - bytes_ = self._make_packet(label, timestamp, - {"level": "CRITICAL", - "message": "Can't output to log", - "traceback": traceback.format_exc()}) + bytes_ = self._make_packet( + label, + timestamp, + { + "level": "CRITICAL", + "message": "Can't output to log", + "traceback": traceback.format_exc(), + }, + ) return self._send(bytes_) @property def last_error(self): - return getattr(self._last_error_threadlocal, 'exception', None) + return getattr(self._last_error_threadlocal, "exception", None) @last_error.setter def last_error(self, err): self._last_error_threadlocal.exception = err def clear_last_error(self, _thread_id=None): - if hasattr(self._last_error_threadlocal, 'exception'): - delattr(self._last_error_threadlocal, 'exception') + if hasattr(self._last_error_threadlocal, "exception"): + delattr(self._last_error_threadlocal, "exception") def close(self): with self.lock: @@ -124,7 +128,7 @@ def close(self): def _make_packet(self, label, timestamp, data): if label: - tag = '.'.join((self.tag, label)) if self.tag else label + tag = ".".join((self.tag, label)) if self.tag else label else: tag = self.tag if self.nanosecond_precision and isinstance(timestamp, float): @@ -153,7 +157,7 @@ def _send_internal(self, bytes_): self.pendings = None return True - except socket.error as e: + except OSError as e: self.last_error = e # close socket @@ -173,13 +177,13 @@ def _check_recv_side(self): self.socket.settimeout(0.0) try: recvd = self.socket.recv(4096) - except socket.error as recv_e: + except OSError as recv_e: if recv_e.errno != errno.EWOULDBLOCK: raise return - if recvd == b'': - raise socket.error(errno.EPIPE, "Broken pipe") + if recvd == b"": + raise OSError(errno.EPIPE, "Broken pipe") finally: self.socket.settimeout(self.timeout) @@ -193,17 +197,17 @@ def _send_data(self, bytes_): while bytes_sent < bytes_to_send: sent = self.socket.send(bytes_[bytes_sent:]) if sent == 0: - raise socket.error(errno.EPIPE, "Broken pipe") + raise OSError(errno.EPIPE, "Broken pipe") bytes_sent += sent self._check_recv_side() def _reconnect(self): if not self.socket: try: - if self.host.startswith('unix://'): + if self.host.startswith("unix://"): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) - sock.connect(self.host[len('unix://'):]) + sock.connect(self.host[len("unix://") :]) else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) @@ -223,7 +227,7 @@ def _call_buffer_overflow_handler(self, pending_events): try: if self.buffer_overflow_handler: self.buffer_overflow_handler(pending_events) - except Exception as e: + except Exception: # User should care any exception in handler pass @@ -234,12 +238,12 @@ def _close(self): try: try: sock.shutdown(socket.SHUT_RDWR) - except socket.error: # pragma: no cover + except OSError: # pragma: no cover pass finally: try: sock.close() - except socket.error: # pragma: no cover + except OSError: # pragma: no cover pass finally: self.socket = None diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8140e03 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,54 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "fluent-logger" +dynamic = ["version"] +description = "A Python logging handler for Fluentd event collector" +readme = "README.rst" +license = { file = "COPYING" } +requires-python = ">=3.7" +authors = [ + { name = "Kazuki Ohta", email = "kazuki.ohta@gmail.com" }, +] +maintainers = [ + { name = "Arcadiy Ivanov", email = "arcadiy@ivanov.biz" }, + { name = "Inada Naoki", email = "songofacandy@gmail.com" }, +] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + "Topic :: System :: Logging", +] +dependencies = [ + "msgpack>=1.0", +] + +[project.urls] +Download = "https://pypi.org/project/fluent-logger/" +Homepage = "https://github.com/fluent/fluent-logger-python" + +[tool.hatch.version] +path = "fluent/__about__.py" + +[tool.hatch.build.targets.sdist] +exclude = [ + "/.github", + "/.tox", + "/.venv", +] + +[tool.hatch.build.targets.wheel] +include = [ + "/fluent", +] diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..3707664 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,3 @@ +pytest +pytest-cov +msgpack diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index b8f2760..0000000 --- a/setup.cfg +++ /dev/null @@ -1,10 +0,0 @@ -[nosetests] -match = ^test_ -cover-package = fluent -with-coverage = 1 -cover-erase = 1 -cover-branches = 1 -cover-inclusive = 1 -cover-min-percentage = 70 -[bdist_wheel] -universal = 1 diff --git a/setup.py b/setup.py deleted file mode 100755 index 1453d55..0000000 --- a/setup.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/python - -from os import path - -try: - from setuptools import setup -except ImportError: - from distutils.core import setup - -README = path.abspath(path.join(path.dirname(__file__), 'README.rst')) -desc = 'A Python logging handler for Fluentd event collector' - -setup( - name='fluent-logger', - version='0.10.0', - description=desc, - long_description=open(README).read(), - package_dir={'fluent': 'fluent'}, - packages=['fluent'], - install_requires=['msgpack>1.0'], - author='Kazuki Ohta', - author_email='kazuki.ohta@gmail.com', - maintainer='Arcadiy Ivanov', - maintainer_email='arcadiy@ivanov.biz', - url='https://github.com/fluent/fluent-logger-python', - download_url='https://pypi.org/project/fluent-logger/', - license='Apache License, Version 2.0', - classifiers=[ - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: Implementation :: CPython', - 'Programming Language :: Python :: Implementation :: PyPy', - 'Development Status :: 5 - Production/Stable', - 'Topic :: System :: Logging', - 'Intended Audience :: Developers', - ], - python_requires='>=3.5', - test_suite='tests' -) diff --git a/tests/mockserver.py b/tests/mockserver.py index 77ecdd3..6ea2fff 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - try: from cStringIO import StringIO as BytesIO except ImportError: @@ -16,13 +14,13 @@ class MockRecvServer(threading.Thread): Single threaded server accepts one connection and recv until EOF. """ - def __init__(self, host='localhost', port=0): - super(MockRecvServer, self).__init__() + def __init__(self, host="localhost", port=0): + super().__init__() - if host.startswith('unix://'): + if host.startswith("unix://"): self.socket_proto = socket.AF_UNIX self.socket_type = socket.SOCK_STREAM - self.socket_addr = host[len('unix://'):] + self.socket_addr = host[len("unix://") :] else: self.socket_proto = socket.AF_INET self.socket_type = socket.SOCK_STREAM @@ -55,7 +53,7 @@ def run(self): if not data: break self._buf.write(data) - except socket.error as e: + except OSError as e: print("MockServer error: %s" % e) break finally: @@ -69,15 +67,13 @@ def get_received(self): return list(Unpacker(self._buf)) def close(self): - try: self._sock.close() except Exception: pass try: - conn = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: conn.connect((self.socket_addr[0], self.port)) finally: @@ -92,6 +88,3 @@ def close(self): pass self.join() - - def __del__(self): - self.close() diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index e88a041..7bbf108 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -1,18 +1,14 @@ -#  -*- coding: utf-8 -*- - import logging -import sys import unittest try: from unittest import mock except ImportError: - import mock + from unittest import mock try: from unittest.mock import patch except ImportError: - from mock import patch - + from unittest.mock import patch import fluent.asynchandler @@ -20,10 +16,16 @@ from tests import mockserver +def get_logger(name, level=logging.INFO): + logger = logging.getLogger(name) + logger.setLevel(level) + return logger + + class TestHandler(unittest.TestCase): def setUp(self): - super(TestHandler, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -37,246 +39,233 @@ def get_data(self): return self._server.get_received() def test_simple(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({ - 'from': 'userA', - 'to': 'userB' - }) + log.info({"from": "userA", "to": "userB"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('app.follow', data[0][0]) - eq('userA', data[0][2]['from']) - eq('userB', data[0][2]['to']) + eq("app.follow", data[0][0]) + eq("userA", data[0][2]["from"]) + eq("userB", data[0][2]["to"]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_custom_fmt(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'lineno': '%(lineno)d', - 'emitted_at': '%(asctime)s', - }) + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "%(name)s", + "lineno": "%(lineno)d", + "emitted_at": "%(asctime)s", + } + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_format_style(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '{name}', - 'lineno': '{lineno}', - 'emitted_at': '{asctime}', - }, style='{') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "{name}", + "lineno": "{lineno}", + "emitted_at": "{asctime}", + }, + style="{", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_template_style(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '${name}', - 'lineno': '${lineno}', - 'emitted_at': '${asctime}', - }, style='$') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "${name}", + "lineno": "${lineno}", + "emitted_at": "${asctime}", + }, + style="$", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) def test_custom_field_raise_exception(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }) + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"} + ) ) log.addHandler(handler) with self.assertRaises(KeyError): - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) def test_custom_field_fill_missing_fmt_key_is_true(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }, - fill_missing_fmt_key=True + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"}, + fill_missing_fmt_key=True, ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('custom_field' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("custom_field" in data[0][2]) # field defaults to none if not in log record - self.assertIsNone(data[0][2]['custom_field']) + self.assertIsNone(data[0][2]["custom_field"]) def test_json_encoded_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info('{"key": "hello world!", "param": "value"}') data = self.get_data() - self.assertTrue('key' in data[0][2]) - self.assertEqual('hello world!', data[0][2]['key']) + self.assertTrue("key" in data[0][2]) + self.assertEqual("hello world!", data[0][2]["key"]) def test_unstructured_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello %s', 'world') + log.info("hello %s", "world") data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world", data[0][2]["message"]) def test_unstructured_formatted_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello world, %s', 'you!') + log.info("hello world, %s", "you!") data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world, you!', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world, you!", data[0][2]["message"]) def test_number_string_simple_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info("1") data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_simple_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info(42) data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_dict_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({42: 'root'}) + log.info({42: "root"}) data = self.get_data() # For some reason, non-string keys are ignored self.assertFalse(42 in data[0][2]) def test_exception_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) try: - raise Exception('sample exception') + raise Exception("sample exception") except Exception: - log.exception('it failed') + log.exception("it failed") data = self.get_data() - message = data[0][2]['message'] + message = data[0][2]["message"] # Includes the logged message, as well as the stack trace. - self.assertTrue('it failed' in message) + self.assertTrue("it failed" in message) self.assertTrue('tests/test_asynchandler.py", line' in message) - self.assertTrue('Exception: sample exception' in message) + self.assertTrue("Exception: sample exception" in message) class TestHandlerWithCircularQueue(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestHandlerWithCircularQueue, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -290,22 +279,24 @@ def get_data(self): return self._server.get_received() def test_simple(self): - handler = self.get_handler_class()('app.follow', port=self._port, - queue_maxsize=self.Q_SIZE, - queue_circular=True) + handler = self.get_handler_class()( + "app.follow", + port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + ) with handler: self.assertEqual(handler.sender.queue_circular, True) self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 1, "from": "userA", "to": "userB"}) + log.info({"cnt": 2, "from": "userA", "to": "userB"}) + log.info({"cnt": 3, "from": "userA", "to": "userB"}) + log.info({"cnt": 4, "from": "userA", "to": "userB"}) + log.info({"cnt": 5, "from": "userA", "to": "userB"}) data = self.get_data() eq = self.assertEqual @@ -315,9 +306,9 @@ def test_simple(self): el = data[0] eq(3, len(el)) - eq('app.follow', el[0]) - eq('userA', el[2]['from']) - eq('userB', el[2]['to']) + eq("app.follow", el[0]) + eq("userA", el[2]["from"]) + eq("userB", el[2]["to"]) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) @@ -334,8 +325,8 @@ class TestHandlerWithCircularQueueHandler(unittest.TestCase): Q_SIZE = 1 def setUp(self): - super(TestHandlerWithCircularQueueHandler, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -346,42 +337,49 @@ def get_handler_class(self): return fluent.asynchandler.FluentHandler def test_simple(self): - handler = self.get_handler_class()('app.follow', port=self._port, - queue_maxsize=self.Q_SIZE, - queue_circular=True, - queue_overflow_handler=queue_overflow_handler) + handler = self.get_handler_class()( + "app.follow", + port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + queue_overflow_handler=queue_overflow_handler, + ) with handler: + def custom_full_queue(): - handler.sender._queue.put(b'Mock', block=True) + handler.sender._queue.put(b"Mock", block=True) return True - with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)): + with patch.object( + fluent.asynchandler.asyncsender.Queue, + "full", + mock.Mock(side_effect=custom_full_queue), + ): self.assertEqual(handler.sender.queue_circular, True) self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) exc_counter = 0 try: - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 1, "from": "userA", "to": "userB"}) except QueueOverflowException: exc_counter += 1 try: - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 2, "from": "userA", "to": "userB"}) except QueueOverflowException: exc_counter += 1 try: - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 3, "from": "userA", "to": "userB"}) except QueueOverflowException: exc_counter += 1 # we can't be sure to have exception in every case due to multithreading, # so we can test only for a cautelative condition here - print('Exception raised: {} (expected 3)'.format(exc_counter)) + print(f"Exception raised: {exc_counter} (expected 3)") assert exc_counter >= 0 diff --git a/tests/test_asyncsender.py b/tests/test_asyncsender.py index eb36f96..d690ae1 100644 --- a/tests/test_asyncsender.py +++ b/tests/test_asyncsender.py @@ -1,7 +1,3 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function - import socket import unittest @@ -14,6 +10,7 @@ class TestSetup(unittest.TestCase): def tearDown(self): from fluent.asyncsender import _set_global_sender + _set_global_sender(None) def test_no_kwargs(self): @@ -46,10 +43,11 @@ def test_tolerant(self): class TestSender(unittest.TestCase): def setUp(self): - super(TestSender, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port + ) def tearDown(self): try: @@ -62,41 +60,41 @@ def get_data(self): def test_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_decorator_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_nanosecond(self): with self._sender as sender: sender.nanosecond_precision = True - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) @@ -104,21 +102,21 @@ def test_nanosecond_coerce_float(self): time_ = 1490061367.8616468906402588 with self._sender as sender: sender.nanosecond_precision = True - sender.emit_with_time('foo', time_, {'bar': 'baz'}) + sender.emit_with_time("foo", time_, {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) - eq(data[0][1].data, b'X\xd0\x8873[\xb0*') + eq(data[0][1].data, b"X\xd0\x8873[\xb0*") def test_no_last_error_on_successful_emit(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) self.assertEqual(sender.last_error, None) @@ -135,8 +133,10 @@ def test_clear_last_error(self): self.assertEqual(self._sender.last_error, None) - @unittest.skip("This test failed with 'TypeError: catching classes that do not " - "inherit from BaseException is not allowed' so skipped") + @unittest.skip( + "This test failed with 'TypeError: catching classes that do not " + "inherit from BaseException is not allowed' so skipped" + ) def test_connect_exception_during_sender_init(self, mock_socket): # Make the socket.socket().connect() call raise a custom exception mock_connect = mock_socket.socket.return_value.connect @@ -147,7 +147,9 @@ def test_connect_exception_during_sender_init(self, mock_socket): def test_sender_without_flush(self): with self._sender as sender: - sender._queue.put(fluent.asyncsender._TOMBSTONE) # This closes without closing + sender._queue.put( + fluent.asyncsender._TOMBSTONE + ) # This closes without closing sender._send_thread.join() for x in range(1, 10): sender._queue.put(x) @@ -157,10 +159,11 @@ def test_sender_without_flush(self): class TestSenderDefaultProperties(unittest.TestCase): def setUp(self): - super(TestSenderDefaultProperties, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port + ) def tearDown(self): try: @@ -178,11 +181,11 @@ def test_default_properties(self): class TestSenderWithTimeout(unittest.TestCase): def setUp(self): - super(TestSenderWithTimeout, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_timeout=0.04) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port, queue_timeout=0.04 + ) def tearDown(self): try: @@ -195,27 +198,27 @@ def get_data(self): def test_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_simple_with_timeout_props(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) @@ -224,19 +227,21 @@ class TestEventTime(unittest.TestCase): def test_event_time(self): time = fluent.asyncsender.EventTime(1490061367.8616468906402588) self.assertEqual(time.code, 0) - self.assertEqual(time.data, b'X\xd0\x8873[\xb0*') + self.assertEqual(time.data, b"X\xd0\x8873[\xb0*") class TestSenderWithTimeoutAndCircular(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestSenderWithTimeoutAndCircular, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_maxsize=self.Q_SIZE, - queue_circular=True) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", + port=self._server.port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + ) def tearDown(self): try: @@ -253,15 +258,15 @@ def test_simple(self): self.assertEqual(self._sender.queue_circular, True) self.assertEqual(self._sender.queue_blocking, False) - ok = sender.emit('foo1', {'bar': 'baz1'}) + ok = sender.emit("foo1", {"bar": "baz1"}) self.assertTrue(ok) - ok = sender.emit('foo2', {'bar': 'baz2'}) + ok = sender.emit("foo2", {"bar": "baz2"}) self.assertTrue(ok) - ok = sender.emit('foo3', {'bar': 'baz3'}) + ok = sender.emit("foo3", {"bar": "baz3"}) self.assertTrue(ok) - ok = sender.emit('foo4', {'bar': 'baz4'}) + ok = sender.emit("foo4", {"bar": "baz4"}) self.assertTrue(ok) - ok = sender.emit('foo5', {'bar': 'baz5'}) + ok = sender.emit("foo5", {"bar": "baz5"}) self.assertTrue(ok) data = self.get_data() @@ -282,11 +287,11 @@ class TestSenderWithTimeoutMaxSizeNonCircular(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestSenderWithTimeoutMaxSizeNonCircular, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_maxsize=self.Q_SIZE) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port, queue_maxsize=self.Q_SIZE + ) def tearDown(self): try: @@ -303,15 +308,15 @@ def test_simple(self): self.assertEqual(self._sender.queue_blocking, True) self.assertEqual(self._sender.queue_circular, False) - ok = sender.emit('foo1', {'bar': 'baz1'}) + ok = sender.emit("foo1", {"bar": "baz1"}) self.assertTrue(ok) - ok = sender.emit('foo2', {'bar': 'baz2'}) + ok = sender.emit("foo2", {"bar": "baz2"}) self.assertTrue(ok) - ok = sender.emit('foo3', {'bar': 'baz3'}) + ok = sender.emit("foo3", {"bar": "baz3"}) self.assertTrue(ok) - ok = sender.emit('foo4', {'bar': 'baz4'}) + ok = sender.emit("foo4", {"bar": "baz4"}) self.assertTrue(ok) - ok = sender.emit('foo5', {'bar': 'baz5'}) + ok = sender.emit("foo5", {"bar": "baz5"}) self.assertTrue(ok) data = self.get_data() @@ -319,26 +324,25 @@ def test_simple(self): print(data) eq(5, len(data)) eq(3, len(data[0])) - eq('test.foo1', data[0][0]) - eq({'bar': 'baz1'}, data[0][2]) + eq("test.foo1", data[0][0]) + eq({"bar": "baz1"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) eq(3, len(data[2])) - eq('test.foo3', data[2][0]) - eq({'bar': 'baz3'}, data[2][2]) + eq("test.foo3", data[2][0]) + eq({"bar": "baz3"}, data[2][2]) class TestSenderUnlimitedSize(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestSenderUnlimitedSize, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_timeout=0.04, - queue_maxsize=0) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port, queue_timeout=0.04, queue_maxsize=0 + ) def tearDown(self): try: @@ -357,7 +361,7 @@ def test_simple(self): NUM = 1000 for i in range(1, NUM + 1): - ok = sender.emit("foo{}".format(i), {'bar': "baz{}".format(i)}) + ok = sender.emit(f"foo{i}", {"bar": f"baz{i}"}) self.assertTrue(ok) data = self.get_data() @@ -365,12 +369,12 @@ def test_simple(self): eq(NUM, len(data)) el = data[0] eq(3, len(el)) - eq('test.foo1', el[0]) - eq({'bar': 'baz1'}, el[2]) + eq("test.foo1", el[0]) + eq({"bar": "baz1"}, el[2]) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) el = data[NUM - 1] eq(3, len(el)) - eq("test.foo{}".format(NUM), el[0]) - eq({'bar': "baz{}".format(NUM)}, el[2]) + eq(f"test.foo{NUM}", el[0]) + eq({"bar": f"baz{NUM}"}, el[2]) diff --git a/tests/test_event.py b/tests/test_event.py index d341616..6e2f0a0 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -1,53 +1,46 @@ -# -*- coding: utf-8 -*- - import unittest from fluent import event, sender from tests import mockserver -class TestException(BaseException): pass +class TestException(BaseException): + __test__ = False # teach pytest this is not test class. class TestEvent(unittest.TestCase): def setUp(self): - self._server = mockserver.MockRecvServer('localhost') - sender.setup('app', port=self._server.port) + self._server = mockserver.MockRecvServer("localhost") + sender.setup("app", port=self._server.port) def tearDown(self): from fluent.sender import _set_global_sender + sender.close() _set_global_sender(None) def test_logging(self): # XXX: This tests succeeds even if the fluentd connection failed # send event with tag app.follow - event.Event('follow', { - 'from': 'userA', - 'to': 'userB' - }) + event.Event("follow", {"from": "userA", "to": "userB"}) def test_logging_with_timestamp(self): # XXX: This tests succeeds even if the fluentd connection failed # send event with tag app.follow, with timestamp - event.Event('follow', { - 'from': 'userA', - 'to': 'userB' - }, time=int(0)) + event.Event("follow", {"from": "userA", "to": "userB"}, time=int(0)) def test_no_last_error_on_successful_event(self): global_sender = sender.get_global_sender() - event.Event('unfollow', { - 'from': 'userC', - 'to': 'userD' - }) + event.Event("unfollow", {"from": "userC", "to": "userD"}) self.assertEqual(global_sender.last_error, None) sender.close() - @unittest.skip("This test failed with 'TypeError: catching classes that do not " - "inherit from BaseException is not allowed' so skipped") + @unittest.skip( + "This test failed with 'TypeError: catching classes that do not " + "inherit from BaseException is not allowed' so skipped" + ) def test_connect_exception_during_event_send(self, mock_socket): # Make the socket.socket().connect() call raise a custom exception mock_connect = mock_socket.socket.return_value.connect @@ -58,10 +51,7 @@ def test_connect_exception_during_event_send(self, mock_socket): global_sender = sender.get_global_sender() global_sender._close() - event.Event('unfollow', { - 'from': 'userE', - 'to': 'userF' - }) + event.Event("unfollow", {"from": "userE", "to": "userF"}) ex = global_sender.last_error self.assertEqual(ex.args, EXCEPTION_MSG) diff --git a/tests/test_handler.py b/tests/test_handler.py index 45fea86..711d282 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -1,17 +1,20 @@ -#  -*- coding: utf-8 -*- - import logging -import sys import unittest import fluent.handler from tests import mockserver +def get_logger(name, level=logging.INFO): + logger = logging.getLogger(name) + logger.setLevel(level) + return logger + + class TestHandler(unittest.TestCase): def setUp(self): - super(TestHandler, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -21,18 +24,14 @@ def get_data(self): return self._server.get_received() def test_simple(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({ - 'from': 'userA', - 'to': 'userB' - }) + log.info({"from": "userA", "to": "userB"}) log.removeHandler(handler) @@ -40,216 +39,199 @@ def test_simple(self): eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('app.follow', data[0][0]) - eq('userA', data[0][2]['from']) - eq('userB', data[0][2]['to']) + eq("app.follow", data[0][0]) + eq("userA", data[0][2]["from"]) + eq("userB", data[0][2]["to"]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_custom_fmt(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'lineno': '%(lineno)d', - 'emitted_at': '%(asctime)s', - }) + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "%(name)s", + "lineno": "%(lineno)d", + "emitted_at": "%(asctime)s", + } + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) def test_exclude_attrs(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(exclude_attrs=[]) - ) + log = get_logger("fluent.test") + handler.setFormatter(fluent.handler.FluentRecordFormatter(exclude_attrs=[])) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) def test_exclude_attrs_with_exclusion(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( fluent.handler.FluentRecordFormatter(exclude_attrs=["funcName"]) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) def test_exclude_attrs_with_extra(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(exclude_attrs=[]) - ) + log = get_logger("fluent.test") + handler.setFormatter(fluent.handler.FluentRecordFormatter(exclude_attrs=[])) log.addHandler(handler) log.info("Test with value '%s'", "test value", extra={"x": 1234}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertEqual("Test with value 'test value'", data[0][2]['message']) - self.assertEqual(1234, data[0][2]['x']) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertEqual("Test with value 'test value'", data[0][2]["message"]) + self.assertEqual(1234, data[0][2]["x"]) def test_format_dynamic(self): def formatter(record): - return { - "message": record.message, - "x": record.x, - "custom_value": 1 - } + return {"message": record.message, "x": record.x, "custom_value": 1} formatter.usesTime = lambda: True - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt=formatter) - ) + log = get_logger("fluent.test") + handler.setFormatter(fluent.handler.FluentRecordFormatter(fmt=formatter)) log.addHandler(handler) log.info("Test with value '%s'", "test value", extra={"x": 1234}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('x' in data[0][2]) - self.assertEqual(1234, data[0][2]['x']) - self.assertEqual(1, data[0][2]['custom_value']) + self.assertTrue("x" in data[0][2]) + self.assertEqual(1234, data[0][2]["x"]) + self.assertEqual(1, data[0][2]["custom_value"]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_format_style(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '{name}', - 'lineno': '{lineno}', - 'emitted_at': '{asctime}', - }, style='{') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "{name}", + "lineno": "{lineno}", + "emitted_at": "{asctime}", + }, + style="{", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_template_style(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '${name}', - 'lineno': '${lineno}', - 'emitted_at': '${asctime}', - }, style='$') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "${name}", + "lineno": "${lineno}", + "emitted_at": "${asctime}", + }, + style="$", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) def test_custom_field_raise_exception(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }) + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"} + ) ) log.addHandler(handler) with self.assertRaises(KeyError): - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) def test_custom_field_fill_missing_fmt_key_is_true(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }, - fill_missing_fmt_key=True + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"}, + fill_missing_fmt_key=True, ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('custom_field' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("custom_field" in data[0][2]) # field defaults to none if not in log record - self.assertIsNone(data[0][2]['custom_field']) + self.assertIsNone(data[0][2]["custom_field"]) def test_json_encoded_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) @@ -258,16 +240,17 @@ def test_json_encoded_message(self): log.removeHandler(handler) data = self.get_data() - self.assertTrue('key' in data[0][2]) - self.assertEqual('hello world!', data[0][2]['key']) + self.assertTrue("key" in data[0][2]) + self.assertEqual("hello world!", data[0][2]["key"]) def test_json_encoded_message_without_json(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter(format_json=False)) + log = get_logger("fluent.test") + handler.setFormatter( + fluent.handler.FluentRecordFormatter(format_json=False) + ) log.addHandler(handler) log.info('{"key": "hello world!", "param": "value"}') @@ -275,76 +258,73 @@ def test_json_encoded_message_without_json(self): log.removeHandler(handler) data = self.get_data() - self.assertTrue('key' not in data[0][2]) - self.assertEqual('{"key": "hello world!", "param": "value"}', data[0][2]['message']) + self.assertTrue("key" not in data[0][2]) + self.assertEqual( + '{"key": "hello world!", "param": "value"}', data[0][2]["message"] + ) def test_unstructured_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello %s', 'world') + log.info("hello %s", "world") log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world", data[0][2]["message"]) def test_unstructured_formatted_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello world, %s', 'you!') + log.info("hello world, %s", "you!") log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world, you!', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world, you!", data[0][2]["message"]) def test_number_string_simple_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info("1") log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_simple_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info(42) log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_dict_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({42: 'root'}) + log.info({42: "root"}) log.removeHandler(handler) data = self.get_data() @@ -352,22 +332,21 @@ def test_non_string_dict_message(self): self.assertFalse(42 in data[0][2]) def test_exception_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) try: - raise Exception('sample exception') + raise Exception("sample exception") except Exception: - log.exception('it failed') + log.exception("it failed") log.removeHandler(handler) data = self.get_data() - message = data[0][2]['message'] + message = data[0][2]["message"] # Includes the logged message, as well as the stack trace. - self.assertTrue('it failed' in message) + self.assertTrue("it failed" in message) self.assertTrue('tests/test_handler.py", line' in message) - self.assertTrue('Exception: sample exception' in message) + self.assertTrue("Exception: sample exception" in message) diff --git a/tests/test_sender.py b/tests/test_sender.py index 1c0fbe9..e2c5710 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -1,7 +1,3 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function - import errno import socket import sys @@ -18,6 +14,7 @@ class TestSetup(unittest.TestCase): def tearDown(self): from fluent.sender import _set_global_sender + _set_global_sender(None) def test_no_kwargs(self): @@ -47,10 +44,9 @@ def test_tolerant(self): class TestSender(unittest.TestCase): def setUp(self): - super(TestSender, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.sender.FluentSender(tag='test', - port=self._server.port) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.sender.FluentSender(tag="test", port=self._server.port) def tearDown(self): try: @@ -63,40 +59,40 @@ def get_data(self): def test_simple(self): sender = self._sender - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) sender._close() data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_decorator_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_nanosecond(self): sender = self._sender sender.nanosecond_precision = True - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) sender._close() data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) @@ -104,21 +100,21 @@ def test_nanosecond_coerce_float(self): time = 1490061367.8616468906402588 sender = self._sender sender.nanosecond_precision = True - sender.emit_with_time('foo', time, {'bar': 'baz'}) + sender.emit_with_time("foo", time, {"bar": "baz"}) sender._close() data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) - eq(data[0][1].data, b'X\xd0\x8873[\xb0*') + eq(data[0][1].data, b"X\xd0\x8873[\xb0*") def test_no_last_error_on_successful_emit(self): sender = self._sender - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) sender._close() self.assertEqual(sender.last_error, None) @@ -159,7 +155,7 @@ def test_emit_after_close(self): def test_verbose(self): with self._sender as sender: sender.verbose = True - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) # No assertions here, just making sure there are no exceptions def test_failure_to_connect(self): @@ -222,13 +218,14 @@ def __init__(self): self.to = 123 self.send_side_effects = [3, 0, 9] self.send_idx = 0 - self.recv_side_effects = [socket.error(errno.EWOULDBLOCK, "Blah"), - b"this data is going to be ignored", - b"", - socket.error(errno.EWOULDBLOCK, "Blah"), - socket.error(errno.EWOULDBLOCK, "Blah"), - socket.error(errno.EACCES, "This error will never happen"), - ] + self.recv_side_effects = [ + socket.error(errno.EWOULDBLOCK, "Blah"), + b"this data is going to be ignored", + b"", + socket.error(errno.EWOULDBLOCK, "Blah"), + socket.error(errno.EWOULDBLOCK, "Blah"), + socket.error(errno.EACCES, "This error will never happen"), + ] self.recv_idx = 0 def send(self, bytes_): @@ -296,16 +293,15 @@ def test_unix_socket(self): self.tearDown() tmp_dir = mkdtemp() try: - server_file = 'unix://' + tmp_dir + "/tmp.unix" + server_file = "unix://" + tmp_dir + "/tmp.unix" self._server = mockserver.MockRecvServer(server_file) - self._sender = fluent.sender.FluentSender(tag='test', - host=server_file) + self._sender = fluent.sender.FluentSender(tag="test", host=server_file) with self._sender as sender: - self.assertTrue(sender.emit('foo', {'bar': 'baz'})) + self.assertTrue(sender.emit("foo", {"bar": "baz"})) data = self._server.get_received() self.assertEqual(len(data), 1) - self.assertEqual(data[0][2], {'bar': 'baz'}) + self.assertEqual(data[0][2], {"bar": "baz"}) finally: rmtree(tmp_dir, True) @@ -315,4 +311,4 @@ class TestEventTime(unittest.TestCase): def test_event_time(self): time = fluent.sender.EventTime(1490061367.8616468906402588) self.assertEqual(time.code, 0) - self.assertEqual(time.data, b'X\xd0\x8873[\xb0*') + self.assertEqual(time.data, b"X\xd0\x8873[\xb0*") diff --git a/tox.ini b/tox.ini index 6c3f032..14634e9 100644 --- a/tox.ini +++ b/tox.ini @@ -4,6 +4,8 @@ envlist = py27, py32, py33, py34, py35, py36, py37, py38 skip_missing_interpreters = True [testenv] -deps = nose - coverage~=4.5.4 -commands = python setup.py nosetests +deps = + pytest + pytest-cov + msgpack +commands = pytest --cov=fluent