Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This patch supports sending logs over UDP #81

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion fluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ def __init__(self,
host='localhost',
port=24224,
timeout=3.0,
packager='msgpack',
verbose=False,
buffer_overflow_handler=None):

self.tag = tag
self.sender = sender.FluentSender(tag,
host=host, port=port,
packager=packager,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler)
logging.Handler.__init__(self)
Expand All @@ -132,7 +134,7 @@ def emit(self, record):
def close(self):
self.acquire()
try:
self.sender._close()
self.sender.close()
logging.Handler.close(self)
finally:
self.release()
68 changes: 29 additions & 39 deletions fluent/sender.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import socket
import threading
import time
import traceback

import json
import msgpack

from fluent.transport import Transport, TransportError


_global_sender = None

Expand All @@ -27,14 +29,17 @@ def setup(tag, **kwargs):
def get_global_sender():
return _global_sender


def close():
get_global_sender().close()


class FluentSender(object):
def __init__(self,
tag,
host='localhost',
port=24224,
packager="msgpack",
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False,
Expand All @@ -48,17 +53,18 @@ def __init__(self,
self.timeout = timeout
self.verbose = verbose
self.buffer_overflow_handler = buffer_overflow_handler
self.packager = self.get_packager(packager)

self.socket = None
self.pendings = None
self.lock = threading.Lock()
self._last_error_threadlocal = threading.local()

self.transport = Transport(self.host, self.port, self.timeout)
try:
self._reconnect()
except socket.error:
self.transport.connect()
except TransportError:
# will be retried in emit()
self._close()
self.transport.close()

def emit(self, label, data):
cur_time = int(time.time())
Expand All @@ -80,16 +86,15 @@ def close(self):
try:
if self.pendings:
try:
self._send_data(self.pendings)
self.transport.send(self.pendings)
except Exception:
self._call_buffer_overflow_handler(self.pendings)

self._close()
self.transport.close()
self.pendings = None
finally:
self.lock.release()


def _make_packet(self, label, timestamp, data):
if label:
tag = '.'.join((self.tag, label))
Expand All @@ -98,7 +103,16 @@ def _make_packet(self, label, timestamp, data):
packet = (tag, timestamp, data)
if self.verbose:
print(packet)
return msgpack.packb(packet)
return self.packager(packet)

def get_packager(self, name):
if name == 'json':
return json.dumps

if name == 'msgpack':
return msgpack.packb

raise RuntimeError("Unknown packager: {}", name)

def _send(self, bytes_):
self.lock.acquire()
Expand All @@ -114,18 +128,17 @@ def _send_internal(self, bytes_):
bytes_ = self.pendings

try:
self._send_data(bytes_)
self.transport.send(bytes_)

# send finished
self.pendings = None

return True
except socket.error as e:
#except Exception as e:
except TransportError as e:
self.last_error = e

# close socket
self._close()
# close transport
self.transport.close()

# clear buffer if it exceeds max bufer size
if self.pendings and (len(self.pendings) > self.bufmax):
Expand All @@ -136,24 +149,6 @@ def _send_internal(self, bytes_):

return False

def _send_data(self, bytes_):
# reconnect if possible
self._reconnect()
# send message
self.socket.sendall(bytes_)

def _reconnect(self):
if not self.socket:
if self.host.startswith('unix://'):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect(self.host[len('unix://'):])
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
self.socket = sock

def _call_buffer_overflow_handler(self, pending_events):
try:
if self.buffer_overflow_handler:
Expand All @@ -168,13 +163,8 @@ def last_error(self):

@last_error.setter
def last_error(self, err):
self._last_error_threadlocal.exception = err
self._last_error_threadlocal.exception = err

def clear_last_error(self, _thread_id = None):
def clear_last_error(self, _thread_id=None):
if hasattr(self._last_error_threadlocal, 'exception'):
delattr(self._last_error_threadlocal, 'exception')

def _close(self):
if self.socket:
self.socket.close()
self.socket = None
69 changes: 69 additions & 0 deletions fluent/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# encoding=utf-8

import socket

try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse


class Transport(object):
def __init__(self, host, port, timeout):
self.host = host
self.port = port
self.timeout = timeout

self._conn = None

def close(self):
if self._conn:
self._conn.close()
self._conn = None

def connect(self):
if self._conn:
return

family, socket_type, addr = get_connection_params(self.host, self.port)
self._conn = socket.socket(family, socket_type)
self._conn.connect(addr)
self._conn.settimeout(self.timeout)

def send(self, data):
self.connect()
self._conn.sendall(data.encode('utf-8'))


def get_connection_params(url, port=0):
parsed = urlparse(url)

port = parsed.port or port or 0

scheme = parsed.scheme.lower()
if scheme == 'unix':
family = socket.AF_UNIX
socket_type = socket.SOCK_STREAM
addr = parsed.hostname

elif scheme == 'udp':
family = socket.AF_INET
socket_type = socket.SOCK_DGRAM
addr = (parsed.hostname, port)

elif scheme in ('tcp', ''):
family = socket.AF_INET
socket_type = socket.SOCK_STREAM
addr = (parsed.hostname or parsed.path, port)

else:
raise TransportError(
"Unknown connection protocol: url={}, port={}".format(
url, port,
)
)

return family, socket_type, addr


TransportError = socket.error
Loading