diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index c0340450271..38e54951e75 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -487,6 +487,11 @@ func (s *Server) Open() error { // so it only logs as is appropriate. s.QueryExecutor.TaskManager.Logger = s.Logger } + if s.config.Data.QueryLogPath != "" { + path := s.config.Data.QueryLogPath + s.QueryExecutor.WithLogWriter(s.Logger, path) + } + s.PointsWriter.WithLogger(s.Logger) s.Subscriber.WithLogger(s.Logger) for _, svc := range s.Services { diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e66d9290c3d..bd09c34f274 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -66,6 +66,7 @@ # Whether queries should be logged before execution. Very useful for troubleshooting, but will # log any sensitive data contained within a query. # query-log-enabled = true + # query-log-path = "/var/log/influxdb/query.log" # It is possible to collect statistics of points written per-measurement and/or per-login. # These can be accessed via the monitoring subsystem. diff --git a/go.mod b/go.mod index 29b01bde70f..afd5365f7ce 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 github.com/davecgh/go-spew v1.1.1 github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 + github.com/fsnotify/fsnotify v1.4.7 github.com/go-chi/chi v4.1.0+incompatible github.com/golang-jwt/jwt v3.2.1+incompatible github.com/golang/mock v1.5.0 diff --git a/go.sum b/go.sum index 9bb9846a7e5..c3bf60eac3c 100644 --- a/go.sum +++ b/go.sum @@ -311,6 +311,7 @@ github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro= github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8= diff --git a/query/executor.go b/query/executor.go index 0afd6922b03..810acc17770 100644 --- a/query/executor.go +++ b/query/executor.go @@ -11,9 +11,11 @@ import ( "sync/atomic" "time" + "github.com/fsnotify/fsnotify" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxql" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) var ( @@ -289,6 +291,84 @@ func (e *Executor) WithLogger(log *zap.Logger) { e.TaskManager.Logger = e.Logger } +func initQueryLogWriter(log *zap.Logger, e *Executor, path string) (*os.File, error) { + logFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + e.Logger.Error("failed to open log file", zap.Error(err)) + return nil, err + } + + existingCore := log.Core() + + encoderConfig := zap.NewProductionEncoderConfig() + + fileCore := zapcore.NewCore( + zapcore.NewJSONEncoder(encoderConfig), + zapcore.Lock(logFile), + zapcore.InfoLevel, + ) + + newCore := zapcore.NewTee(existingCore, fileCore) + e.Logger = zap.New(newCore) + e.TaskManager.Logger = e.Logger + + return logFile, nil +} + +func (e *Executor) WithLogWriter(log *zap.Logger, path string) { + var file *os.File + var err error + + file, err = initQueryLogWriter(log, e, path) + if err != nil { + e.Logger.Error("failed to open log file", zap.Error(err)) + return + } + + go func() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + e.Logger.Error("failed to create log file watcher", zap.Error(err)) + return + } + + err = watcher.Add(path) + if err != nil { + e.Logger.Error("failed to watch log file", zap.Error(err)) + return + } + defer watcher.Close() + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + e.Logger.Error("failed to watch log file", zap.String("event", event.Name)) + return + } + e.Logger.Debug("event", zap.String("event", event.Name)) + if event.Op == fsnotify.Remove || event.Op == fsnotify.Rename { + if err := file.Sync(); err != nil { + e.Logger.Error("failed to sync log file", zap.Error(err)) + return + } + if err := file.Close(); err != nil { + e.Logger.Error("failed to close log file", zap.Error(err)) + return + } + + e.Logger.Debug("creating a new query log file; registered file was altered.", zap.String("event", event.Name), zap.String("path", path)) + file, err = initQueryLogWriter(log, e, path) + if err != nil { + e.Logger.Error("failed to create log file watcher", zap.Error(err)) + return + } + } + } + } + }() +} + // ExecuteQuery executes each statement within a query. func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { results := make(chan *Result) diff --git a/query/executor_test.go b/query/executor_test.go index 9237c7e53d9..44e79f660b0 100644 --- a/query/executor_test.go +++ b/query/executor_test.go @@ -3,6 +3,8 @@ package query_test import ( "errors" "fmt" + "github.com/stretchr/testify/require" + "os" "strings" "testing" "time" @@ -595,6 +597,34 @@ func TestQueryExecutor_InvalidSource(t *testing.T) { } } +func TestQueryExecutor_WriteQueryToLog(t *testing.T) { + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) + require.NoError(t, err, "parse query") + + f, err := os.CreateTemp("", "query-test.log") + require.NoError(t, err, "create temp file") + + defer os.Remove(f.Name()) + + e := NewQueryExecutor() + e.WithLogWriter(e.Logger, f.Name()) + + e.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + require.Equal(t, uint64(1), ctx.QueryID, "query ID") + return nil + }, + } + + discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) + err = f.Close() + require.NoError(t, err, "close temp file") + + dat, err := os.ReadFile(f.Name()) + cont := strings.Contains(string(dat), "SELECT count(value) FROM cpu") + require.True(t, cont, "expected query output") +} + func discardOutput(results <-chan *query.Result) { for range results { // Read all results and discard. diff --git a/tsdb/config.go b/tsdb/config.go index feb4927783a..2588b13fdc3 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -105,6 +105,9 @@ type Config struct { // Query logging QueryLogEnabled bool `toml:"query-log-enabled"` + // Query logging directed to a log file + QueryLogPath string `toml:"query-log-path"` + // Compaction options for tsm1 (descriptions above with defaults) CacheMaxMemorySize toml.Size `toml:"cache-max-memory-size"` CacheSnapshotMemorySize toml.Size `toml:"cache-snapshot-memory-size"`