Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Ruff for format and linting #201

Merged
merged 5 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ on:
pull_request:

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Ruff
run: pipx install ruff
- name: Ruff check
run: ruff check
- name: Ruff format
run: ruff format --diff

test:
runs-on: ubuntu-latest
strategy:
Expand Down
2 changes: 1 addition & 1 deletion fluent/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.10.1dev1'
__version__ = "0.10.1dev1"
8 changes: 3 additions & 5 deletions fluent/asynchandler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# -*- coding: utf-8 -*-

from fluent import asyncsender
from fluent import handler


class FluentHandler(handler.FluentHandler):
'''
"""
Asynchronous Logging Handler for fluent.
'''
"""

def getSenderClass(self):
return asyncsender.FluentSender
Expand All @@ -18,7 +16,7 @@ def close(self):
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
super().close()
finally:
self.release()

Expand Down
63 changes: 36 additions & 27 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-

import threading
from queue import Queue, Full, Empty

Expand All @@ -17,8 +15,7 @@


def _set_global_sender(sender): # pragma: no cover
""" [For testing] Function to set global sender directly
"""
"""[For testing] Function to set global sender directly"""
global _global_sender
_global_sender = sender

Expand All @@ -37,41 +34,53 @@ def close(): # pragma: no cover


class FluentSender(sender.FluentSender):
def __init__(self,
tag,
host='localhost',
port=24224,
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
queue_overflow_handler=None,
**kwargs):
def __init__(
self,
tag,
host="localhost",
port=24224,
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
queue_overflow_handler=None,
**kwargs,
):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
"""
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision,
msgpack_kwargs=msgpack_kwargs,
**kwargs)
super().__init__(
tag=tag,
host=host,
port=port,
bufmax=bufmax,
timeout=timeout,
verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision,
msgpack_kwargs=msgpack_kwargs,
**kwargs,
)
self._queue_maxsize = queue_maxsize
self._queue_circular = queue_circular
if queue_circular and queue_overflow_handler:
self._queue_overflow_handler = queue_overflow_handler
else:
self._queue_overflow_handler = self._queue_overflow_handler_default

self._thread_guard = threading.Event() # This ensures visibility across all variables
self._thread_guard = (
threading.Event()
) # This ensures visibility across all variables
self._closed = False

self._queue = Queue(maxsize=queue_maxsize)
self._send_thread = threading.Thread(target=self._send_loop,
name="AsyncFluentSender %d" % id(self))
self._send_thread = threading.Thread(
target=self._send_loop, name="AsyncFluentSender %d" % id(self)
)
self._send_thread.daemon = True
self._send_thread.start()

Expand Down Expand Up @@ -121,7 +130,7 @@ def _send(self, bytes_):
return True

def _send_loop(self):
send_internal = super(FluentSender, self)._send_internal
send_internal = super()._send_internal

try:
while True:
Expand Down
10 changes: 4 additions & 6 deletions fluent/event.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# -*- coding: utf-8 -*-

import time

from fluent import sender


class Event(object):
class Event:
def __init__(self, label, data, **kwargs):
assert isinstance(data, dict), 'data must be a dict'
sender_ = kwargs.get('sender', sender.get_global_sender())
timestamp = kwargs.get('time', int(time.time()))
assert isinstance(data, dict), "data must be a dict"
sender_ = kwargs.get("sender", sender.get_global_sender())
timestamp = kwargs.get("time", int(time.time()))
sender_.emit_with_time(label, timestamp, data)
143 changes: 83 additions & 60 deletions fluent/handler.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
# -*- coding: utf-8 -*-

import json
import logging
import socket
import sys

try:
import simplejson as json
except ImportError: # pragma: no cover
import json

from fluent import sender


class FluentRecordFormatter(logging.Formatter, object):
""" A structured formatter for Fluent.
class FluentRecordFormatter(logging.Formatter):
"""A structured formatter for Fluent.

Best used with server storing data in an ElasticSearch cluster for example.

