diff --git a/exporter/clickhouselogsexporter/config.go b/exporter/clickhouselogsexporter/config.go index be63e8bd..fede4911 100644 --- a/exporter/clickhouselogsexporter/config.go +++ b/exporter/clickhouselogsexporter/config.go @@ -35,6 +35,8 @@ type Config struct { // For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn). // For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn). DSN string `mapstructure:"dsn"` + // Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false. + DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"` } // QueueSettings is a subset of exporterhelper.QueueSettings. diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 02e91948..54c23c98 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -290,6 +290,15 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro return nil, fmt.Errorf("failed to create database, err: %s", err) } + // drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes + if cfg.DockerMultiNodeCluster { + err = dropSchemaMigrationsTable(db) + if err != nil { + logger.Error("Error dropping schema_migrations table", zap.Error(err)) + return nil, err + } + } + // do the migration here // get the migrations folder @@ -316,6 +325,15 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro return db, nil } +func dropSchemaMigrationsTable(db clickhouse.Conn) error { + err := db.Exec(context.Background(), fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s ON CLUSTER %s;`, + databaseName, "schema_migrations", CLUSTER)) + if err != nil { + return fmt.Errorf("error dropping schema_migrations table: %v", err) + } + return nil +} + func buildClickhouseMigrateURL(cfg *Config) (string, error) { // return fmt.Sprintf("clickhouse://localhost:9000?database=default&x-multi-statement=true"), nil var clickhouseUrl string diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go index 1324a89f..f0d194f1 100644 --- a/exporter/clickhousetracesexporter/clickhouse_exporter.go +++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go @@ -40,7 +40,7 @@ func newExporter(cfg component.ExporterConfig, logger *zap.Logger) (*storage, er configClickHouse := cfg.(*Config) - f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasource) + f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasource, configClickHouse.DockerMultiNodeCluster) err := f.Initialize(logger) if err != nil { diff --git a/exporter/clickhousetracesexporter/clickhouse_factory.go b/exporter/clickhousetracesexporter/clickhouse_factory.go index c0c7ea1e..b63cb6d2 100644 --- a/exporter/clickhousetracesexporter/clickhouse_factory.go +++ b/exporter/clickhousetracesexporter/clickhouse_factory.go @@ -47,9 +47,9 @@ type Writer interface { type writerMaker func(logger *zap.Logger, db clickhouse.Conn, traceDatabase string, spansTable string, indexTable string, errorTable string, encoding Encoding, delay time.Duration, size int) (Writer, error) // NewFactory creates a new Factory. -func ClickHouseNewFactory(migrations string, datasource string) *Factory { +func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeCluster bool) *Factory { return &Factory{ - Options: NewOptions(migrations, datasource, primaryNamespace, archiveNamespace), + Options: NewOptions(migrations, datasource, dockerMultiNodeCluster, primaryNamespace, archiveNamespace), // makeReader: func(db *clickhouse.Conn, operationsTable, indexTable, spansTable string) (spanstore.Reader, error) { // return store.NewTraceReader(db, operationsTable, indexTable, spansTable), nil // }, @@ -85,6 +85,14 @@ func (f *Factory) Initialize(logger *zap.Logger) error { return err } + // drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes + if f.Options.primary.DockerMultiNodeCluster { + err = dropSchemaMigrationsTable(db, f) + if err != nil { + return err + } + } + f.logger.Info("Running migrations from path: ", zap.Any("test", f.Options.primary.Migrations)) clickhouseUrl, err := buildClickhouseMigrateURL(f.Options.primary.Datasource, f.Options.primary.Cluster) if err != nil { @@ -185,6 +193,16 @@ func patchGroupByParenInMV(db clickhouse.Conn, f *Factory) error { return nil } +func dropSchemaMigrationsTable(db clickhouse.Conn, f *Factory) error { + err := db.Exec(context.Background(), fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s ON CLUSTER %s;`, + f.Options.getPrimary().TraceDatabase, "schema_migrations", f.Options.getPrimary().Cluster)) + if err != nil { + f.logger.Error("Error dropping schema_migrations table", zap.Error(err)) + return fmt.Errorf("error dropping schema_migrations table: %v", err) + } + return nil +} + func buildClickhouseMigrateURL(datasource string, cluster string) (string, error) { // return fmt.Sprintf("clickhouse://localhost:9000?database=default&x-multi-statement=true"), nil var clickhouseUrl string diff --git a/exporter/clickhousetracesexporter/config.go b/exporter/clickhousetracesexporter/config.go index 27cb45ec..a704ab4d 100644 --- a/exporter/clickhousetracesexporter/config.go +++ b/exporter/clickhousetracesexporter/config.go @@ -26,6 +26,8 @@ type Config struct { Options `mapstructure:",squash"` Datasource string `mapstructure:"datasource"` Migrations string `mapstructure:"migrations"` + // Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false. + DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"` } var _ component.ExporterConfig = (*Config)(nil) diff --git a/exporter/clickhousetracesexporter/options.go b/exporter/clickhousetracesexporter/options.go index ad1a550b..692842d0 100644 --- a/exporter/clickhousetracesexporter/options.go +++ b/exporter/clickhousetracesexporter/options.go @@ -79,6 +79,7 @@ type namespaceConfig struct { DependencyGraphDbMV string DependencyGraphMessagingMV string DependencyGraphTable string + DockerMultiNodeCluster bool WriteBatchDelay time.Duration WriteBatchSize int Encoding Encoding @@ -125,7 +126,7 @@ type Options struct { } // NewOptions creates a new Options struct. -func NewOptions(migrations string, datasource string, primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions(migrations string, datasource string, dockerMultiNodeCluster bool, primaryNamespace string, otherNamespaces ...string) *Options { if datasource == "" { datasource = defaultDatasource @@ -153,6 +154,7 @@ func NewOptions(migrations string, datasource string, primaryNamespace string, o DependencyGraphServiceMV: defaultDependencyGraphServiceMV, DependencyGraphDbMV: defaultDependencyGraphDbMV, DependencyGraphMessagingMV: DependencyGraphMessagingMV, + DockerMultiNodeCluster: dockerMultiNodeCluster, WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding,