diff --git a/.github/workflows/requirements.txt b/.github/workflows/requirements.txt index 9de7e7f64..99d768a11 100644 --- a/.github/workflows/requirements.txt +++ b/.github/workflows/requirements.txt @@ -17,3 +17,5 @@ brotli xvfbwrapper google-cloud-storage google-cloud-pubsub +pytz +tzlocal \ No newline at end of file diff --git a/internal/rfc5424logging/__init__.py b/internal/rfc5424logging/__init__.py new file mode 100644 index 000000000..ebaf9f21f --- /dev/null +++ b/internal/rfc5424logging/__init__.py @@ -0,0 +1,10 @@ +""" +This library offers an logging in rfc5424 format. + +Is based on https://github.com/jobec/rfc5424-logging-handler that is not currently maintained. + +A few changes needed to be implemented anyway to tailor the logging to wptagent, hence the hard copy. +""" + +from .handler import Rfc5424SysLogHandler +from .rfc5424logging_context import logging_context \ No newline at end of file diff --git a/internal/rfc5424logging/handler.py b/internal/rfc5424logging/handler.py new file mode 100644 index 000000000..9d93ba46b --- /dev/null +++ b/internal/rfc5424logging/handler.py @@ -0,0 +1,499 @@ +# coding=utf-8 +import socket +import sys +from codecs import BOM_UTF8 +from collections import OrderedDict +from datetime import datetime +from logging import Handler + +from pytz import utc + +from . import transport + +NILVALUE = '-' + +SP = b' ' +# As defined in RFC5424 Section 7 +REGISTERED_SD_IDs = ('timeQuality', 'origin', 'meta') +SYSLOG_VERSION = '1' + +EMERGENCY = 70 +EMERG = EMERGENCY +ALERT = 60 +NOTICE = 25 + +PY2 = sys.version_info[0] == 2 + + +# facility codes +LOG_KERN = 0 # kernel messages +LOG_USER = 1 # random user-level messages +LOG_MAIL = 2 # mail system +LOG_DAEMON = 3 # system daemons +LOG_AUTH = 4 # security/authorization messages +LOG_SYSLOG = 5 # messages generated internally by syslogd +LOG_LPR = 6 # line printer subsystem +LOG_NEWS = 7 # network news subsystem +LOG_UUCP = 8 # UUCP subsystem +LOG_CRON = 9 # clock daemon +LOG_AUTHPRIV = 10 # security/authorization messages (private) +LOG_FTP = 11 # FTP daemon +# other codes through 15 reserved for system use +LOG_LOCAL0 = 16 # reserved for local use +LOG_LOCAL1 = 17 # reserved for local use +LOG_LOCAL2 = 18 # reserved for local use +LOG_LOCAL3 = 19 # reserved for local use +LOG_LOCAL4 = 20 # reserved for local use +LOG_LOCAL5 = 21 # reserved for local use +LOG_LOCAL6 = 22 # reserved for local use +LOG_LOCAL7 = 23 # reserved for local use + + +class Rfc5424SysLogHandler(Handler): + """ + A handler class which sends RFC 5424 formatted logging records to a syslog server. + + Based on the python built-in SyslogHandler class, but simplified in some parts. + """ + # from : + # ====================================================================== + # priorities/facilities are encoded into a single 32-bit quantity, where + # the bottom 3 bits are the priority (0-7) and the top 28 bits are the + # facility (0-big number). Both the priorities and the facilities map + # roughly one-to-one to strings in the syslogd(8) source code. This + # mapping is included in this file. + # + # priorities (these are ordered) + LOG_EMERG = 0 # system is unusable + LOG_ALERT = 1 # action must be taken immediately + LOG_CRIT = 2 # critical conditions + LOG_ERR = 3 # error conditions + LOG_WARNING = 4 # warning conditions + LOG_NOTICE = 5 # normal but significant condition + LOG_INFO = 6 # informational + LOG_DEBUG = 7 # debug-level messages + + priority_map = { + "DEBUG": LOG_DEBUG, + "INFO": LOG_INFO, + "NOTICE": LOG_NOTICE, + "WARNING": LOG_WARNING, + "ERROR": LOG_ERR, + "CRITICAL": LOG_CRIT, + "ALERT": LOG_ALERT, + "EMERGENCY": LOG_EMERG, + "EMERG": LOG_EMERG, + } + + def __init__( + self, + address=('localhost', transport.SYSLOG_PORT), + facility=LOG_USER, + socktype=socket.SOCK_DGRAM, + framing=transport.FRAMING_NON_TRANSPARENT, + msg_as_utf8=True, + hostname=None, + appname=None, + procid=None, + structured_data=None, + enterprise_id=None, + utc_timestamp=False, + timeout=5, + tls_enable=False, + tls_ca_bundle=None, + tls_verify=True, + tls_client_cert=None, + tls_client_key=None, + tls_key_password=None, + stream=None, + file_name=None + ): + """ + Returns a new instance of the Rfc5424SysLogHandler class intended to communicate with + a remote machine whose address is given by address in the form of a (host, port) tuple. + If address is not specified, ``('localhost', 514)`` is used. The address is used to open a + socket. + + An alternative to providing a (host, port) tuple is providing an address as a + string, for example `/dev/log`. In this case, a Unix domain socket is used to send the + message to the syslog. If facility is not specified, LOG_USER is used. The type of + socket opened depends on the socktype argument, which defaults to socket.SOCK_DGRAM + and thus opens a UDP socket. To open a TCP socket (for use with the newer syslog + daemons such as rsyslog), specify a value of socket.SOCK_STREAM. + + Note that if your server is not listening on UDP port 514, SysLogHandler may appear + not to work. In that case, check what address you should be using for a domain socket + - it's system dependent. For example, on Linux it's usually ``/dev/log`` but on OS/X + it's ``/var/run/syslog``. You'll need to check your platform and use the appropriate + address (you may need to do this check at runtime if your application needs to run + on several platforms). On Windows, you pretty much have to use the UDP option. + + As an alternative transport, you can also provide a stream + + Args: + address (tuple|list): + address in the form of a (host, port) tuple or list + + facility (int): + One of the ``rfc5424logging.LOG_*`` values. + + socktype (int): + One of ``socket.SOCK_STREAM`` (TCP) or ``socket.SOCK_DGRAM`` (UDP). + + framing (int, optional): + One of the ``rfc5424logging.FRAMING_*`` values according to + RFC6587 section 3.4. Only applies when sockettype is ``socket.SOCK_STREAM`` (TCP) + and is used to give the syslog server an indication about the boundaries + of the message. Defaults to ``FRAMING_NON_TRANSPARENT`` which will escape all + newline characters in the message and end the message with a newline character. + When set to ``FRAMING_OCTET_COUNTING``, it will prepend the message length to the + begin of the message. + msg_as_utf8 (bool): + Controls the way the message is sent. + disabling this parameter sends the message as MSG-ANY (RFC2424 section 6), avoiding + issues with receivers that don't support the UTF-8 Byte Order Mark (BOM) at + the beginning of the message. + hostname (str): + The hostname of the system where the message originated from. + Defaults to the values returned by ``socket.gethostname()`` + appname (str): + The name of the application. Defaults to the name of the logger that sent + the message. + procid (any): + The process ID of the sending application. Defaults to the ``process`` attribute + of the log record. + structured_data (dict): + A dictionary with structured data that is added to every message. Per message your + can add more structured data by adding it to the ``extra`` argument of the log function. + enterprise_id (str): + The Private Enterprise Number. This is used to compose the structured data IDs when + they do not include an Enterprise ID and are not one of the reserved structured data IDs. + Can be a single PEN like ``32473`` or optionally contain sub-identifiers like ``32473.2.6`` + utc_timestamp (bool): + Whether the timestamp should be converted to UTC time or kept in the local timezone + timeout (int): + Sets the timeout on the connection to the server. + tls_enable (bool): + If set to ``True``, it sets up a TLS/SSL connection to the address specified in ``address`` + over which the syslog messages will be sent. + Default to ``False`` + tls_ca_bundle (str): + The path to a bundle of CA certificates used for validating the remote server's identity. + If set to ``None``, it will try to load the default CA as described in + https://docs.python.org/3/library/ssl.html#ssl.SSLContext.load_verify_locations + tls_verify (bool): + Whether to verify the certificate of the server. + tls_client_cert (str): + path to a file containing a client certificate. + tls_client_key (str): + Path to a file containing the client private key. + tls_key_password (str): + Optionally the password for decrypting the specified private key. + stream (io.BufferedIOBase, file, io.TextIOBase): + Optionally a stream object to send the message to. See https://docs.python.org/3/library/io.html + for details. + file_name: + Optionally a file where to log in utf8 format. The file is assumed to be rotated by an external service + so eventually is reopened if the file_name point to a different file on each write. + Useful on UNIX systems. + """ + super(Rfc5424SysLogHandler, self).__init__() + + self.address = address + self.facility = facility + self.socktype = socktype + self.socket = None + self.unixsocket = False + self.hostname = hostname + self.appname = appname + self.procid = procid + self.structured_data = structured_data + self.enterprise_id = enterprise_id + self.framing = framing + self.msg_as_utf8 = msg_as_utf8 + self.utc_timestamp = utc_timestamp + self.timeout = timeout + self.tls_enable = tls_enable + self.tls_ca_bundle = tls_ca_bundle + self.tls_verify = tls_verify + self.tls_client_cert = tls_client_cert + self.tls_client_key = tls_client_key + self.tls_key_password = tls_key_password + self.stream = stream + self.file_name = file_name + self.transport = None + + from tzlocal import get_localzone + self._local_zone = get_localzone() + + if not (isinstance(self.facility, int) and LOG_KERN <= self.facility <= LOG_LOCAL7): + raise ValueError("Facility is not valid") + + if self.hostname is None or self.hostname == '': + self.hostname = socket.gethostname() + if not isinstance(self.structured_data, dict): + self.structured_data = OrderedDict() + + self._setup_transport() + + def _setup_transport(self): + if self.file_name is not None: + self.transport = transport.RotatedFileTransport(self.file_name) + elif self.stream is not None: + self.transport = transport.StreamTransport(self.stream) + elif isinstance(self.address, str): + self.transport = transport.UnixSocketTransport(self.address, self.socktype) + elif isinstance(self.address, (tuple, list)): + if self.socktype == socket.SOCK_STREAM: + if self.tls_enable: + self.transport = transport.TLSSocketTransport( + self.address, self.timeout, self.framing, + self.tls_ca_bundle, self.tls_verify, + self.tls_client_cert, self.tls_client_key, self.tls_key_password + ) + else: + self.transport = transport.TCPSocketTransport(self.address, self.timeout, self.framing) + else: + self.transport = transport.UDPSocketTransport(self.address, self.timeout) + else: + raise ValueError("Unsupported address type") + + def encode_priority(self, facility, priority): + """ + Encode the facility and priority. You can pass in strings or + integers - if strings are passed, the facility_names and + priority_names mapping dictionaries are used to convert them to + integers. + """ + return (facility << 3) | self.priority_map.get(priority, self.LOG_WARNING) + + @staticmethod + def filter_printusascii(str_to_filter): + return ''.join([x for x in str_to_filter if 33 <= ord(x) <= 126]) + + def get_hostname(self, record): + hostname = getattr(record, 'hostname', None) + if not hostname: + hostname = self.hostname + if hostname is None or hostname == '': + hostname = NILVALUE + return self.filter_printusascii(str(hostname)) + + def get_appname(self, record): + appname = getattr(record, 'appname', self.appname) + if appname is None or appname == '': + appname = getattr(record, 'name', NILVALUE) + return self.filter_printusascii(str(appname)) + + def get_procid(self, record): + procid = getattr(record, 'procid', self.procid) + if procid is None or procid == '': + procid = getattr(record, 'process', NILVALUE) + return self.filter_printusascii(str(procid)) + + def get_msgid(self, record): + msgid = getattr(record, 'msgid', NILVALUE) + if msgid is None or msgid == '': + msgid = NILVALUE + return self.filter_printusascii(str(msgid)) + + def get_enterprise_id(self, record): + # We allow None to be returned here. + # We'll handle it when cleaning the structured data + enterprise_id = getattr(record, 'enterprise_id', self.enterprise_id) + if enterprise_id is None: + return None + else: + return self.filter_printusascii(str(enterprise_id)) + + def get_structured_data(self, record): + structured_data = OrderedDict() + if isinstance(self.structured_data, dict): + structured_data.update(self.structured_data) + record_sd = getattr(record, 'structured_data', {}) + if isinstance(record_sd, dict): + structured_data.update(record_sd) + return structured_data + + def build_msg(self, record): + # The syslog message has the following ABNF [RFC5234] definition: + # + # SYSLOG-MSG = HEADER SP STRUCTURED-DATA [SP MSG] + # + # HEADER = PRI VERSION SP TIMESTAMP SP HOSTNAME + # SP APP-NAME SP PROCID SP MSGID + # PRI = "<" PRIVAL ">" + # PRIVAL = 1*3DIGIT ; range 0 .. 191 + # VERSION = NONZERO-DIGIT 0*2DIGIT + # HOSTNAME = NILVALUE / 1*255PRINTUSASCII + # + # APP-NAME = NILVALUE / 1*48PRINTUSASCII + # PROCID = NILVALUE / 1*128PRINTUSASCII + # MSGID = NILVALUE / 1*32PRINTUSASCII + # + # TIMESTAMP = NILVALUE / FULL-DATE "T" FULL-TIME + # FULL-DATE = DATE-FULLYEAR "-" DATE-MONTH "-" DATE-MDAY + # DATE-FULLYEAR = 4DIGIT + # DATE-MONTH = 2DIGIT ; 01-12 + # DATE-MDAY = 2DIGIT ; 01-28, 01-29, 01-30, 01-31 based on + # ; month/year + # FULL-TIME = PARTIAL-TIME TIME-OFFSET + # PARTIAL-TIME = TIME-HOUR ":" TIME-MINUTE ":" TIME-SECOND + # [TIME-SECFRAC] + # TIME-HOUR = 2DIGIT ; 00-23 + # TIME-MINUTE = 2DIGIT ; 00-59 + # TIME-SECOND = 2DIGIT ; 00-59 + # TIME-SECFRAC = "." 1*6DIGIT + # TIME-OFFSET = "Z" / TIME-NUMOFFSET + # TIME-NUMOFFSET = ("+" / "-") TIME-HOUR ":" TIME-MINUTE + # + # + # STRUCTURED-DATA = NILVALUE / 1*SD-ELEMENT + # SD-ELEMENT = "[" SD-ID *(SP SD-PARAM) "]" + # SD-PARAM = PARAM-NAME "=" %d34 PARAM-VALUE %d34 + # SD-ID = SD-NAME + # PARAM-NAME = SD-NAME + # PARAM-VALUE = UTF-8-STRING ; characters '"', '\' and + # ; ']' MUST be escaped. + # SD-NAME = 1*32PRINTUSASCII + # ; except '=', SP, ']', %d34 (") + # + # MSG = MSG-ANY / MSG-UTF8 + # MSG-ANY = *OCTET ; not starting with BOM + # MSG-UTF8 = BOM UTF-8-STRING + # BOM = %xEF.BB.BF + # + # UTF - 8 - STRING = *OCTET ; UTF - 8 string as specified + # ; in RFC 3629 + # + # OCTET = % d00 - 255 + # SP = % d32 + # PRINTUSASCII = % d33 - 126 + # NONZERO - DIGIT = % d49 - 57 + # DIGIT = % d48 / NONZERO - DIGIT + # NILVALUE = "-" + + # HEADER + pri = '<%d>' % self.encode_priority(self.facility, record.levelname) + version = SYSLOG_VERSION + timestamp = datetime.fromtimestamp(record.created, self._local_zone) + if self.utc_timestamp: + timestamp = timestamp.astimezone(utc) + timestamp = timestamp.isoformat() + hostname = self.get_hostname(record) + appname = self.get_appname(record) + procid = self.get_procid(record) + msgid = self.get_msgid(record) + + pri = pri.encode('ascii') + version = version.encode('ascii') + timestamp = timestamp.encode('ascii') + hostname = hostname.encode('ascii', 'replace')[:255] + appname = appname.encode('ascii', 'replace')[:48] + procid = procid.encode('ascii', 'replace')[:128] + msgid = msgid.encode('ascii', 'replace')[:32] + + header = b''.join((pri, version, SP, timestamp, SP, hostname, SP, appname, SP, procid, SP, msgid)) + + # STRUCTURED-DATA + enterprise_id = self.get_enterprise_id(record) + structured_data = self.get_structured_data(record) + cleaned_structured_data = [] + for sd_id, sd_params in list(structured_data.items()): + # Clean structured data ID + sd_id = self.filter_printusascii(sd_id) + sd_id = sd_id.replace('=', '').replace(' ', '').replace(']', '').replace('"', '') + if '@' not in sd_id and sd_id not in REGISTERED_SD_IDs and enterprise_id is None: + raise ValueError("Enterprise ID has not been set. Cannot build structured data ID. " + "Please set a enterprise ID when initializing the logging handler " + "or include one in the structured data ID.") + elif '@' in sd_id: + sd_id, enterprise_id = sd_id.rsplit('@', 1) + + if len(enterprise_id) > 30: + raise ValueError("Enterprise ID is too long. Impossible to build structured data ID.") + + sd_id = sd_id.replace('@', '') + if len(sd_id) + len(enterprise_id) > 32: + sd_id = sd_id[:31 - len(enterprise_id)] + if sd_id not in REGISTERED_SD_IDs: + sd_id = '@'.join((sd_id, enterprise_id)) + sd_id = sd_id.encode('ascii', 'replace') + + cleaned_sd_params = [] + # ignore sd params not int key-value format + if isinstance(sd_params, dict): + sd_params = sd_params.items() + else: + sd_params = [] + + # Clean key-value pairs + for (param_name, param_value) in sd_params: + param_name = self.filter_printusascii(str(param_name)) + param_name = param_name.replace('=', '').replace(' ', '').replace(']', '').replace('"', '') + if param_value is None: + param_value = '' + + param_value = str(param_value) + + if PY2: + param_value = unicode(param_value, 'utf-8') # noqa + + param_value = param_value.replace('\\', '\\\\').replace('"', '\\"').replace(']', '\\]') + + param_name = param_name.encode('ascii', 'replace')[:32] + param_value = param_value.encode('utf-8', 'replace') + + sd_param = b''.join((param_name, b'="', param_value, b'"')) + cleaned_sd_params.append(sd_param) + + cleaned_sd_params = SP.join(cleaned_sd_params) + + # build structured data element + spacer = SP if cleaned_sd_params else b'' + sd_element = b''.join((b'[', sd_id, spacer, cleaned_sd_params, b']')) + cleaned_structured_data.append(sd_element) + + if cleaned_structured_data: + structured_data = b''.join(cleaned_structured_data) + else: + structured_data = NILVALUE.encode('ascii') + + # MSG + if record.msg is None or record.msg == "": + pieces = (header, structured_data) + else: + msg = self.format(record) + if self.msg_as_utf8: + msg = b''.join((BOM_UTF8, msg.encode('utf-8'))) + else: + msg = msg.encode('utf-8') + pieces = (header, structured_data, msg) + syslog_msg = SP.join(pieces) + + return syslog_msg + + def emit(self, record): + """ + Emit a record. + + The record is formatted, and then sent to the syslog server. If + exception information is present, it is NOT sent to the server. + """ + try: + syslog_msg = self.build_msg(record) + self.transport.transmit(syslog_msg) + except Exception: + self.handleError(record) + + def close(self): + """ + Closes the socket. + """ + self.acquire() + try: + if self.transport is not None: + self.transport.close() + super(Rfc5424SysLogHandler, self).close() + finally: + self.release() diff --git a/internal/rfc5424logging/rfc5424logging_context.py b/internal/rfc5424logging/rfc5424logging_context.py new file mode 100644 index 000000000..a2f723907 --- /dev/null +++ b/internal/rfc5424logging/rfc5424logging_context.py @@ -0,0 +1,34 @@ +from enum import Enum + +class MessageId(Enum): + """Represent all messages ids, utilized as phases in wptagent, that can be used in the code.""" + INIT="initialization" + +class LoggingContext(): + """Collection additional information to be passed as extra argument to each logging call.""" + + def __init__(self): + self.message_id = MessageId.INIT + self.structured_data = dict() + pass + + def set_message_id(self, id): + """Set the message id, effectively changing phase of the agent.""" + self.message_id = id + + def as_extra(self, one_time_data=None): + """Get dictionary format to pass to logging extra parameter.""" + structured_data = self.structured_data + if one_time_data != None and isinstance(one_time_data, dict): + structured_data = dict() + structured_data.update(self.structured_data) + structured_data.update(one_time_data) + + # see Rfc5424SysLogHandler.get_structured_data for details + return {'msgid': self.message_id.value, 'structured_data': structured_data} + +context = LoggingContext() + +def logging_context(): + """Get the global context.""" + return context diff --git a/internal/rfc5424logging/transport.py b/internal/rfc5424logging/transport.py new file mode 100644 index 000000000..8a3908bff --- /dev/null +++ b/internal/rfc5424logging/transport.py @@ -0,0 +1,248 @@ +import io +import socket +import ssl +import sys +import os +from stat import ST_DEV, ST_INO + +if sys.version_info.major == 3: + text_stream_types = io.TextIOBase + bytes_stream_types = io.BufferedIOBase +else: + text_stream_types = io.TextIOBase + bytes_stream_types = io.BufferedIOBase, file # noqa: F821 + +SYSLOG_PORT = 514 + +# RFC6587 framing +FRAMING_OCTET_COUNTING = 1 +FRAMING_NON_TRANSPARENT = 2 + + +class TCPSocketTransport: + def __init__(self, address, timeout, framing): + self.socket = None + self.address = address + self.timeout = timeout + self.framing = framing + self.open() + + def open(self): + error = None + host, port = self.address + addrinfo = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM) + if not addrinfo: + raise OSError("getaddrinfo returns an empty list") + for entry in addrinfo: + family, socktype, _, _, sockaddr = entry + try: + self.socket = socket.socket(family, socktype) + self.socket.settimeout(self.timeout) + self.socket.connect(sockaddr) + # Connected successfully. Erase any previous errors. + error = None + break + except OSError as e: + error = e + if self.socket is not None: + self.socket.close() + if error is not None: + raise error + + def transmit(self, syslog_msg): + # RFC6587 framing + if self.framing == FRAMING_NON_TRANSPARENT: + syslog_msg = syslog_msg.replace(b"\n", b"\\n") + syslog_msg = b"".join((syslog_msg, b"\n")) + else: + syslog_msg = b" ".join((str(len(syslog_msg)).encode("ascii"), syslog_msg)) + + try: + self.socket.sendall(syslog_msg) + except (OSError, IOError): + self.close() + self.open() + self.socket.sendall(syslog_msg) + + def close(self): + self.socket.close() + + +class TLSSocketTransport(TCPSocketTransport): + def __init__( + self, + address, + timeout, + framing, + tls_ca_bundle, + tls_verify, + tls_client_cert, + tls_client_key, + tls_key_password, + ): + self.tls_ca_bundle = tls_ca_bundle + self.tls_verify = tls_verify + self.tls_client_cert = tls_client_cert + self.tls_client_key = tls_client_key + self.tls_key_password = tls_key_password + super(TLSSocketTransport, self).__init__(address, timeout, framing=framing) + + def open(self): + super(TLSSocketTransport, self).open() + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tls_ca_bundle + ) + context.verify_mode = ssl.CERT_REQUIRED if self.tls_verify else ssl.CERT_NONE + server_hostname, _ = self.address + if self.tls_client_cert: + context.load_cert_chain( + self.tls_client_cert, self.tls_client_key, self.tls_key_password + ) + self.socket = context.wrap_socket(self.socket, server_hostname=server_hostname) + + +class UDPSocketTransport: + def __init__(self, address, timeout): + self.socket = None + self.address = address + self.timeout = timeout + self.open() + + def open(self): + error = None + host, port = self.address + addrinfo = socket.getaddrinfo(host, port, 0, socket.SOCK_DGRAM) + if not addrinfo: + raise OSError("getaddrinfo returns an empty list") + for entry in addrinfo: + family, socktype, _, _, sockaddr = entry + try: + self.socket = socket.socket(family, socktype) + self.socket.settimeout(self.timeout) + self.address = sockaddr + break + except OSError as e: + error = e + if self.socket is not None: + self.socket.close() + if error is not None: + raise error + + def transmit(self, syslog_msg): + try: + self.socket.sendto(syslog_msg, self.address) + except (OSError, IOError): + self.close() + self.open() + self.socket.sendto(syslog_msg, self.address) + + def close(self): + self.socket.close() + + +class UnixSocketTransport: + def __init__(self, address, socket_type): + self.socket = None + self.address = address + self.socket_type = socket_type + self.open() + + def open(self): + if self.socket_type is None: + socket_types = [socket.SOCK_DGRAM, socket.SOCK_STREAM] + else: + socket_types = [self.socket_type] + + for type_ in socket_types: + # Syslog server may be unavailable during handler initialization + # So we ignore connection errors + try: + self.socket = socket.socket(socket.AF_UNIX, type_) + self.socket.connect(self.address) + self.socket_type = type_ + break + except OSError: + if self.socket is not None: + self.socket.close() + + def transmit(self, syslog_msg): + try: + self.socket.send(syslog_msg) + except (OSError, IOError): + self.close() + self.open() + self.socket.send(syslog_msg) + + def close(self): + self.socket.close() + + +class StreamTransport: + def __init__(self, stream): + if isinstance(stream, text_stream_types): + self.text_mode = True + elif isinstance(stream, bytes_stream_types): + self.text_mode = False + else: + raise ValueError("Stream is not of a valid stream type") + + if not stream.writable(): + raise ValueError("Stream is not a writeable stream") + + self.stream = stream + + def transmit(self, syslog_msg): + syslog_msg = syslog_msg + b"\n" + if self.text_mode: + syslog_msg = syslog_msg.decode(self.stream.encoding, "replace") + self.stream.write(syslog_msg) + # flushing on each write to have a better "rolling logs" (e.g. tail -f) experience + self.stream.flush() + + def close(self): + # Closing the stream is left up to the user. + pass + + +class RotatedFileTransport(StreamTransport): + """Transport that allow an external process to rotate the file used as transport""" + + def __init__(self, file_name): + self.file_name = file_name + self.identifier = None + StreamTransport.__init__(self, self._start()) + + def _start(self): + """ + Start opening the file and storing its stats as ID. + """ + stream = open(self.file_name, "a+", encoding="utf-8") + + # using fileno instead of _makeId in case in being rotated just now + file_stats = os.fstat(stream.fileno()) + self.identifier = (file_stats[ST_DEV], file_stats[ST_INO]) + return stream + + def _makeId(self): + try: + file_stats = os.stat(self.file_name) + return (file_stats[ST_DEV], file_stats[ST_INO]) + except FileNotFoundError: + return None + + def _reopenIfRotated(self): + """ + If the stats of the file changed, reopens it. + """ + if self.identifier != self._makeId(): + if self.stream is not None: + self.close() + self.stream = self._start() + + def transmit(self, syslog_msg): + self._reopenIfRotated() + StreamTransport.transmit(self, syslog_msg) + + def close(self): + self.stream.flush() + self.stream.close() \ No newline at end of file diff --git a/wptagent.py b/wptagent.py index 78c5b5010..a7a1681a8 100644 --- a/wptagent.py +++ b/wptagent.py @@ -8,6 +8,7 @@ import atexit import logging import logging.handlers +from internal.rfc5424logging import Rfc5424SysLogHandler, logging_context import os import platform import gzip @@ -1058,6 +1059,59 @@ def get_browser_versions(browsers): browsers[browser]['version'] = get_file_version(exe) +# Constant used for --logformat command line parameter mapping +LOG_FORMATS = ["syslog"] + +def setup_logging(verbosity=0, log_format=None, log_file=None): + """Setup logging according to passed command line values""" + + # Set log level and legacy format + basic_log_level = logging.CRITICAL + if verbosity is None: + pass # default critical + elif verbosity == 1: + basic_log_level = logging.ERROR + elif verbosity == 2: + basic_log_level = logging.WARNING + elif verbosity == 3: + basic_log_level = logging.INFO + elif verbosity >= 4: + basic_log_level = logging.DEBUG + + if log_format is None: + # legacy behavior + logging.basicConfig(level=basic_log_level, format="%(asctime)s.%(msecs)03d - %(message)s", + datefmt="%H:%M:%S") + + # If file is specified add self rotating file with just errors level or above. + if log_file is not None: + err_log = logging.handlers.RotatingFileHandler(log_file, maxBytes=1000000, + backupCount=5, delay=True) + err_log.setLevel(logging.ERROR) + logging.getLogger().addHandler(err_log) + else: + if log_format == "syslog" and log_file is not None: + logger = logging.getLogger() + logger.setLevel(basic_log_level) + handler = Rfc5424SysLogHandler( \ + address=None, \ + socktype=None, \ + framing=None, \ + msg_as_utf8=True, \ + appname="wptagent", \ + # Currently used just for versioning + enterprise_id="1", \ + utc_timestamp=True, \ + file_name=log_file \ + ) + + logger.addHandler(handler) + logger.debug("Rfc5424SysLogHandler initialized", extra=logging_context().as_extra({"msg_as_utf8": True})) + else: + # used default loggger + logging.critical("log_file must be specified if log_format is used.") + exit(1) + def main(): """Startup and initialization""" import argparse @@ -1069,7 +1123,7 @@ def main(): parser.add_argument('--name', help="Agent name (for the work directory).") parser.add_argument('--exit', type=int, default=0, help='Exit after the specified number of minutes.\n' - ' Useful for running in a shell script that does some maintenence\n' + ' Useful for running in a shell script that does some maintenance\n' ' or updates periodically (like hourly).') parser.add_argument('--dockerized', action='store_true', default=False, help="Agent is running in a docker container.") @@ -1080,7 +1134,15 @@ def main(): parser.add_argument('--alive', help="Watchdog file to update when successfully connected.") parser.add_argument('--log', - help="Log critical errors to the given file.") + help="Output the logs to the given file." + "For backward compatibility if --logformat is not specified only critical errors will be logged " + "to the file with the legacy non standardized format") + parser.add_argument('--logformat', + help="Change the log level format when using '--log' option." + "When this option is specified will only print to the file specified with '--log'." + "When this parameter is used verbosity will also take effect.", + type=str.lower, + choices=LOG_FORMATS) parser.add_argument('--noidle', action='store_true', default=False, help="Do not wait for system idle at any point.") parser.add_argument('--collectversion', action='store_true', default=False, @@ -1213,23 +1275,7 @@ def main(): exit(1) # Set up logging - log_level = logging.CRITICAL - if options.verbose == 1: - log_level = logging.ERROR - elif options.verbose == 2: - log_level = logging.WARNING - elif options.verbose == 3: - log_level = logging.INFO - elif options.verbose >= 4: - log_level = logging.DEBUG - logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", - datefmt="%H:%M:%S") - - if options.log: - err_log = logging.handlers.RotatingFileHandler(options.log, maxBytes=1000000, - backupCount=5, delay=True) - err_log.setLevel(logging.ERROR) - logging.getLogger().addHandler(err_log) + setup_logging(options.verbose, options.logformat, options.log) if options.ec2 or options.gce: upgrade_pip_modules()