Expand All @@ -33,36 +26,49 @@ class FluentRecordFormatter(logging.Formatter, object):
Can be an iterable.
"""

def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False, format_json=True,
exclude_attrs=None):
super(FluentRecordFormatter, self).__init__(None, datefmt)

if sys.version_info[0:2] >= (3, 2) and style != '%':
def __init__(
self,
fmt=None,
datefmt=None,
style="%",
fill_missing_fmt_key=False,
format_json=True,
exclude_attrs=None,
):
super().__init__(None, datefmt)

if style != "%":
self.__style, basic_fmt_dict = {
'{': (logging.StrFormatStyle, {
'sys_host': '{hostname}',
'sys_name': '{name}',
'sys_module': '{module}',
}),
'$': (logging.StringTemplateStyle, {
'sys_host': '${hostname}',
'sys_name': '${name}',
'sys_module': '${module}',
}),
"{": (
logging.StrFormatStyle,
{
"sys_host": "{hostname}",
"sys_name": "{name}",
"sys_module": "{module}",
},
),
"$": (
logging.StringTemplateStyle,
{
"sys_host": "${hostname}",
"sys_name": "${name}",
"sys_module": "${module}",
},
),
}[style]
else:
self.__style = None
basic_fmt_dict = {
'sys_host': '%(hostname)s',
'sys_name': '%(name)s',
'sys_module': '%(module)s',
"sys_host": "%(hostname)s",
"sys_name": "%(name)s",
"sys_module": "%(module)s",
}

if exclude_attrs is not None:
self._exc_attrs = set(exclude_attrs)
self._fmt_dict = None
self._formatter = self._format_by_exclusion
self.usesTime = super(FluentRecordFormatter, self).usesTime
self.usesTime = super().usesTime
else:
self._exc_attrs = None
if not fmt:
Expand All @@ -89,7 +95,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False

def format(self, record):
# Compute attributes handled by parent class.
super(FluentRecordFormatter, self).format(record)
super().format(record)
# Add ours
record.hostname = self.hostname

Expand All @@ -103,7 +109,7 @@ def usesTime(self):
"""This method is substituted on construction based on settings for performance reasons"""

def _structuring(self, data, record):
""" Melds `msg` into `data`.
"""Melds `msg` into `data`.

:param data: dictionary to be sent to fluent server
:param msg: :class:`LogRecord`'s message to add to `data`.
Expand All @@ -118,7 +124,7 @@ def _structuring(self, data, record):
elif isinstance(msg, str):
self._add_dic(data, self._format_msg(record, msg))
else:
self._add_dic(data, {'message': msg})
self._add_dic(data, {"message": msg})

def _format_msg_json(self, record, msg):
try:
Expand All @@ -131,7 +137,7 @@ def _format_msg_json(self, record, msg):
return self._format_msg_default(record, msg)

def _format_msg_default(self, record, msg):
return {'message': super(FluentRecordFormatter, self).format(record)}
return {"message": super().format(record)}

def _format_by_exclusion(self, record):
data = {}
Expand Down Expand Up @@ -175,17 +181,18 @@ class FluentHandler(logging.Handler):
Logging Handler for fluent.
"""

def __init__(self,
tag,
host='localhost',
port=24224,
timeout=3.0,
verbose=False,
buffer_overflow_handler=None,
msgpack_kwargs=None,
nanosecond_precision=False,
**kwargs):

def __init__(
self,
tag,
host="localhost",
port=24224,
timeout=3.0,
verbose=False,
buffer_overflow_handler=None,
msgpack_kwargs=None,
nanosecond_precision=False,
**kwargs,
):
self.tag = tag
self._host = host
self._port = port
Expand Down Expand Up @@ -213,37 +220,53 @@ def sender(self):
buffer_overflow_handler=self._buffer_overflow_handler,
msgpack_kwargs=self._msgpack_kwargs,
nanosecond_precision=self._nanosecond_precision,
**self._kwargs
**self._kwargs,
)
return self._sender

def getSenderInstance(self, tag, host, port, timeout, verbose,
buffer_overflow_handler, msgpack_kwargs,
nanosecond_precision, **kwargs):
def getSenderInstance(
self,
tag,
host,
port,
timeout,
verbose,
buffer_overflow_handler,
msgpack_kwargs,
nanosecond_precision,
**kwargs,
):
sender_class = self.getSenderClass()
return sender_class(tag,
host=host, port=port,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
msgpack_kwargs=msgpack_kwargs,
nanosecond_precision=nanosecond_precision, **kwargs)
return sender_class(
tag,
host=host,
port=port,
timeout=timeout,
verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
msgpack_kwargs=msgpack_kwargs,
nanosecond_precision=nanosecond_precision,
**kwargs,
)

def emit(self, record):
data = self.format(record)
_sender = self.sender
return _sender.emit_with_time(None,
sender.EventTime(record.created)
if _sender.nanosecond_precision
else int(record.created),
data)
return _sender.emit_with_time(
None,
sender.EventTime(record.created)
if _sender.nanosecond_precision
else int(record.created),
data,
)

def close(self):
self.acquire()
try:
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
super().close()
finally:
self.release()

Expand Down
Loading