Skip to content

Commit

Permalink
fix: add docker multi node cluster flag (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
makeavish authored Dec 9, 2022
1 parent c79f5ce commit 5cbc85e
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 4 deletions.
2 changes: 2 additions & 0 deletions exporter/clickhouselogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
DSN string `mapstructure:"dsn"`
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"`
}

// QueueSettings is a subset of exporterhelper.QueueSettings.
Expand Down
18 changes: 18 additions & 0 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,15 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro
return nil, fmt.Errorf("failed to create database, err: %s", err)
}

// drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes
if cfg.DockerMultiNodeCluster {
err = dropSchemaMigrationsTable(db)
if err != nil {
logger.Error("Error dropping schema_migrations table", zap.Error(err))
return nil, err
}
}

// do the migration here

// get the migrations folder
Expand All @@ -316,6 +325,15 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro
return db, nil
}

func dropSchemaMigrationsTable(db clickhouse.Conn) error {
err := db.Exec(context.Background(), fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s ON CLUSTER %s;`,
databaseName, "schema_migrations", CLUSTER))
if err != nil {
return fmt.Errorf("error dropping schema_migrations table: %v", err)
}
return nil
}

func buildClickhouseMigrateURL(cfg *Config) (string, error) {
// return fmt.Sprintf("clickhouse://localhost:9000?database=default&x-multi-statement=true"), nil
var clickhouseUrl string
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newExporter(cfg component.ExporterConfig, logger *zap.Logger) (*storage, er

configClickHouse := cfg.(*Config)

f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasource)
f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasource, configClickHouse.DockerMultiNodeCluster)

err := f.Initialize(logger)
if err != nil {
Expand Down
22 changes: 20 additions & 2 deletions exporter/clickhousetracesexporter/clickhouse_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ type Writer interface {
type writerMaker func(logger *zap.Logger, db clickhouse.Conn, traceDatabase string, spansTable string, indexTable string, errorTable string, encoding Encoding, delay time.Duration, size int) (Writer, error)

// NewFactory creates a new Factory.
func ClickHouseNewFactory(migrations string, datasource string) *Factory {
func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeCluster bool) *Factory {
return &Factory{
Options: NewOptions(migrations, datasource, primaryNamespace, archiveNamespace),
Options: NewOptions(migrations, datasource, dockerMultiNodeCluster, primaryNamespace, archiveNamespace),
// makeReader: func(db *clickhouse.Conn, operationsTable, indexTable, spansTable string) (spanstore.Reader, error) {
// return store.NewTraceReader(db, operationsTable, indexTable, spansTable), nil
// },
Expand Down Expand Up @@ -85,6 +85,14 @@ func (f *Factory) Initialize(logger *zap.Logger) error {
return err
}

// drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes
if f.Options.primary.DockerMultiNodeCluster {
err = dropSchemaMigrationsTable(db, f)
if err != nil {
return err
}
}

f.logger.Info("Running migrations from path: ", zap.Any("test", f.Options.primary.Migrations))
clickhouseUrl, err := buildClickhouseMigrateURL(f.Options.primary.Datasource, f.Options.primary.Cluster)
if err != nil {
Expand Down Expand Up @@ -185,6 +193,16 @@ func patchGroupByParenInMV(db clickhouse.Conn, f *Factory) error {
return nil
}

func dropSchemaMigrationsTable(db clickhouse.Conn, f *Factory) error {
err := db.Exec(context.Background(), fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s ON CLUSTER %s;`,
f.Options.getPrimary().TraceDatabase, "schema_migrations", f.Options.getPrimary().Cluster))
if err != nil {
f.logger.Error("Error dropping schema_migrations table", zap.Error(err))
return fmt.Errorf("error dropping schema_migrations table: %v", err)
}
return nil
}

func buildClickhouseMigrateURL(datasource string, cluster string) (string, error) {
// return fmt.Sprintf("clickhouse://localhost:9000?database=default&x-multi-statement=true"), nil
var clickhouseUrl string
Expand Down
2 changes: 2 additions & 0 deletions exporter/clickhousetracesexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Config struct {
Options `mapstructure:",squash"`
Datasource string `mapstructure:"datasource"`
Migrations string `mapstructure:"migrations"`
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"`
}

var _ component.ExporterConfig = (*Config)(nil)
Expand Down
4 changes: 3 additions & 1 deletion exporter/clickhousetracesexporter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type namespaceConfig struct {
DependencyGraphDbMV string
DependencyGraphMessagingMV string
DependencyGraphTable string
DockerMultiNodeCluster bool
WriteBatchDelay time.Duration
WriteBatchSize int
Encoding Encoding
Expand Down Expand Up @@ -125,7 +126,7 @@ type Options struct {
}

// NewOptions creates a new Options struct.
func NewOptions(migrations string, datasource string, primaryNamespace string, otherNamespaces ...string) *Options {
func NewOptions(migrations string, datasource string, dockerMultiNodeCluster bool, primaryNamespace string, otherNamespaces ...string) *Options {

if datasource == "" {
datasource = defaultDatasource
Expand Down Expand Up @@ -153,6 +154,7 @@ func NewOptions(migrations string, datasource string, primaryNamespace string, o
DependencyGraphServiceMV: defaultDependencyGraphServiceMV,
DependencyGraphDbMV: defaultDependencyGraphDbMV,
DependencyGraphMessagingMV: DependencyGraphMessagingMV,
DockerMultiNodeCluster: dockerMultiNodeCluster,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Expand Down

0 comments on commit 5cbc85e

Please sign in to comment.