Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions docs/design/elasticsearch-data-streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Elasticsearch/OpenSearch Data Streams in Jaeger

This document describes the implementation of Elasticsearch and OpenSearch Data Streams in Jaeger, focusing on the motivation, design decisions, and migration strategy.

## Motivation
Elasticsearch/OpenSearch Data Streams provide a more efficient and manageable way to handle time-series data compared to traditional indices. Benefits include:
- **Automatic Rollover**: Data streams automatically manage the creation of new backing indices based on size or age.
- **Simplified Indexing**: Clients write to a single data stream name instead of multiple date-based indices.
- **Better Performance**: Optimized for append-only time-series data.

## Implementation Details

### Data Stream Naming Convention
To ensure consistency and isolation, we use dot-notation for data streams, which is a common convention in the Elastic ecosystem.
The default names are:
- `jaeger.span`: Spans data stream.
- `jaeger.service`: Service/Operation data stream.
- `jaeger.sampling`: Sampling probabilities data stream.
- `jaeger.dependencies`: Dependency data stream.

This naming strategy avoids conflicts with legacy indices (which typically use `jaeger-span-*` patterns) and adheres to modern practices.

### Index Templates
Data stream-specific index templates are provided in `internal/storage/v1/elasticsearch/mappings/`:
- `jaeger.span-8.json` (merged with standard template)
- `jaeger.service-8.json`
- `jaeger.sampling-8.json`
- `jaeger.dependencies-8.json`

These templates include the `data_stream: {}` field and specify a default ingest pipeline for timestamp normalization (e.g., `jaeger-ds-span-timestamp`).

### Storage Logic
The storage stores (Span, Sampling, Dependency) are updated to support a new configuration flag:
- `UseDataStream`: When set to `true`, Jaeger writes to the data stream alias (e.g., `jaeger.span`) instead of date-based indices.

## Lifecycle Management (ILM vs. ISM)
To manage the lifecycle of data streams effectively, we support both Elasticsearch ILM and OpenSearch ISM.

### Elasticsearch: ILM (Index Lifecycle Management)
When `UseILM=true` (and `UseDataStream=true`), Jaeger applies an ILM policy (default: `jaeger-ilm-policy`) that manages rollover and retention.

### OpenSearch: ISM (Index State Management)
When `UseISM=true` (and `UseDataStream=true`), Jaeger applies an ISM policy (default: `jaeger-ism-policy`) compatible with OpenSearch.

**Note**: `UseILM` and `UseISM` are mutually exclusive.

### Default Policy Definition
The default policy (for both ILM and ISM) targets optimized storage costs:
1. **Hot Phase**: Rollover at 50GB or 200M docs.
2. **Delete Phase**: Delete indices 7 days after rollover.

## Configuration
The following configuration options control data stream usage:
- `UseDataStream`: Explicit toggle to enable data stream support (default: `false`).
- `UseILM`: Enable Elasticsearch Index Lifecycle Management (default: `false`).
- `UseISM`: Enable OpenSearch Index State Management (default: `false`).
- `IndexPrefix`: Optional prefix for all indices and data streams.
- `jaeger.es.readLegacyWithDataStream`: Feature gate to control dual reading (enabled by default for migration support).

## Migration Strategy
Jaeger supports a seamless migration from traditional indices to data streams:
- **Phase 1**: Enable Data Streams (`UseDataStream=true`). Jaeger will start writing to the new `jaeger.*` data streams.
- **Phase 2**: During read operations, the `jaeger.es.readLegacyWithDataStream` feature gate (enabled by default) ensures Jaeger queries both the new data streams and the legacy indices.
- **Phase 3**: Once legacy indices are aged out and deleted, disable the feature gate to query only data streams.

## Verification & Evidence

### 1. Ingest Pipeline Setup
Data streams require a `@timestamp` field. Jaeger spans use `startTime` (microseconds). An ingest pipeline `jaeger-ds-span-timestamp` is used to copy `startTime` to `@timestamp`.

### 2. Testing Steps
1. **Prerequisites**: Elasticsearch 8.x or OpenSearch 2.x+, and Jaeger built from the data stream support branch.
2. **Configuration**: Set `UseDataStream=true`.
3. **Verify Templates**: Ensure the `jaeger.span` template exists and has `"data_stream": {}`.
```bash
curl -X GET "localhost:9200/_index_template/jaeger.span?pretty"
```
4. **Generate Data**: Use `tracegen` or a sample app like HotROD to send spans.
5. **Verify Data Stream**:
```bash
curl -X GET "localhost:9200/_data_stream/jaeger.span?pretty"
```
2 changes: 1 addition & 1 deletion internal/storage/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type IndexService interface {
Type(typ string) IndexService
Id(id string) IndexService
BodyJson(body any) IndexService
Add()
Add(opType string)
}

