12 changes: 7 additions & 5 deletions internal/collector/config.go
@@ -5,14 +5,15 @@ import (
"context"
"errors"
"fmt"
"github.com/cherts/pgscv/internal/cache"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"net"
"strconv"
"strings"
"time"

"github.com/cherts/pgscv/internal/cache"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/cherts/pgscv/internal/log"
"github.com/cherts/pgscv/internal/model"
"github.com/cherts/pgscv/internal/store"
@@ -42,6 +43,7 @@ type Config struct {
ConcurrencyLimit *int
CacheConfig *cache.Config
DB *store.DB
LogDirectory string
}

// postgresServiceConfig defines Postgres-specific settings required for collecting Postgres metrics.
@@ -59,7 +61,7 @@ type postgresServiceConfig struct {
// loggingCollector defines value of 'logging_collector' GUC.
loggingCollector bool
// logDestination defines value of 'log_destination' GUC.
logDestination string
logDestination postgresLogsDestination
// pgStatStatements defines whether pg_stat_statements is present in shared_preload_libraries and available for queries.
pgStatStatements bool
// pgStatStatementsSchema defines the schema name where pg_stat_statements is installed.
@@ -209,7 +211,7 @@ func newPostgresServiceConfig(connStr string, connTimeout int) (postgresServiceC
return config, fmt.Errorf("failed to get log_destination setting from pg_settings, %s, please check user grants", err)
}

config.logDestination = setting
config.logDestination = postgresLogsDestination(setting)

// Discover pg_stat_statements.
exists, schema, err := discoverPgStatStatements(conn)
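For context on the change above: the value read from pg_settings is now wrapped in the typed postgresLogsDestination rather than kept as a bare string, which lets the collector switch over the known destinations. A minimal sketch of that read-and-convert step, assuming a pgx v5 connection; the query text and the queryLogsDestination helper are illustrative, only the type conversion itself comes from this diff:

package collector

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// queryLogsDestination reads the log_destination GUC and converts it to the
// typed string introduced in this PR. Hypothetical helper, not part of the diff.
func queryLogsDestination(ctx context.Context, conn *pgx.Conn) (postgresLogsDestination, error) {
	var setting string
	err := conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'log_destination'").Scan(&setting)
	if err != nil {
		return "", fmt.Errorf("failed to get log_destination setting from pg_settings: %w", err)
	}
	return postgresLogsDestination(setting), nil
}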
218 changes: 170 additions & 48 deletions internal/collector/postgres_logs.go
@@ -3,8 +3,10 @@ package collector

import (
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"regexp"
"strings"
"sync"
@@ -15,6 +17,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

type postgresLogsDestination string

const (
postgresLogsDestinationJSON postgresLogsDestination = "jsonlog"
postgresLogsDestinationStderr postgresLogsDestination = "stderr"
)

// Current implementation has an issue described here: https://github.com/nxadm/tail/issues/18.
// When attempting to tail previously tailed logfiles, new messages do not come from the Lines channel.
// At the same time, the Test_runTailLoop test works as intended and doesn't show the problem.
@@ -27,11 +36,12 @@ type syncKV struct {
type postgresLogsCollector struct {
updateLogfile chan string // updateLogfile is used to notify the tail/collect goroutine when the logfile has changed.
currentLogfile string // currentLogfile contains the name of the logfile currently tailed and used for collecting stats.
totals syncKV // totals contains collected stats about total number of log messages.
panics syncKV // panics contains all collected messages with PANIC severity.
fatals syncKV // fatals contains all collected messages with FATAL severity.
errors syncKV // errors contains all collected messages with ERROR severity.
warnings syncKV // warnings contains all collected messages with WARNING severity.
logDestination postgresLogsDestination
totals syncKV // totals contains collected stats about total number of log messages.
panics syncKV // panics contains all collected messages with PANIC severity.
fatals syncKV // fatals contains all collected messages with FATAL severity.
errors syncKV // errors contains all collected messages with ERROR severity.
warnings syncKV // warnings contains all collected messages with WARNING severity.
messagesTotal typedDesc
panicMessages typedDesc
fatalMessages typedDesc
@@ -109,8 +119,7 @@ func NewPostgresLogsCollector(constLabels labels, settings model.CollectorSettin
// Update method generates metrics based on collected log messages.
func (c *postgresLogsCollector) Update(_ context.Context, config Config, ch chan<- prometheus.Metric) error {
if !config.localService {
log.Debugln("[postgres log collector]: skip collecting metrics from remote services")
return nil
log.Warnln("[postgres log collector]: collecting metrics from remote services, proper mounting is necessary")
}

if !config.loggingCollector {
@@ -122,11 +131,15 @@ func (c *postgresLogsCollector) Update(_ context.Context, config Config, ch chan
return nil
}

if config.logDestination != "stderr" {
log.Debugln("[postgres log collector]: PostgreSQL parameter log_destination not set to stderr, log collector disabled")
switch config.logDestination {
case postgresLogsDestinationStderr, postgresLogsDestinationJSON:
default:
log.Debugln("[postgres log collector]: PostgreSQL parameter log_destination not set to stderr or jsonlog, log collector disabled")
return nil
}

c.logDestination = config.logDestination

// Notify log collector goroutine if logfile has been changed.
logfile, err := queryCurrentLogfile(config)
if err != nil {
@@ -221,7 +234,7 @@ func tailCollect(ctx context.Context, logfile string, init bool, wg *sync.WaitGr
tailConfig.Location = &tail.SeekInfo{Whence: io.SeekEnd}
}

parser := newLogParser()
parser := newLogParser(c.logDestination)
log.Infof("starting tail of %s from the %s", logfile, offset)
t, err := tail.TailFile(logfile, tailConfig)
if err != nil {
@@ -261,22 +274,42 @@ func queryCurrentLogfile(config Config) (string, error) {
return "", err
}

if config.LogDirectory != "" {
datadir = config.LogDirectory
}

if !strings.HasPrefix(logfile, "/") {
logfile = datadir + "/" + logfile
logfile = filepath.Join(datadir, logfile)
}

return logfile, nil
}
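Worth noting in queryCurrentLogfile above: a relative logfile name (as pg_current_logfile() typically returns it) is now joined onto the base directory with filepath.Join, which also cleans the result, instead of plain string concatenation; and the new Config.LogDirectory, when set, replaces the data directory as that base. A runnable sketch with illustrative values:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	datadir := "/var/lib/postgresql/16/main" // data_directory, or the LogDirectory override when set
	logfile := "log/postgresql-Mon.json"     // relative logfile name, e.g. from pg_current_logfile()

	fmt.Println(filepath.Join(datadir, logfile))
	// Output: /var/lib/postgresql/16/main/log/postgresql-Mon.json
}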

// logParser defines the interface for parsing log messages and updating message stats.
type logParser struct {
type logParser interface {
updateMessagesStats(line string, c *postgresLogsCollector)
}

func newLogParser(format postgresLogsDestination) logParser {
if format == postgresLogsDestinationStderr {
return newStderrLogParser()
}

if format == postgresLogsDestinationJSON {
return newJSONLogParser()
}

return nil
}

// stderrLogParser contains a set of regexp patterns used to parse stderr log messages.
type stderrLogParser struct {
reSeverity map[string]*regexp.Regexp // regexp to determine messages severity.
reExtract *regexp.Regexp // regexp for extracting exact messages from the whole line (drop log_line_prefix stuff).
reNormalize []*regexp.Regexp // regexp for normalizing log message.
}

// newLogParser creates a new logParser with necessary compiled regexp objects.
func newLogParser() *logParser {
// newStderrLogParser creates a new stderrLogParser with necessary compiled regexp objects.
func newStderrLogParser() *stderrLogParser {
severityPatterns := map[string]string{
"log": `\s?LOG:\s+`,
"warning": `\s?WARNING:\s+`,
@@ -290,7 +323,7 @@ func newLogParser() *logParser {
`(\s+".+?"\s?)`,
}

p := &logParser{
p := &stderrLogParser{
reSeverity: map[string]*regexp.Regexp{},
reNormalize: make([]*regexp.Regexp, len(normalizePatterns)),
}
@@ -309,45 +342,17 @@ }
}

// updateMessagesStats processes the message string, parses it and updates stats.
func (p *logParser) updateMessagesStats(line string, c *postgresLogsCollector) {
func (p *stderrLogParser) updateMessagesStats(line string, c *postgresLogsCollector) {
m, found := p.parseMessageSeverity(line)
if !found {
return
}

// Update totals.
c.totals.mu.Lock()
c.totals.store[m]++
c.totals.mu.Unlock()

if m == "log" {
return
}

// Message with severity higher than LOG, normalize them and update.
normalized := p.normalizeMessage(line)
switch m {
case "panic":
c.panics.mu.Lock()
c.panics.store[normalized]++
c.panics.mu.Unlock()
case "fatal":
c.fatals.mu.Lock()
c.fatals.store[normalized]++
c.fatals.mu.Unlock()
case "error":
c.errors.mu.Lock()
c.errors.store[normalized]++
c.errors.mu.Unlock()
case "warning":
c.warnings.mu.Lock()
c.warnings.store[normalized]++
c.warnings.mu.Unlock()
}
updateMessagesStats(m, p.normalizeMessage(line), c)
}

// parseMessageSeverity accepts a line and parses it using patterns from the log parser.
func (p *logParser) parseMessageSeverity(line string) (string, bool) {
func (p *stderrLogParser) parseMessageSeverity(line string) (string, bool) {
if line == "" {
return "", false
}
@@ -363,7 +368,7 @@ func (p *logParser) parseMessageSeverity(line string) (string, bool) {
}

// normalizeMessage normalizes log messages, removing unique elements like names or IDs.
func (p *logParser) normalizeMessage(message string) string {
func (p *stderrLogParser) normalizeMessage(message string) string {
parts := p.reExtract.FindStringSubmatch(message)
if len(parts) < 2 {
return ""
@@ -377,3 +382,120 @@

return message
}

// jsonLogParser contains a set of regexp patterns used to parse JSON log messages.
type jsonLogParser struct {
reNormalize []*regexp.Regexp // regexp for normalizing log message.
}

// newJSONLogParser creates a new jsonLogParser with necessary compiled regexp objects.
func newJSONLogParser() *jsonLogParser {
normalizePatterns := []string{
`(\s+\d+\s?)`,
`(\s+".+?"\s?)`,
}

p := &jsonLogParser{
reNormalize: make([]*regexp.Regexp, len(normalizePatterns)),
}

for i, pattern := range normalizePatterns {
p.reNormalize[i] = regexp.MustCompile(pattern)
}

return p
}

type jsonLine struct {
ErrorSeverity string `json:"error_severity"`
Message string `json:"message"`
}

// updateMessagesStats processes the message string, parses it and updates stats.
func (p *jsonLogParser) updateMessagesStats(line string, c *postgresLogsCollector) {
jsonLine, err := p.jsonLine(line)
if err != nil {
log.Errorln(err)
return
}

sev, found := p.parseMessageSeverity(jsonLine)
if !found {
return
}

updateMessagesStats(sev, p.normalizeMessage(jsonLine), c)
}

func (p *jsonLogParser) jsonLine(line string) (jsonLine, error) {
var parsedLine jsonLine
err := json.Unmarshal([]byte(line), &parsedLine)
if err != nil {
return jsonLine{}, fmt.Errorf("can't unmarshal %s as jsonLine: %w", line, err)
}

return parsedLine, nil
}

func (p *jsonLogParser) parseMessageSeverity(line jsonLine) (string, bool) {
switch line.ErrorSeverity {
case "PANIC":
return "panic", true
case "FATAL":
return "fatal", true
case "ERROR":
return "error", true
case "WARNING":
return "warning", true
case "LOG":
return "log", true
default:
return "", false
}
}

// normalizeMessage normalizes log messages, removing unique elements like names or IDs.
func (p *jsonLogParser) normalizeMessage(jsonLine jsonLine) string {
switch jsonLine.ErrorSeverity {
case "PANIC", "FATAL", "ERROR", "WARNING":
default:
return ""
}

message := jsonLine.Message
for _, re := range p.reNormalize {
message = strings.TrimSpace(re.ReplaceAllString(message, " ? "))
}

return message
}

func updateMessagesStats(sev, normalized string, c *postgresLogsCollector) {
// Update totals.
c.totals.mu.Lock()
c.totals.store[sev]++
c.totals.mu.Unlock()

if sev == "log" {
return
}

switch sev {
case "panic":
c.panics.mu.Lock()
c.panics.store[normalized]++
c.panics.mu.Unlock()
case "fatal":
c.fatals.mu.Lock()
c.fatals.store[normalized]++
c.fatals.mu.Unlock()
case "error":
c.errors.mu.Lock()
c.errors.store[normalized]++
c.errors.mu.Unlock()
case "warning":
c.warnings.mu.Lock()
c.warnings.store[normalized]++
c.warnings.mu.Unlock()
}
}
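To make the jsonlog path concrete, here is a runnable sketch of the mapping the parser relies on. The sample line is invented, but the error_severity and message keys are the ones PostgreSQL's jsonlog format (log_destination = 'jsonlog', PostgreSQL 15+) emits:

package main

import (
	"encoding/json"
	"fmt"
)

type jsonLine struct {
	ErrorSeverity string `json:"error_severity"`
	Message       string `json:"message"`
}

func main() {
	// jsonlog writes one JSON object per line; keys not listed in the
	// struct (timestamp, pid, ...) are simply ignored by json.Unmarshal.
	line := `{"timestamp":"2024-01-01 00:00:00.000 UTC","error_severity":"ERROR","message":"relation \"orders\" does not exist"}`

	var parsed jsonLine
	if err := json.Unmarshal([]byte(line), &parsed); err != nil {
		panic(err)
	}
	fmt.Printf("%s -> %s\n", parsed.ErrorSeverity, parsed.Message)
	// ERROR -> relation "orders" does not exist
}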