Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion fluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ def __init__(self,
host='localhost',
port=24224,
timeout=3.0,
packager='msgpack',
verbose=False,
buffer_overflow_handler=None):

self.tag = tag
self.sender = sender.FluentSender(tag,
host=host, port=port,
packager=packager,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler)
logging.Handler.__init__(self)
Expand All @@ -132,7 +134,7 @@ def emit(self, record):
def close(self):
self.acquire()
try:
self.sender._close()
self.sender.close()
logging.Handler.close(self)
finally:
self.release()
68 changes: 29 additions & 39 deletions fluent/sender.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import socket
import threading
import time
import traceback

import json
import msgpack

from fluent.transport import Transport, TransportError


_global_sender = None

Expand All @@ -27,14 +29,17 @@ def setup(tag, **kwargs):
def get_global_sender():
return _global_sender


def close():
get_global_sender().close()


class FluentSender(object):
def __init__(self,
tag,
host='localhost',
port=24224,
packager="msgpack",
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False,
Expand All @@ -48,17 +53,18 @@ def __init__(self,
self.timeout = timeout
self.verbose = verbose
self.buffer_overflow_handler = buffer_overflow_handler
self.packager = self.get_packager(packager)

self.socket = None
self.pendings = None
self.lock = threading.Lock()
self._last_error_threadlocal = threading.local()

self.transport = Transport(self.host, self.port, self.timeout)
try:
self._reconnect()
except socket.error:
self.transport.connect()
except TransportError:
# will be retried in emit()
self._close()
self.transport.close()

def emit(self, label, data):
cur_time = int(time.time())
Expand All @@ -80,16 +86,15 @@ def close(self):
try:
if self.pendings:
try:
self._send_data(self.pendings)
self.transport.send(self.pendings)
except Exception:
self._call_buffer_overflow_handler(self.pendings)

self._close()
self.transport.close()
self.pendings = None
finally:
self.lock.release()


def _make_packet(self, label, timestamp, data):
if label:
tag = '.'.join((self.tag, label))
Expand All @@ -98,7 +103,16 @@ def _make_packet(self, label, timestamp, data):
packet = (tag, timestamp, data)
if self.verbose:
print(packet)
return msgpack.packb(packet)
return self.packager(packet)

def get_packager(self, name):
if name == 'json':
return json.dumps

if name == 'msgpack':
return msgpack.packb

raise RuntimeError("Unknown packager: {}", name)

def _send(self, bytes_):
self.lock.acquire()
Expand All @@ -114,18 +128,17 @@ def _send_internal(self, bytes_):
bytes_ = self.pendings

try:
self._send_data(bytes_)
self.transport.send(bytes_)

# send finished
self.pendings = None

return True
except socket.error as e:
#except Exception as e:
except TransportError as e:
self.last_error = e

# close socket
self._close()
# close transport
self.transport.close()

# clear buffer if it exceeds max bufer size
if self.pendings and (len(self.pendings) > self.bufmax):
Expand All @@ -136,24 +149,6 @@ def _send_internal(self, bytes_):

return False

def _send_data(self, bytes_):
# reconnect if possible
self._reconnect()
# send message
self.socket.sendall(bytes_)

def _reconnect(self):
if not self.socket:
if self.host.startswith('unix://'):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect(self.host[len('unix://'):])
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
self.socket = sock

def _call_buffer_overflow_handler(self, pending_events):
try:
if self.buffer_overflow_handler:
Expand All @@ -168,13 +163,8 @@ def last_error(self):

@last_error.setter
def last_error(self, err):
self._last_error_threadlocal.exception = err
self._last_error_threadlocal.exception = err

def clear_last_error(self, _thread_id = None):
def clear_last_error(self, _thread_id=None):
if hasattr(self._last_error_threadlocal, 'exception'):
delattr(self._last_error_threadlocal, 'exception')

def _close(self):
if self.socket:
self.socket.close()
self.socket = None
69 changes: 69 additions & 0 deletions fluent/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# encoding=utf-8

import socket

try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse


class Transport(object):
def __init__(self, host, port, timeout):
self.host = host
self.port = port
self.timeout = timeout

self._conn = None

def close(self):
if self._conn:
self._conn.close()
self._conn = None

def connect(self):
if self._conn:
return

family, socket_type, addr = get_connection_params(self.host, self.port)
self._conn = socket.socket(family, socket_type)
self._conn.connect(addr)
self._conn.settimeout(self.timeout)

def send(self, data):
self.connect()
self._conn.sendall(data)


def get_connection_params(url, port=0):
parsed = urlparse(url)

port = parsed.port or port or 0

scheme = parsed.scheme.lower()
if scheme == 'unix':
family = socket.AF_UNIX
socket_type = socket.SOCK_STREAM
addr = parsed.hostname

elif scheme == 'udp':
family = socket.AF_INET
socket_type = socket.SOCK_DGRAM
addr = (parsed.hostname, port)

elif scheme in ('tcp', ''):
family = socket.AF_INET
socket_type = socket.SOCK_STREAM
addr = (parsed.hostname or parsed.path, port)

else:
raise TransportError(
"Unknown connection protocol: url={}, port={}".format(
url, port,
)
)

return family, socket_type, addr


TransportError = socket.error
Loading