// SearchService is an abstraction for elastic.SearchService
Expand Down
24 changes: 22 additions & 2 deletions internal/storage/elasticsearch/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@ type Configuration struct {
// CreateIndexTemplates, if set to true, creates index templates at application startup.
// This configuration should be set to false when templates are installed manually.
CreateIndexTemplates bool `mapstructure:"create_mappings"`
// Option to enable Index Lifecycle Management (ILM) for Jaeger span and service indices.
// Read more about ILM at
// https://www.jaegertracing.io/docs/deployment/#enabling-ilm-support
UseILM bool `mapstructure:"use_ilm"`

// Option to enable Index State Management (ISM) for Jaeger span and service indices (OpenSearch).
UseISM bool `mapstructure:"use_ism"`

// UseDataStream, if set to true, enables the data stream support (ES 8+ or OpenSearch potentially).
UseDataStream bool `mapstructure:"use_data_stream"`

// ---- jaeger-specific configs ----
// MaxDocCount Defines maximum number of results to fetch from storage per query.
MaxDocCount int `mapstructure:"max_doc_count"`
Expand All @@ -180,6 +184,16 @@ type Configuration struct {
// latest adaptive sampling probabilities.
AdaptiveSamplingLookback time.Duration `mapstructure:"adaptive_sampling_lookback"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// EnableLogsDB, if set to true, will enable the logsdb mode in Elasticsearch indices (ES 8.17+).
// This mode provides significant disk space savings for telemetry data.
EnableLogsDB bool `mapstructure:"enabled_logsdb"`

// EnableIngestPipeline enables the default ingest pipeline setting in index templates.
EnableIngestPipeline bool `mapstructure:"enabled_ingest_pipeline"`
// IngestPipelineName specifies the name of the ingest pipeline to be used.
// This is only applicable if EnableIngestPipeline is true.
IngestPipelineName string `mapstructure:"ingest_pipeline_name"`

// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`
}
Expand Down Expand Up @@ -759,6 +773,12 @@ func (c *Configuration) Validate() error {
if c.CreateIndexTemplates && c.UseILM {
return errors.New("when UseILM is set true, CreateIndexTemplates must be set to false and index templates must be created by init process of es-rollover app")
}
if c.UseISM && !c.UseReadWriteAliases {
return errors.New("UseISM must always be used in conjunction with UseReadWriteAliases to ensure ES writers and readers refer to the single index mapping")
}
if c.CreateIndexTemplates && c.UseISM {
return errors.New("when UseISM is set true, CreateIndexTemplates must be set to false and index templates must be created by init process of es-rollover app")
}

// Validate explicit alias settings require UseReadWriteAliases
hasAnyExplicitAlias := c.SpanReadAlias != "" || c.SpanWriteAlias != "" ||
Expand Down
21 changes: 21 additions & 0 deletions internal/storage/elasticsearch/config/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package config

import (
"strings"
"time"
)

// IndexWithDate returns index name with date
func IndexWithDate(indexPrefix, indexDateLayout string, date time.Time) string {
return indexPrefix + date.UTC().Format(indexDateLayout)
}

// GetDataStreamLegacyWildcard returns the legacy wildcard pattern for a data stream.
// It replaces the first dot with a dash and appends a wildcard.
// Example: jaeger.span -> jaeger-span-*
func GetDataStreamLegacyWildcard(dataStreamName string) string {
return strings.Replace(dataStreamName, ".", "-", 1) + "-*"
}
21 changes: 14 additions & 7 deletions internal/storage/elasticsearch/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion internal/storage/elasticsearch/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ func (i IndexServiceWrapper) Type(typ string) es.IndexService {
}

// Add adds the request to bulk service
func (i IndexServiceWrapper) Add() {
func (i IndexServiceWrapper) Add(opType string) {
if opType != "" {
i.bulkIndexReq.OpType(opType)
}
i.bulkService.Add(i.bulkIndexReq)
}

Expand Down
19 changes: 15 additions & 4 deletions internal/storage/v1/elasticsearch/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (f *FactoryBase) GetSpanReaderParams() esspanstore.SpanReaderParams {
TagDotReplacement: f.config.Tags.DotReplacement,
UseReadWriteAliases: f.config.UseReadWriteAliases,
ReadAliasSuffix: f.config.ReadAliasSuffix,
UseDataStream: f.config.UseDataStream,
RemoteReadClusters: f.config.RemoteReadClusters,
SpanReadAlias: f.config.SpanReadAlias,
ServiceReadAlias: f.config.ServiceReadAlias,
Expand All @@ -135,6 +136,7 @@ func (f *FactoryBase) GetSpanWriterParams() esspanstore.SpanWriterParams {
TagKeysAsFields: f.tags,
TagDotReplacement: f.config.Tags.DotReplacement,
UseReadWriteAliases: f.config.UseReadWriteAliases,
UseDataStream: f.config.UseDataStream,
WriteAliasSuffix: f.config.WriteAliasSuffix,
SpanWriteAlias: f.config.SpanWriteAlias,
ServiceWriteAlias: f.config.ServiceWriteAlias,
Expand All @@ -153,6 +155,7 @@ func (f *FactoryBase) GetDependencyStoreParams() esdepstorev2.Params {
IndexDateLayout: f.config.Indices.Dependencies.DateLayout,
MaxDocCount: f.config.MaxDocCount,
UseReadWriteAliases: f.config.UseReadWriteAliases,
UseDataStream: f.config.UseDataStream,
}
}

Expand All @@ -165,6 +168,7 @@ func (f *FactoryBase) CreateSamplingStore(int /* maxBuckets */) (samplingstore.S
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.config.Indices.Sampling.RolloverFrequency),
Lookback: f.config.AdaptiveSamplingLookback,
MaxDocCount: f.config.MaxDocCount,
UseDataStream: f.config.UseDataStream,
}
store := essamplestore.NewSamplingStore(params)

Expand All @@ -182,12 +186,19 @@ func (f *FactoryBase) CreateSamplingStore(int /* maxBuckets */) (samplingstore.S
return store, nil
}

const defaultILMPolicyName = "jaeger-ilm-policy"

func (f *FactoryBase) mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder {
return mappings.MappingBuilder{
TemplateBuilder: f.templateBuilder,
Indices: cfg.Indices,
EsVersion: cfg.Version,
UseILM: cfg.UseILM,
TemplateBuilder: f.templateBuilder,
Indices: cfg.Indices,
EsVersion: cfg.Version,
UseILM: cfg.UseILM,
UseISM: cfg.UseISM,
ILMPolicyName: cfg.Indices.IndexPrefix.Apply(defaultILMPolicyName),
EnableLogsDB: cfg.EnableLogsDB,
EnableIngestPipeline: cfg.EnableIngestPipeline,
UseDataStream: cfg.UseDataStream,
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"priority": 502,
"index_patterns": [
"test-jaeger-dependencies*",
"test-jaeger.dependencies*"
],
"data_stream": {},
"template": {
"aliases": {
"test-jaeger-dependencies-read": {}
},
"settings": {
"index.number_of_shards": 3,
"index.number_of_replicas": 3,
"index.mapping.nested_fields.limit": 50,
"index.requests.cache.enable": true,
"lifecycle": {
"name": "jaeger-test-policy",
"rollover_alias": "test-jaeger-dependencies-write"
}
},
"mappings": {}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"priority": 502,
"index_patterns": "test-jaeger-dependencies-*",
"index_patterns": [
"test-jaeger-dependencies*"
],
"template": {
"aliases": {
"test-jaeger-dependencies-read": {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"priority": 503,
"index_patterns": [
"test-jaeger-sampling*",
"test-jaeger.sampling*"
],
"data_stream": {},
"template": {
"aliases": {
"test-jaeger-sampling-read": {}
},
"settings": {
"index.number_of_shards": 3,
"index.number_of_replicas": 3,
"index.mapping.nested_fields.limit": 50,
"index.requests.cache.enable": true,
"lifecycle": {
"name": "jaeger-test-policy",
"rollover_alias": "test-jaeger-sampling-write"
}
},
"mappings": {}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
{
"priority": 503,
"index_patterns": "test-jaeger-sampling-*",
"index_patterns": [
"test-jaeger-sampling*",
"test-jaeger.sampling*"
],
"data_stream": {},
"template": {
"aliases": {
"test-jaeger-sampling-read": {}
Expand All @@ -17,4 +21,4 @@
},
"mappings": {}
}
}
}
Loading
Loading