From ee680c6ba7cc865dd2b99ffa167b46baffadb553 Mon Sep 17 00:00:00 2001 From: shubham-tomar Date: Mon, 10 Nov 2025 12:49:26 +0530 Subject: [PATCH 1/2] data reader --- cmd/nebula/main.go | 1 + docs/iceberg-source-lld.md | 1128 +++++++++++++++++ examples/configs/iceberg-destination.json | 2 +- examples/configs/iceberg-source.json | 30 + .../sources/iceberg/data_file_reader.go | 228 ++++ .../sources/iceberg/iceberg_source.go | 554 ++++++++ pkg/connector/sources/iceberg/init.go | 85 ++ .../sources/iceberg/manifest_reader.go | 81 ++ .../sources/iceberg/nessie_catalog.go | 230 ++++ .../sources/iceberg/snapshot_manager.go | 58 + pkg/connector/sources/iceberg/types.go | 105 ++ 11 files changed, 2501 insertions(+), 1 deletion(-) create mode 100644 docs/iceberg-source-lld.md create mode 100644 examples/configs/iceberg-source.json create mode 100644 pkg/connector/sources/iceberg/data_file_reader.go create mode 100644 pkg/connector/sources/iceberg/iceberg_source.go create mode 100644 pkg/connector/sources/iceberg/init.go create mode 100644 pkg/connector/sources/iceberg/manifest_reader.go create mode 100644 pkg/connector/sources/iceberg/nessie_catalog.go create mode 100644 pkg/connector/sources/iceberg/snapshot_manager.go create mode 100644 pkg/connector/sources/iceberg/types.go diff --git a/cmd/nebula/main.go b/cmd/nebula/main.go index 4d2ecd4..2759f53 100644 --- a/cmd/nebula/main.go +++ b/cmd/nebula/main.go @@ -23,6 +23,7 @@ import ( _ "github.com/ajitpratap0/nebula/pkg/connector/destinations/json" _ "github.com/ajitpratap0/nebula/pkg/connector/sources/csv" _ "github.com/ajitpratap0/nebula/pkg/connector/sources/google_ads" + _ "github.com/ajitpratap0/nebula/pkg/connector/sources/iceberg" _ "github.com/ajitpratap0/nebula/pkg/connector/sources/json" ) diff --git a/docs/iceberg-source-lld.md b/docs/iceberg-source-lld.md new file mode 100644 index 0000000..a3919f9 --- /dev/null +++ b/docs/iceberg-source-lld.md @@ -0,0 +1,1128 @@ +# Low-Level Design: Iceberg Source Connector + +## 1. Overview + +### 1.1 Purpose +This document describes the low-level design for implementing an Apache Iceberg source connector in Nebula. The connector will enable reading data from Iceberg tables through various catalog implementations (Nessie, REST) with high performance and memory efficiency. + +### 1.2 Goals +- Read data from Iceberg tables efficiently with support for batch and streaming modes +- Support multiple catalog types (Nessie, REST, AWS Glue, Hive) +- Implement incremental reads using Iceberg snapshots and manifests +- Maintain Nebula's performance characteristics (1.7M-3.6M records/sec) +- Support schema evolution and partition pruning +- Enable predicate pushdown for efficient data filtering +- Provide CDC-like capabilities using Iceberg's snapshot isolation + +### 1.3 Non-Goals +- Writing to Iceberg tables (already implemented as destination) +- Schema migration or DDL operations +- Custom file format support beyond Parquet/ORC/Avro + +## 2. 
Architecture + +### 2.1 Component Diagram + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ IcebergSource │ +│ (implements core.Source, embeds BaseConnector) │ +├─────────────────────────────────────────────────────────────────┤ +│ - Configuration & Initialization │ +│ - Schema Discovery │ +│ - Read Operations (Stream/Batch) │ +│ - State Management (Snapshot tracking) │ +│ - Health & Metrics │ +└──────────────┬──────────────────────────────────────────────────┘ + │ + ├─────────────► CatalogProvider Interface + │ │ + │ ├─► NessieCatalog + │ ├─► RestCatalog + │ ├─► GlueCatalog (future) + │ └─► HiveCatalog (future) + │ + ├─────────────► SnapshotManager + │ - Snapshot discovery + │ - Incremental reads + │ - Position tracking + │ + ├─────────────► ManifestReader + │ - Manifest file reading + │ - Data file discovery + │ - Partition pruning + │ + └─────────────► DataFileReader + - Parquet file reading + - ORC file reading (future) + - Avro file reading (future) + - Arrow conversion +``` + +### 2.2 Package Structure + +``` +pkg/connector/sources/iceberg/ +├── iceberg_source.go # Main source implementation +├── catalog_provider.go # Catalog interface (shared with destination) +├── snapshot_manager.go # Snapshot and incremental read logic +├── manifest_reader.go # Manifest file processing +├── data_file_reader.go # Data file reading and conversion +├── schema_converter.go # Iceberg to Core schema conversion +├── predicate_pushdown.go # Filter predicate optimization +├── partition_filter.go # Partition pruning logic +├── arrow_converter.go # Arrow to Record conversion +├── types.go # Type definitions +├── config.go # Configuration structures +└── init.go # Registration +``` + +## 3. Detailed Design + +### 3.1 Core Data Structures + +#### 3.1.1 IcebergSource + +```go +type IcebergSource struct { + *baseconnector.BaseConnector + + // Configuration + catalogProvider CatalogProvider + catalogType string + catalogURI string + database string + tableName string + branch string // For Nessie + + // Storage configuration (S3/MinIO) + region string + s3Endpoint string + accessKey string + secretKey string + properties map[string]string + + // Iceberg table metadata + table icebergGo.Table + schema *core.Schema + icebergSchema *icebergGo.Schema + + // Snapshot management + snapshotManager *SnapshotManager + currentSnapshot *icebergGo.Snapshot + startSnapshot int64 // For incremental reads + endSnapshot int64 // For incremental reads + + // Data reading + manifestReader *ManifestReader + dataFileReader *DataFileReader + + // State tracking + position *IcebergPosition + recordsRead int64 + bytesRead int64 + filesRead int64 + isInitialized bool + + // Filtering + predicates []Predicate + partitionFilter *PartitionFilter + + // Synchronization + mu sync.RWMutex + + // Performance + readBatchSize int + prefetchFiles int + useArrowReader bool +} + +type IcebergPosition struct { + SnapshotID int64 `json:"snapshot_id"` + ManifestIndex int `json:"manifest_index"` + DataFileIndex int `json:"data_file_index"` + RowOffset int64 `json:"row_offset"` + Metadata map[string]interface{} `json:"metadata"` +} + +func (p *IcebergPosition) String() string { + return fmt.Sprintf("iceberg_snapshot_%d_manifest_%d_file_%d_offset_%d", + p.SnapshotID, p.ManifestIndex, p.DataFileIndex, p.RowOffset) +} + +func (p *IcebergPosition) Compare(other core.Position) int { + // Implementation for position comparison +} +``` + +#### 3.1.2 SnapshotManager + +```go +type SnapshotManager struct { + table icebergGo.Table + 
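	// currentSnapshot caches the snapshot the manager is currently positioned on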
currentSnapshot *icebergGo.Snapshot + snapshots []*icebergGo.Snapshot + logger *zap.Logger +} + +// GetCurrentSnapshot returns the current snapshot +func (sm *SnapshotManager) GetCurrentSnapshot() (*icebergGo.Snapshot, error) + +// GetSnapshotsSince returns snapshots after a given snapshot ID +func (sm *SnapshotManager) GetSnapshotsSince(snapshotID int64) ([]*icebergGo.Snapshot, error) + +// GetSnapshotsRange returns snapshots between start and end +func (sm *SnapshotManager) GetSnapshotsRange(start, end int64) ([]*icebergGo.Snapshot, error) + +// ListSnapshots returns all available snapshots +func (sm *SnapshotManager) ListSnapshots() ([]*icebergGo.Snapshot, error) +``` + +#### 3.1.3 ManifestReader + +```go +type ManifestReader struct { + snapshot *icebergGo.Snapshot + manifests []icebergGo.ManifestFile + currentIndex int + dataFiles []icebergGo.DataFile + partitionFilter *PartitionFilter + logger *zap.Logger +} + +// GetDataFiles returns all data files from manifests +func (mr *ManifestReader) GetDataFiles(ctx context.Context) ([]icebergGo.DataFile, error) + +// GetDataFilesFiltered returns filtered data files +func (mr *ManifestReader) GetDataFilesFiltered( + ctx context.Context, + filter *PartitionFilter, +) ([]icebergGo.DataFile, error) + +// GetManifestCount returns the number of manifests +func (mr *ManifestReader) GetManifestCount() int +``` + +#### 3.1.4 DataFileReader + +```go +type DataFileReader struct { + dataFiles []icebergGo.DataFile + currentIndex int + schema *arrow.Schema + batchSize int + s3Config S3Config + useArrowReader bool + recordPool *RecordPool + logger *zap.Logger +} + +type S3Config struct { + Region string + Endpoint string + AccessKey string + SecretKey string + Properties map[string]string +} + +// ReadFile reads a single data file and returns records +func (dfr *DataFileReader) ReadFile( + ctx context.Context, + file icebergGo.DataFile, +) ([]*pool.Record, error) + +// ReadFileStream streams records from a file +func (dfr *DataFileReader) ReadFileStream( + ctx context.Context, + file icebergGo.DataFile, + recordChan chan<- *pool.Record, + errorChan chan<- error, +) + +// ReadParquet reads a Parquet file +func (dfr *DataFileReader) ReadParquet( + ctx context.Context, + filePath string, +) ([]*pool.Record, error) + +// ConvertArrowToRecords converts Arrow records to Nebula records +func (dfr *DataFileReader) ConvertArrowToRecords( + arrowRecord arrow.Record, +) ([]*pool.Record, error) +``` + +#### 3.1.5 Configuration + +```go +type IcebergSourceConfig struct { + // Catalog configuration + CatalogType string `json:"catalog_type"` // nessie, rest, glue, hive + CatalogURI string `json:"catalog_uri"` + CatalogName string `json:"catalog_name"` + Warehouse string `json:"warehouse"` + Database string `json:"database"` + Table string `json:"table"` + Branch string `json:"branch"` // For Nessie + + // S3/MinIO configuration + Region string `json:"region"` + S3Endpoint string `json:"s3_endpoint"` + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key"` + + // Read options + SnapshotID *int64 `json:"snapshot_id"` // Specific snapshot + StartSnapshot *int64 `json:"start_snapshot"` // For incremental + EndSnapshot *int64 `json:"end_snapshot"` // For incremental + AsOfTimestamp *time.Time `json:"as_of_timestamp"` // Time travel + + // Filtering + Predicates []string `json:"predicates"` // SQL-like filters + PartitionFilters map[string]string `json:"partition_filters"` // Partition pruning + + // Performance tuning + BatchSize int `json:"batch_size"` + 
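    // PrefetchFiles controls how many data files are fetched ahead of the one being processed (see §5.2)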
PrefetchFiles int `json:"prefetch_files"` + UseArrowReader bool `json:"use_arrow_reader"` + MaxConcurrency int `json:"max_concurrency"` + + // Additional properties + Properties map[string]string `json:"properties"` +} +``` + +### 3.2 Core Operations + +#### 3.2.1 Initialize + +```go +func (s *IcebergSource) Initialize(ctx context.Context, config *config.BaseConfig) error { + // 1. Initialize base connector + if err := s.BaseConnector.Initialize(ctx, config); err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeConfig, + "failed to initialize base connector") + } + + // 2. Parse Iceberg-specific configuration + if err := s.parseConfig(config); err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeConfig, + "failed to parse Iceberg config") + } + + // 3. Create catalog provider + catalogProvider, err := NewCatalogProvider(s.catalogType, s.GetLogger()) + if err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeConfig, + "failed to create catalog provider") + } + s.catalogProvider = catalogProvider + + // 4. Connect to catalog with circuit breaker + if err := s.ExecuteWithCircuitBreaker(func() error { + return s.connectToCatalog(ctx) + }); err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeConnection, + "failed to connect to catalog") + } + + // 5. Load table metadata + if err := s.loadTable(ctx); err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeData, + "failed to load table") + } + + // 6. Initialize snapshot manager + s.snapshotManager = NewSnapshotManager(s.table, s.GetLogger()) + + // 7. Discover schema + if err := s.discoverSchema(ctx); err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeData, + "failed to discover schema") + } + + // 8. Initialize manifest and data file readers + if err := s.initializeReaders(ctx); err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeData, + "failed to initialize readers") + } + + // 9. 
Update health status + s.UpdateHealth(true, map[string]interface{}{ + "table": s.tableName, + "database": s.database, + "snapshot_id": s.currentSnapshot.SnapshotID, + "schema_version": s.icebergSchema.ID, + }) + + s.isInitialized = true + s.GetLogger().Info("Iceberg source initialized", + zap.String("table", fmt.Sprintf("%s.%s", s.database, s.tableName)), + zap.Int64("snapshot_id", s.currentSnapshot.SnapshotID)) + + return nil +} +``` + +#### 3.2.2 Discover + +```go +func (s *IcebergSource) Discover(ctx context.Context) (*core.Schema, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.schema == nil { + return nil, nebulaerrors.New(nebulaerrors.ErrorTypeData, + "schema not discovered yet") + } + + return s.schema, nil +} + +func (s *IcebergSource) discoverSchema(ctx context.Context) error { + // Get Iceberg schema from table + s.icebergSchema = s.table.Schema() + + // Convert to core schema + fields := make([]core.Field, 0, len(s.icebergSchema.Fields())) + for _, icebergField := range s.icebergSchema.Fields() { + coreField := s.convertIcebergFieldToCore(icebergField) + fields = append(fields, coreField) + } + + s.schema = &core.Schema{ + Name: s.tableName, + Description: fmt.Sprintf("Iceberg table %s.%s", s.database, s.tableName), + Fields: fields, + Version: s.icebergSchema.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + s.GetLogger().Info("Discovered Iceberg table schema", + zap.String("table", s.tableName), + zap.Int("schema_id", s.icebergSchema.ID), + zap.Int("field_count", len(fields))) + + return nil +} +``` + +#### 3.2.3 Read Operations + +##### ReadBatch (Primary Implementation) + +```go +func (s *IcebergSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error) { + s.mu.RLock() + if !s.isInitialized { + s.mu.RUnlock() + return nil, nebulaerrors.New(nebulaerrors.ErrorTypeValidation, + "source not initialized") + } + s.mu.RUnlock() + + batchChan := pool.GetBatchChannel() + errorChan := make(chan error, 1) + + // Use optimized batch size + optimizedBatchSize := s.OptimizeBatchSize() + if batchSize > 0 && batchSize < optimizedBatchSize { + optimizedBatchSize = batchSize + } + + go func() { + defer close(batchChan) + defer close(errorChan) + defer pool.PutBatchChannel(batchChan) + + if err := s.streamBatches(ctx, optimizedBatchSize, batchChan, errorChan); err != nil { + errorChan <- err + } + }() + + return &core.BatchStream{ + Batches: batchChan, + Errors: errorChan, + }, nil +} + +func (s *IcebergSource) streamBatches( + ctx context.Context, + batchSize int, + batchChan chan<- []*pool.Record, + errorChan chan<- error, +) error { + // Get data files from manifests + dataFiles, err := s.manifestReader.GetDataFilesFiltered(ctx, s.partitionFilter) + if err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeData, + "failed to get data files") + } + + s.GetLogger().Info("Starting to read data files", + zap.Int("file_count", len(dataFiles))) + + batch := pool.GetBatchSlice(batchSize) + defer pool.PutBatchSlice(batch) + + // Process each data file + for fileIdx, dataFile := range dataFiles { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Update position + s.mu.Lock() + s.position.DataFileIndex = fileIdx + s.mu.Unlock() + + // Apply rate limiting + if err := s.RateLimit(ctx); err != nil { + return err + } + + // Read file with circuit breaker + var fileRecords []*pool.Record + if err := s.ExecuteWithCircuitBreaker(func() error { + var readErr error + fileRecords, readErr = s.dataFileReader.ReadFile(ctx, dataFile) + 
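			// Surface the read error so ExecuteWithCircuitBreaker counts the failure
			// toward tripping the breaker before falling back to HandleError below.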
return readErr + }); err != nil { + if handleErr := s.HandleError(ctx, err, nil); handleErr != nil { + return handleErr + } + continue + } + + // Add records to batch + for _, record := range fileRecords { + batch = append(batch, record) + + s.mu.Lock() + s.recordsRead++ + s.position.RowOffset++ + s.mu.Unlock() + + // Send batch when full + if len(batch) >= batchSize { + s.RecordMetric("batches_read", 1, core.MetricTypeCounter) + s.RecordMetric("records_in_batch", len(batch), core.MetricTypeGauge) + + select { + case batchChan <- batch: + batch = pool.GetBatchSlice(batchSize) + case <-ctx.Done(): + return ctx.Err() + } + } + } + + s.mu.Lock() + s.filesRead++ + s.mu.Unlock() + + s.RecordMetric("files_read", 1, core.MetricTypeCounter) + } + + // Send remaining batch + if len(batch) > 0 { + select { + case batchChan <- batch: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} +``` + +##### Read (Streaming) + +```go +func (s *IcebergSource) Read(ctx context.Context) (*core.RecordStream, error) { + s.mu.RLock() + if !s.isInitialized { + s.mu.RUnlock() + return nil, nebulaerrors.New(nebulaerrors.ErrorTypeValidation, + "source not initialized") + } + s.mu.RUnlock() + + recordChan := pool.GetRecordChannel(s.GetConfig().Performance.BufferSize) + errorChan := make(chan error, 1) + + go func() { + defer close(recordChan) + defer close(errorChan) + defer pool.PutRecordChannel(recordChan) + + if err := s.streamRecords(ctx, recordChan, errorChan); err != nil { + errorChan <- err + } + }() + + return &core.RecordStream{ + Records: recordChan, + Errors: errorChan, + }, nil +} +``` + +#### 3.2.4 State Management + +```go +func (s *IcebergSource) GetPosition() core.Position { + s.mu.RLock() + defer s.mu.RUnlock() + + return &IcebergPosition{ + SnapshotID: s.currentSnapshot.SnapshotID, + ManifestIndex: s.manifestReader.currentIndex, + DataFileIndex: s.position.DataFileIndex, + RowOffset: s.position.RowOffset, + Metadata: map[string]interface{}{ + "records_read": s.recordsRead, + "files_read": s.filesRead, + }, + } +} + +func (s *IcebergSource) SetPosition(position core.Position) error { + icebergPos, ok := position.(*IcebergPosition) + if !ok { + return nebulaerrors.New(nebulaerrors.ErrorTypeValidation, + "invalid position type") + } + + s.mu.Lock() + defer s.mu.Unlock() + + // Update current position + s.position = icebergPos + + // Seek to snapshot if different + if s.currentSnapshot.SnapshotID != icebergPos.SnapshotID { + snapshot, err := s.snapshotManager.GetSnapshotByID(icebergPos.SnapshotID) + if err != nil { + return nebulaerrors.Wrap(err, nebulaerrors.ErrorTypeData, + "failed to find snapshot") + } + s.currentSnapshot = snapshot + } + + return nil +} + +func (s *IcebergSource) GetState() core.State { + s.mu.RLock() + defer s.mu.RUnlock() + + return core.State{ + "snapshot_id": s.currentSnapshot.SnapshotID, + "manifest_index": s.manifestReader.currentIndex, + "data_file_index": s.position.DataFileIndex, + "row_offset": s.position.RowOffset, + "records_read": s.recordsRead, + "files_read": s.filesRead, + "bytes_read": s.bytesRead, + } +} + +func (s *IcebergSource) SetState(state core.State) error { + s.mu.Lock() + defer s.mu.Unlock() + + if snapshotID, ok := state["snapshot_id"].(int64); ok { + snapshot, err := s.snapshotManager.GetSnapshotByID(snapshotID) + if err != nil { + return err + } + s.currentSnapshot = snapshot + } + + if manifestIdx, ok := state["manifest_index"].(int); ok { + s.manifestReader.currentIndex = manifestIdx + } + + if dataFileIdx, ok := 
state["data_file_index"].(int); ok { + s.position.DataFileIndex = dataFileIdx + } + + if rowOffset, ok := state["row_offset"].(int64); ok { + s.position.RowOffset = rowOffset + } + + if recordsRead, ok := state["records_read"].(int64); ok { + s.recordsRead = recordsRead + } + + if filesRead, ok := state["files_read"].(int64); ok { + s.filesRead = filesRead + } + + if bytesRead, ok := state["bytes_read"].(int64); ok { + s.bytesRead = bytesRead + } + + return nil +} +``` + +#### 3.2.5 Capabilities + +```go +func (s *IcebergSource) SupportsIncremental() bool { + return true // Iceberg supports incremental reads via snapshots +} + +func (s *IcebergSource) SupportsRealtime() bool { + return false // Not real-time, but supports incremental +} + +func (s *IcebergSource) SupportsBatch() bool { + return true +} + +func (s *IcebergSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error) { + return nil, nebulaerrors.New(nebulaerrors.ErrorTypeCapability, + "Iceberg source does not support real-time subscriptions") +} +``` + +### 3.3 Advanced Features + +#### 3.3.1 Incremental Reads + +```go +type IncrementalReadConfig struct { + StartSnapshotID int64 + EndSnapshotID int64 + Mode IncrementalMode +} + +type IncrementalMode string + +const ( + IncrementalModeAppend IncrementalMode = "append" // Only new data + IncrementalModeSnapshot IncrementalMode = "snapshot" // Full snapshot diff + IncrementalModeChangelog IncrementalMode = "changelog" // With delete detection +) + +func (s *IcebergSource) ReadIncremental( + ctx context.Context, + config IncrementalReadConfig, +) (*core.BatchStream, error) { + // Get snapshots between start and end + snapshots, err := s.snapshotManager.GetSnapshotsRange( + config.StartSnapshotID, + config.EndSnapshotID, + ) + if err != nil { + return nil, err + } + + // Process each snapshot incrementally + // ... implementation +} +``` + +#### 3.3.2 Partition Pruning + +```go +type PartitionFilter struct { + filters map[string]interface{} +} + +func (pf *PartitionFilter) ApplyToDataFiles( + dataFiles []icebergGo.DataFile, +) []icebergGo.DataFile { + filtered := make([]icebergGo.DataFile, 0) + + for _, file := range dataFiles { + if pf.matchesPartition(file.Partition()) { + filtered = append(filtered, file) + } + } + + return filtered +} + +func (pf *PartitionFilter) matchesPartition( + partition map[string]interface{}, +) bool { + for key, filterValue := range pf.filters { + partValue, exists := partition[key] + if !exists || partValue != filterValue { + return false + } + } + return true +} +``` + +#### 3.3.3 Predicate Pushdown + +```go +type Predicate struct { + Field string + Operator PredicateOp + Value interface{} +} + +type PredicateOp string + +const ( + OpEqual PredicateOp = "=" + OpNotEqual PredicateOp = "!=" + OpGreaterThan PredicateOp = ">" + OpGreaterThanOrEqual PredicateOp = ">=" + OpLessThan PredicateOp = "<" + OpLessThanOrEqual PredicateOp = "<=" + OpIn PredicateOp = "IN" + OpNotIn PredicateOp = "NOT IN" +) + +func (s *IcebergSource) ApplyPredicates( + predicates []Predicate, +) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.predicates = predicates + + // Convert to Iceberg expressions for file-level filtering + // ... implementation + + return nil +} +``` + +## 4. 
Configuration + +### 4.1 JSON Configuration Example + +```json +{ + "connector_id": "iceberg_source_01", + "source_type": "iceberg", + "name": "iceberg_source", + "storage_mode": "columnar", + "security": { + "credentials": { + "catalog_type": "nessie", + "catalog_uri": "http://localhost:19120/api/v1", + "catalog_name": "nessie", + "warehouse": "s3://warehouse/", + "database": "analytics", + "table": "events", + "branch": "main", + "region": "us-east-1", + "s3_endpoint": "http://localhost:9000", + "access_key": "minioadmin", + "secret_key": "minioadmin", + "snapshot_id": "7893247829347823", + "start_snapshot": "7893247829347820", + "partition_filters": { + "year": "2024", + "month": "11" + } + } + }, + "performance": { + "batch_size": 10000, + "workers": 4, + "buffer_size": 1000, + "max_concurrency": 8, + "prefetch_files": 2 + }, + "timeouts": { + "connection": "30s", + "read": "5m", + "idle": "10m" + } +} +``` + +### 4.2 CLI Usage Example + +```bash +# Full table read +go run cmd/nebula/main.go run \ + --source examples/configs/iceberg-source.json \ + --destination examples/configs/bigquery-destination.json \ + --batch-size 10000 \ + --log-level info + +# Incremental read from specific snapshot +go run cmd/nebula/main.go run \ + --source examples/configs/iceberg-source-incremental.json \ + --destination examples/configs/snowflake-destination.json \ + --batch-size 10000 +``` + +## 5. Performance Considerations + +### 5.1 Memory Optimization + +- **Arrow Integration**: Use Arrow for zero-copy data transfer from Parquet files +- **Record Pooling**: Leverage Nebula's record pool for memory efficiency +- **Streaming**: Stream data files instead of loading all into memory +- **Batch Processing**: Process files in configurable batches + +### 5.2 I/O Optimization + +- **File Prefetching**: Prefetch next N data files while processing current file +- **Parallel File Reading**: Read multiple data files concurrently +- **S3 Optimizations**: Use byte-range requests for Parquet footer reading +- **Connection Pooling**: Reuse S3 connections across file reads + +### 5.3 Partition Pruning + +- **Manifest-Level Filtering**: Filter manifests based on partition bounds +- **File-Level Filtering**: Skip data files that don't match partition filters +- **Statistics-Based Filtering**: Use Iceberg column statistics for pruning + +### 5.4 Expected Performance + +Based on Nebula's architecture: +- **Target Throughput**: 1M-2M records/sec for Parquet files +- **Memory Usage**: <100 bytes per record +- **Latency**: <100ms to first record +- **Scalability**: Linear with number of data files + +## 6. 
Testing Strategy + +### 6.1 Unit Tests + +```go +// Test files in pkg/connector/sources/iceberg/ + +// iceberg_source_test.go +func TestIcebergSource_Initialize(t *testing.T) +func TestIcebergSource_Discover(t *testing.T) +func TestIcebergSource_ReadBatch(t *testing.T) +func TestIcebergSource_Read(t *testing.T) +func TestIcebergSource_StateManagement(t *testing.T) + +// snapshot_manager_test.go +func TestSnapshotManager_GetCurrentSnapshot(t *testing.T) +func TestSnapshotManager_GetSnapshotsSince(t *testing.T) + +// manifest_reader_test.go +func TestManifestReader_GetDataFiles(t *testing.T) +func TestManifestReader_PartitionPruning(t *testing.T) + +// data_file_reader_test.go +func TestDataFileReader_ReadParquet(t *testing.T) +func TestDataFileReader_ArrowConversion(t *testing.T) +``` + +### 6.2 Integration Tests + +```go +// tests/integration/iceberg_source_test.go + +func TestIcebergSource_E2E_Nessie(t *testing.T) +func TestIcebergSource_E2E_IncrementalRead(t *testing.T) +func TestIcebergSource_E2E_PartitionedTable(t *testing.T) +func TestIcebergSource_E2E_SchemaEvolution(t *testing.T) +func TestIcebergSource_E2E_LargeTable(t *testing.T) +``` + +### 6.3 Benchmark Tests + +```go +// tests/benchmarks/iceberg_source_bench_test.go + +func BenchmarkIcebergSource_ReadBatch(b *testing.B) +func BenchmarkIcebergSource_ParquetConversion(b *testing.B) +func BenchmarkIcebergSource_PartitionPruning(b *testing.B) +func BenchmarkIcebergSource_SnapshotIteration(b *testing.B) +``` + +## 7. Error Handling + +### 7.1 Error Types + +```go +const ( + ErrorCatalogConnection = "catalog_connection_error" + ErrorTableNotFound = "table_not_found" + ErrorSnapshotNotFound = "snapshot_not_found" + ErrorManifestRead = "manifest_read_error" + ErrorDataFileRead = "data_file_read_error" + ErrorSchemaConversion = "schema_conversion_error" + ErrorPartitionFilter = "partition_filter_error" +) +``` + +### 7.2 Recovery Strategies + +- **Connection Errors**: Retry with exponential backoff (via BaseConnector) +- **File Read Errors**: Skip corrupted files and log error +- **Schema Mismatch**: Attempt schema evolution handling +- **Snapshot Errors**: Fall back to latest snapshot + +## 8. Monitoring and Metrics + +### 8.1 Key Metrics + +```go +func (s *IcebergSource) Metrics() map[string]interface{} { + return map[string]interface{}{ + "records_read": s.recordsRead, + "bytes_read": s.bytesRead, + "files_read": s.filesRead, + "manifests_processed": s.manifestReader.currentIndex, + "current_snapshot_id": s.currentSnapshot.SnapshotID, + "table": fmt.Sprintf("%s.%s", s.database, s.tableName), + "catalog_type": s.catalogType, + "read_throughput": s.calculateThroughput(), + "avg_file_size": s.bytesRead / s.filesRead, + } +} +``` + +### 8.2 Health Checks + +```go +func (s *IcebergSource) Health(ctx context.Context) error { + // Check catalog connection + if err := s.catalogProvider.Health(ctx); err != nil { + return err + } + + // Check table accessibility + if _, err := s.table.CurrentSnapshot(); err != nil { + return err + } + + return nil +} +``` + +## 9. Dependencies + +### 9.1 New Dependencies + +```go +require ( + github.com/apache/arrow-go/v18 v18.0.0 + github.com/shubham-tomar/iceberg-go v0.1.0 + github.com/apache/parquet-go v0.4.0 +) +``` + +### 9.2 Shared with Destination + +- Catalog providers (Nessie, REST) +- Schema conversion utilities +- Arrow conversion helpers + +## 10. 
Implementation Phases + +### Phase 1: Core Reading +- [ ] Basic IcebergSource structure +- [ ] Nessie catalog integration +- [ ] Single snapshot reading +- [ ] Parquet file reading +- [ ] Schema discovery +- [ ] Batch reading + +### Phase 2: State Management +- [ ] Position tracking +- [ ] State persistence +- [ ] Resume from position +- [ ] Progress reporting + +### Phase 3: Advanced Features +- [ ] Incremental reads +- [ ] Partition pruning +- [ ] Predicate pushdown +- [ ] Schema evolution handling + +### Phase 4: Optimization +- [ ] Parallel file reading +- [ ] File prefetching +- [ ] Arrow optimization +- [ ] Memory pooling + +### Phase 5: Additional Catalogs +- [ ] REST catalog +- [ ] AWS Glue catalog +- [ ] Hive catalog + +## 11. Migration Path + +### 11.1 From Existing Destination + +Leverage existing code from `pkg/connector/destinations/iceberg/`: +- Catalog provider interface and implementations +- Schema conversion utilities +- Arrow integration patterns +- Configuration structures + +### 11.2 Code Reuse + +Create shared package: +``` +pkg/connector/iceberg/ +├── catalog/ # Shared catalog providers +├── schema/ # Schema conversion utilities +└── arrow/ # Arrow conversion helpers +``` + +Both source and destination import from shared package. + +## 12. Documentation + +### 12.1 User Documentation +- Configuration guide +- Usage examples +- Best practices +- Troubleshooting guide + +### 12.2 Developer Documentation +- Architecture overview +- Adding new catalog types +- Performance tuning +- Testing guidelines + +## 13. Open Questions + +1. **ORC/Avro Support**: Should we support ORC and Avro formats in Phase 1 or defer? + - **Recommendation**: Defer to Phase 4, focus on Parquet (most common) + +2. **Delete Files**: How to handle Iceberg delete files for CDC-like behavior? + - **Recommendation**: Implement in Phase 3 as part of incremental reads + +3. **Time Travel**: Should we support arbitrary timestamp-based queries? + - **Recommendation**: Yes, add in Phase 1 as it's a key Iceberg feature + +4. **Catalog Caching**: Should we cache catalog metadata? + - **Recommendation**: Yes, implement simple LRU cache in Phase 2 + +5. **Multi-Table Support**: Should we support reading multiple tables in one source? + - **Recommendation**: Defer to future, keep single-table for now + +## 14. Success Criteria + +- ✅ Successfully read Iceberg tables from Nessie catalog +- ✅ Achieve >1M records/sec throughput on typical tables +- ✅ Support partition pruning with significant performance improvement +- ✅ Handle schema evolution gracefully +- ✅ Maintain <100 bytes memory per record +- ✅ Support incremental reads via snapshots +- ✅ Pass all unit and integration tests +- ✅ Complete documentation and examples + +## 15. 
References + +- [Apache Iceberg Specification](https://iceberg.apache.org/spec/) +- [Iceberg Table Format](https://iceberg.apache.org/docs/latest/spec/) +- [iceberg-go Library](https://github.com/apache/iceberg-go) +- [Apache Arrow Go](https://github.com/apache/arrow-go) +- [Nessie Documentation](https://projectnessie.org/) diff --git a/examples/configs/iceberg-destination.json b/examples/configs/iceberg-destination.json index 068fc4c..9a2a6ae 100644 --- a/examples/configs/iceberg-destination.json +++ b/examples/configs/iceberg-destination.json @@ -26,7 +26,7 @@ "warehouse": "s3://warehouse/", "catalog_name": "nessie", "database": "dummy", - "table": "nyc_taxi", + "table": "nyc_taxi_dst", "branch": "main", "prop_s3.region": "us-east-1", "prop_s3.endpoint": "http://localhost:9000", diff --git a/examples/configs/iceberg-source.json b/examples/configs/iceberg-source.json new file mode 100644 index 0000000..0eba11e --- /dev/null +++ b/examples/configs/iceberg-source.json @@ -0,0 +1,30 @@ +{ + "name": "iceberg_source", + "type": "iceberg", + "security": { + "credentials": { + "catalog_type": "nessie", + "catalog_uri": "http://localhost:19120/api/v1", + "warehouse": "s3://warehouse/", + "catalog_name": "nessie", + "database": "dummy", + "table": "nyc_taxi", + "branch": "main", + "prop_s3.region": "us-east-1", + "prop_s3.endpoint": "http://localhost:9000", + "prop_s3.access-key-id": "admin", + "prop_s3.secret-access-key": "password", + "prop_s3.path-style-access": "true" + } + }, + "performance": { + "batch_size": 10000, + "workers": 4, + "buffer_size": 1000, + "max_concurrency": 8 + }, + "timeouts": { + "request": 30000000000, + "connection": 10000000000 + } +} diff --git a/pkg/connector/sources/iceberg/data_file_reader.go b/pkg/connector/sources/iceberg/data_file_reader.go new file mode 100644 index 0000000..c3bdb0f --- /dev/null +++ b/pkg/connector/sources/iceberg/data_file_reader.go @@ -0,0 +1,228 @@ +package iceberg + +import ( + "context" + "fmt" + "time" + + "github.com/ajitpratap0/nebula/pkg/models" + "github.com/ajitpratap0/nebula/pkg/pool" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/shubham-tomar/iceberg-go/table" + "go.uber.org/zap" +) + +// DataFileReader reads Parquet data files from Iceberg tables using table scan +type DataFileReader struct { + table *table.Table + batchSize int + logger *zap.Logger +} + +// NewDataFileReader creates a new data file reader +func NewDataFileReader(tbl *table.Table, batchSize int, logger *zap.Logger) *DataFileReader { + logger.Info("Created data file reader using table scan") + return &DataFileReader{ + table: tbl, + batchSize: batchSize, + logger: logger, + } +} + +// ReadAllRecords reads all records from the table using scan +func (dfr *DataFileReader) ReadAllRecords(ctx context.Context) ([]*pool.Record, error) { + dfr.logger.Info("Starting table scan to read all records") + + // Use table scan with limit if batch size is set + scanOptions := []table.ScanOption{} + if dfr.batchSize > 0 { + scanOptions = append(scanOptions, table.WithLimit(int64(dfr.batchSize))) + } + + // Perform scan and get Arrow records + schema, recordsIter, err := dfr.table.Scan(scanOptions...).ToArrowRecords(ctx) + if err != nil { + return nil, fmt.Errorf("failed to scan table: %w", err) + } + + dfr.logger.Info("Table scan started", zap.Int("schema_fields", len(schema.Fields()))) + + // Process all records from iterator + var allRecords []*pool.Record + recordCount := 0 + + for record, err := range recordsIter { + if 
err != nil { + dfr.logger.Error("Error reading record from iterator", zap.Error(err)) + continue + } + if record == nil { + continue + } + + recordCount++ + dfr.logger.Debug("Processing Arrow record", + zap.Int("record_num", recordCount), + zap.Int64("num_rows", record.NumRows())) + + // Convert Arrow record to Nebula records + nebRecords, err := dfr.convertArrowRecordToRecords(record, schema) + if err != nil { + dfr.logger.Error("Failed to convert Arrow record", zap.Error(err)) + record.Release() + continue + } + + allRecords = append(allRecords, nebRecords...) + record.Release() + } + + dfr.logger.Info("Table scan completed", + zap.Int("total_records", len(allRecords)), + zap.Int("arrow_batches", recordCount)) + + return allRecords, nil +} + +// convertArrowRecordToRecords converts an Arrow record to Nebula records +func (dfr *DataFileReader) convertArrowRecordToRecords(record arrow.Record, schema *arrow.Schema) ([]*pool.Record, error) { + numRows := int(record.NumRows()) + records := make([]*pool.Record, 0, numRows) + + // Convert each row to a Nebula record + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + nebRecord := models.NewRecordFromPool("iceberg") + nebRecord.SetTimestamp(time.Now()) + + // Extract values for each column + for colIdx := 0; colIdx < int(record.NumCols()); colIdx++ { + field := schema.Field(colIdx) + col := record.Column(colIdx) + + value := dfr.extractValue(col, rowIdx) + if value != nil { + nebRecord.SetData(field.Name, value) + } + } + + records = append(records, nebRecord) + } + + return records, nil +} + +// convertArrowTableToRecords converts an Arrow table to Nebula records (legacy, not used) +func (dfr *DataFileReader) convertArrowTableToRecords(table arrow.Table) ([]*pool.Record, error) { + numRows := int(table.NumRows()) + records := make([]*pool.Record, 0, numRows) + + schema := table.Schema() + columns := make([]arrow.Array, table.NumCols()) + + // Get all columns + for i := int64(0); i < table.NumCols(); i++ { + col := table.Column(int(i)) + if col.Len() > 0 { + // Concatenate all chunks into a single array + chunks := make([]arrow.Array, col.Len()) + for j := 0; j < col.Len(); j++ { + chunks[j] = col.Data().Chunk(j) + } + concatenated, err := array.Concatenate(chunks, nil) + if err != nil { + return nil, fmt.Errorf("failed to concatenate arrays: %w", err) + } + columns[i] = concatenated + defer columns[i].Release() + } + } + + // Convert each row to a record + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + record := models.NewRecordFromPool("iceberg") + record.SetTimestamp(time.Now()) + + // Extract values for each column + for colIdx, field := range schema.Fields() { + if columns[colIdx] == nil { + continue + } + + value := dfr.extractValue(columns[colIdx], rowIdx) + if value != nil { + record.SetData(field.Name, value) + } + } + + records = append(records, record) + } + + return records, nil +} + +// extractValue extracts a value from an Arrow array at a specific index +func (dfr *DataFileReader) extractValue(arr arrow.Array, index int) interface{} { + if arr.IsNull(index) { + return nil + } + + switch a := arr.(type) { + case *array.Boolean: + return a.Value(index) + case *array.Int32: + return a.Value(index) + case *array.Int64: + return a.Value(index) + case *array.Float32: + return a.Value(index) + case *array.Float64: + return a.Value(index) + case *array.String: + return a.Value(index) + case *array.Binary: + return a.Value(index) + case *array.Date32: + // Convert Date32 to time.Time + days := a.Value(index) + return 
time.Unix(int64(days)*86400, 0).UTC() + case *array.Timestamp: + // Convert timestamp to time.Time + ts := a.Value(index) + tsType := a.DataType().(*arrow.TimestampType) + switch tsType.Unit { + case arrow.Second: + return time.Unix(int64(ts), 0).UTC() + case arrow.Millisecond: + return time.Unix(0, int64(ts)*1e6).UTC() + case arrow.Microsecond: + return time.Unix(0, int64(ts)*1e3).UTC() + case arrow.Nanosecond: + return time.Unix(0, int64(ts)).UTC() + } + case *array.List: + // Convert list to slice + start, end := a.ValueOffsets(index) + valueArr := a.ListValues() + values := make([]interface{}, end-start) + for i := start; i < end; i++ { + values[i-start] = dfr.extractValue(valueArr, int(i)) + } + return values + case *array.Struct: + // Convert struct to map + structType := a.DataType().(*arrow.StructType) + result := make(map[string]interface{}) + for i, field := range structType.Fields() { + fieldArr := a.Field(i) + result[field.Name] = dfr.extractValue(fieldArr, index) + } + return result + default: + dfr.logger.Warn("Unsupported Arrow type", + zap.String("type", fmt.Sprintf("%T", arr))) + return nil + } + + return nil +} diff --git a/pkg/connector/sources/iceberg/iceberg_source.go b/pkg/connector/sources/iceberg/iceberg_source.go new file mode 100644 index 0000000..a608468 --- /dev/null +++ b/pkg/connector/sources/iceberg/iceberg_source.go @@ -0,0 +1,554 @@ +package iceberg + +import ( + "context" + "fmt" + "sync" + + "github.com/ajitpratap0/nebula/pkg/config" + "github.com/ajitpratap0/nebula/pkg/connector/core" + "github.com/ajitpratap0/nebula/pkg/pool" + iceberg "github.com/shubham-tomar/iceberg-go" + "github.com/shubham-tomar/iceberg-go/table" + "go.uber.org/zap" +) + +// IcebergSource implements the core.Source interface for reading from Iceberg tables +type IcebergSource struct { + // Configuration + catalogProvider CatalogProvider + catalogType string + catalogURI string + catalogName string + warehouse string + database string + tableName string + branch string // For Nessie + + // Storage configuration (S3/MinIO) + region string + s3Endpoint string + accessKey string + secretKey string + properties map[string]string + + // Iceberg table metadata + table *table.Table + schema *core.Schema + icebergSchema *iceberg.Schema + + // Snapshot management + snapshotManager *SnapshotManager + currentSnapshot *table.Snapshot + + // Data reading + manifestReader *ManifestReader + dataFileReader *DataFileReader + + // State tracking + position *IcebergPosition + recordsRead int64 + bytesRead int64 + filesRead int64 + isInitialized bool + + // Synchronization + mu sync.RWMutex + + // Configuration + config *config.BaseConfig + readBatchSize int + + // Logger + logger *zap.Logger +} + +// NewIcebergSource creates a new Iceberg source connector +func NewIcebergSource(config *config.BaseConfig) (core.Source, error) { + if config == nil { + return nil, fmt.Errorf("configuration cannot be nil") + } + + logger, _ := zap.NewProduction() + + return &IcebergSource{ + config: config, + logger: logger, + properties: make(map[string]string), + position: &IcebergPosition{ + Metadata: make(map[string]interface{}), + }, + }, nil +} + +// Initialize initializes the Iceberg source connector +func (s *IcebergSource) Initialize(ctx context.Context, config *config.BaseConfig) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.config = config + + // Parse Iceberg-specific configuration + if err := s.parseConfig(config); err != nil { + return fmt.Errorf("failed to parse Iceberg config: %w", err) + } + + // 
Create catalog provider + catalogProvider, err := s.createCatalogProvider() + if err != nil { + return fmt.Errorf("failed to create catalog provider: %w", err) + } + s.catalogProvider = catalogProvider + + // Connect to catalog + catalogConfig := CatalogConfig{ + Name: s.catalogName, + URI: s.catalogURI, + WarehouseLocation: s.warehouse, + Branch: s.branch, + Region: s.region, + S3Endpoint: s.s3Endpoint, + AccessKey: s.accessKey, + SecretKey: s.secretKey, + Properties: s.properties, + } + + if err := s.catalogProvider.Connect(ctx, catalogConfig); err != nil { + return fmt.Errorf("failed to connect to catalog: %w", err) + } + + // Load table metadata + if err := s.loadTable(ctx); err != nil { + return fmt.Errorf("failed to load table: %w", err) + } + + // Initialize snapshot manager + s.snapshotManager = NewSnapshotManager(s.table, s.logger) + + // Get current snapshot + snapshot, err := s.snapshotManager.GetCurrentSnapshot() + if err != nil { + return fmt.Errorf("failed to get current snapshot: %w", err) + } + s.currentSnapshot = snapshot + + // Discover schema + if err := s.discoverSchema(ctx); err != nil { + return fmt.Errorf("failed to discover schema: %w", err) + } + + // Initialize manifest reader + s.manifestReader = NewManifestReader(s.currentSnapshot, s.table, s.logger) + + // Initialize data file reader with the table (table has IO configured from catalog) + s.dataFileReader = NewDataFileReader(s.table, s.readBatchSize, s.logger) + + s.isInitialized = true + s.logger.Info("Iceberg source initialized", + zap.String("table", fmt.Sprintf("%s.%s", s.database, s.tableName)), + zap.Int64("snapshot_id", s.currentSnapshot.SnapshotID), + zap.String("catalog_type", s.catalogType)) + + return nil +} + +// parseConfig parses Iceberg-specific configuration from BaseConfig +func (s *IcebergSource) parseConfig(config *config.BaseConfig) error { + creds := config.Security.Credentials + if creds == nil { + return fmt.Errorf("missing security credentials") + } + + // Required fields + requiredFields := map[string]*string{ + "catalog_type": &s.catalogType, + "catalog_uri": &s.catalogURI, + "catalog_name": &s.catalogName, + "warehouse": &s.warehouse, + "database": &s.database, + "table": &s.tableName, + } + + for field, target := range requiredFields { + if value, ok := creds[field]; ok && value != "" { + *target = value + } else { + return fmt.Errorf("missing required field: %s", field) + } + } + + // Optional fields + if branch, ok := creds["branch"]; ok { + s.branch = branch + } else { + s.branch = "main" // default branch for Nessie + } + + // S3 configuration - support both direct and prop_ prefixed fields + s.region = getCredValue(creds, "region", "prop_s3.region") + s.s3Endpoint = getCredValue(creds, "s3_endpoint", "prop_s3.endpoint") + s.accessKey = getCredValue(creds, "access_key", "prop_s3.access-key-id") + s.secretKey = getCredValue(creds, "secret_key", "prop_s3.secret-access-key") + + // Collect all properties with prop_ prefix + for key, value := range creds { + if len(key) > 5 && key[:5] == "prop_" { + // Remove "prop_" prefix and add to properties + propKey := key[5:] + s.properties[propKey] = value + } + } + + // Performance configuration + s.readBatchSize = config.Performance.BatchSize + if s.readBatchSize <= 0 { + s.readBatchSize = 10000 // default batch size + } + + return nil +} + +// getCredValue gets a credential value, trying multiple keys +func getCredValue(creds map[string]string, keys ...string) string { + for _, key := range keys { + if value, ok := creds[key]; ok && value 
!= "" { + return value + } + } + return "" +} + +// createCatalogProvider creates the appropriate catalog provider based on type +func (s *IcebergSource) createCatalogProvider() (CatalogProvider, error) { + switch s.catalogType { + case "nessie": + return NewNessieCatalog(s.logger), nil + default: + return nil, fmt.Errorf("unsupported catalog type: %s", s.catalogType) + } +} + +// loadTable loads the Iceberg table from the catalog +func (s *IcebergSource) loadTable(ctx context.Context) error { + table, err := s.catalogProvider.LoadTable(ctx, s.database, s.tableName) + if err != nil { + return fmt.Errorf("failed to load table: %w", err) + } + s.table = table + return nil +} + +// discoverSchema discovers the schema from the Iceberg table +func (s *IcebergSource) discoverSchema(ctx context.Context) error { + // Get Iceberg schema from table + s.icebergSchema = s.table.Schema() + + // Get schema from catalog provider + schema, err := s.catalogProvider.GetSchema(ctx, s.database, s.tableName) + if err != nil { + return fmt.Errorf("failed to get schema: %w", err) + } + + s.schema = schema + s.logger.Info("Discovered Iceberg table schema", + zap.String("table", s.tableName), + zap.Int("field_count", len(schema.Fields))) + + return nil +} + +// Discover returns the discovered schema +func (s *IcebergSource) Discover(ctx context.Context) (*core.Schema, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.schema == nil { + return nil, fmt.Errorf("schema not discovered yet") + } + + return s.schema, nil +} + +// Read streams individual records from the Iceberg table +func (s *IcebergSource) Read(ctx context.Context) (*core.RecordStream, error) { + s.mu.RLock() + if !s.isInitialized { + s.mu.RUnlock() + return nil, fmt.Errorf("source not initialized") + } + s.mu.RUnlock() + + recordChan := pool.GetRecordChannel(s.config.Performance.BufferSize) + errorChan := make(chan error, 1) + + go func() { + defer close(recordChan) + defer close(errorChan) + defer pool.PutRecordChannel(recordChan) + + if err := s.streamRecords(ctx, recordChan, errorChan); err != nil { + errorChan <- err + } + }() + + return &core.RecordStream{ + Records: recordChan, + Errors: errorChan, + }, nil +} + +// ReadBatch reads records in batches from the Iceberg table +func (s *IcebergSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error) { + s.mu.RLock() + if !s.isInitialized { + s.mu.RUnlock() + return nil, fmt.Errorf("source not initialized") + } + s.mu.RUnlock() + + batchChan := pool.GetBatchChannel() + errorChan := make(chan error, 1) + + // Use configured batch size if not specified + if batchSize <= 0 { + batchSize = s.readBatchSize + } + + go func() { + defer close(batchChan) + defer close(errorChan) + defer pool.PutBatchChannel(batchChan) + + if err := s.streamBatches(ctx, batchSize, batchChan, errorChan); err != nil { + errorChan <- err + } + }() + + return &core.BatchStream{ + Batches: batchChan, + Errors: errorChan, + }, nil +} + +// streamRecords streams individual records using table scan +func (s *IcebergSource) streamRecords(ctx context.Context, recordChan chan<- *pool.Record, errorChan chan<- error) error { + s.logger.Info("Starting to read all records using table scan") + + // Read all records using table scan (more efficient than manual file iteration) + allRecords, err := s.dataFileReader.ReadAllRecords(ctx) + if err != nil { + return fmt.Errorf("failed to read records: %w", err) + } + + s.logger.Info("Read records from table scan", + zap.Int("total_records", len(allRecords))) + + // Send 
records to channel + for idx, record := range allRecords { + select { + case recordChan <- record: + s.mu.Lock() + s.recordsRead++ + s.position.RowOffset = int64(idx) + s.mu.Unlock() + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +// streamBatches streams batches of records using table scan +func (s *IcebergSource) streamBatches(ctx context.Context, batchSize int, batchChan chan<- []*pool.Record, errorChan chan<- error) error { + s.logger.Info("Starting to read records in batches using table scan", + zap.Int("batch_size", batchSize)) + + // Read all records using table scan + allRecords, err := s.dataFileReader.ReadAllRecords(ctx) + if err != nil { + return fmt.Errorf("failed to read records: %w", err) + } + + s.logger.Info("Read records from table scan", + zap.Int("total_records", len(allRecords))) + + // Send records in batches + batch := pool.GetBatchSlice(batchSize) + defer pool.PutBatchSlice(batch) + + for idx, record := range allRecords { + batch = append(batch, record) + + s.mu.Lock() + s.recordsRead++ + s.position.RowOffset = int64(idx) + s.mu.Unlock() + + // Send batch when full + if len(batch) >= batchSize { + select { + case batchChan <- batch: + batch = pool.GetBatchSlice(batchSize) + case <-ctx.Done(): + return ctx.Err() + } + } + } + + // Send remaining batch + if len(batch) > 0 { + select { + case batchChan <- batch: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +// Close closes the Iceberg source connector +func (s *IcebergSource) Close(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.catalogProvider != nil { + if err := s.catalogProvider.Close(ctx); err != nil { + return fmt.Errorf("failed to close catalog: %w", err) + } + } + + s.isInitialized = false + s.logger.Info("Iceberg source closed") + + return nil +} + +// GetPosition returns the current read position +func (s *IcebergSource) GetPosition() core.Position { + s.mu.RLock() + defer s.mu.RUnlock() + + return &IcebergPosition{ + SnapshotID: s.currentSnapshot.SnapshotID, + ManifestIndex: s.manifestReader.currentIndex, + DataFileIndex: s.position.DataFileIndex, + RowOffset: s.position.RowOffset, + Metadata: map[string]interface{}{ + "records_read": s.recordsRead, + "files_read": s.filesRead, + }, + } +} + +// SetPosition sets the read position for incremental sync +func (s *IcebergSource) SetPosition(position core.Position) error { + icebergPos, ok := position.(*IcebergPosition) + if !ok { + return fmt.Errorf("invalid position type") + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.position = icebergPos + return nil +} + +// GetState returns the full connector state +func (s *IcebergSource) GetState() core.State { + s.mu.RLock() + defer s.mu.RUnlock() + + return core.State{ + "snapshot_id": s.currentSnapshot.SnapshotID, + "manifest_index": s.manifestReader.currentIndex, + "data_file_index": s.position.DataFileIndex, + "row_offset": s.position.RowOffset, + "records_read": s.recordsRead, + "files_read": s.filesRead, + "bytes_read": s.bytesRead, + } +} + +// SetState restores connector state from a previous run +func (s *IcebergSource) SetState(state core.State) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Restore state fields if present + if snapshotID, ok := state["snapshot_id"].(int64); ok { + s.position.SnapshotID = snapshotID + } + + if manifestIdx, ok := state["manifest_index"].(int); ok { + s.manifestReader.currentIndex = manifestIdx + } + + if dataFileIdx, ok := state["data_file_index"].(int); ok { + s.position.DataFileIndex = dataFileIdx + } + 
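	// Note: the current reader re-scans the table via ReadAllRecords, so the restored
	// offsets describe prior progress for reporting rather than an actual resume point.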
+ if rowOffset, ok := state["row_offset"].(int64); ok { + s.position.RowOffset = rowOffset + } + + if recordsRead, ok := state["records_read"].(int64); ok { + s.recordsRead = recordsRead + } + + if filesRead, ok := state["files_read"].(int64); ok { + s.filesRead = filesRead + } + + if bytesRead, ok := state["bytes_read"].(int64); ok { + s.bytesRead = bytesRead + } + + return nil +} + +// SupportsIncremental indicates if incremental sync is available +func (s *IcebergSource) SupportsIncremental() bool { + return true // Iceberg supports incremental reads via snapshots +} + +// SupportsRealtime indicates if real-time streaming is available +func (s *IcebergSource) SupportsRealtime() bool { + return false // Not real-time, but supports incremental +} + +// SupportsBatch indicates if batch reading is available +func (s *IcebergSource) SupportsBatch() bool { + return true +} + +// Subscribe starts CDC streaming for specified tables +func (s *IcebergSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error) { + return nil, fmt.Errorf("Iceberg source does not support real-time subscriptions") +} + +// Health checks if the source is operational +func (s *IcebergSource) Health(ctx context.Context) error { + if s.catalogProvider == nil { + return fmt.Errorf("catalog provider not initialized") + } + + return s.catalogProvider.Health(ctx) +} + +// Metrics returns performance and operational metrics +func (s *IcebergSource) Metrics() map[string]interface{} { + s.mu.RLock() + defer s.mu.RUnlock() + + return map[string]interface{}{ + "records_read": s.recordsRead, + "bytes_read": s.bytesRead, + "files_read": s.filesRead, + "current_snapshot_id": s.currentSnapshot.SnapshotID, + "table": fmt.Sprintf("%s.%s", s.database, s.tableName), + "catalog_type": s.catalogType, + } +} diff --git a/pkg/connector/sources/iceberg/init.go b/pkg/connector/sources/iceberg/init.go new file mode 100644 index 0000000..f070506 --- /dev/null +++ b/pkg/connector/sources/iceberg/init.go @@ -0,0 +1,85 @@ +package iceberg + +import ( + "github.com/ajitpratap0/nebula/pkg/connector/registry" +) + +func init() { + // Register the Iceberg source connector + _ = registry.RegisterSource("iceberg", NewIcebergSource) + + // Register connector information + _ = registry.RegisterConnectorInfo(®istry.ConnectorInfo{ + Name: "iceberg", + Type: "source", + Description: "Apache Iceberg table source connector with Nessie catalog support", + Version: "1.0.0", + Author: "Nebula Team", + Capabilities: []string{ + "streaming", + "batch", + "incremental", + "schema_discovery", + "snapshot_isolation", + "partition_pruning", + }, + ConfigSchema: map[string]interface{}{ + "catalog_type": map[string]interface{}{ + "type": "string", + "required": true, + "description": "Type of Iceberg catalog (nessie, rest)", + }, + "catalog_uri": map[string]interface{}{ + "type": "string", + "required": true, + "description": "URI of the catalog service", + }, + "catalog_name": map[string]interface{}{ + "type": "string", + "required": true, + "description": "Name of the catalog", + }, + "warehouse": map[string]interface{}{ + "type": "string", + "required": true, + "description": "Warehouse location (e.g., s3://warehouse/)", + }, + "database": map[string]interface{}{ + "type": "string", + "required": true, + "description": "Database/namespace name", + }, + "table": map[string]interface{}{ + "type": "string", + "required": true, + "description": "Table name", + }, + "branch": map[string]interface{}{ + "type": "string", + "required": false, + 
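				// Nessie resolves the table reference against this branch when set.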
"default": "main", + "description": "Branch name (for Nessie catalog)", + }, + "region": map[string]interface{}{ + "type": "string", + "required": false, + "description": "AWS region for S3", + }, + "s3_endpoint": map[string]interface{}{ + "type": "string", + "required": false, + "description": "S3/MinIO endpoint URL", + }, + "access_key": map[string]interface{}{ + "type": "string", + "required": false, + "description": "S3 access key", + }, + "secret_key": map[string]interface{}{ + "type": "string", + "required": false, + "description": "S3 secret key", + }, + }, + }) +} diff --git a/pkg/connector/sources/iceberg/manifest_reader.go b/pkg/connector/sources/iceberg/manifest_reader.go new file mode 100644 index 0000000..a20b3cc --- /dev/null +++ b/pkg/connector/sources/iceberg/manifest_reader.go @@ -0,0 +1,81 @@ +package iceberg + +import ( + "context" + "fmt" + + iceberg "github.com/shubham-tomar/iceberg-go" + "github.com/shubham-tomar/iceberg-go/table" + "go.uber.org/zap" +) + +// ManifestReader reads manifest files from an Iceberg snapshot +type ManifestReader struct { + snapshot *table.Snapshot + table *table.Table + currentIndex int + logger *zap.Logger +} + +// NewManifestReader creates a new manifest reader +func NewManifestReader(snapshot *table.Snapshot, tbl *table.Table, logger *zap.Logger) *ManifestReader { + return &ManifestReader{ + snapshot: snapshot, + table: tbl, + currentIndex: 0, + logger: logger, + } +} + +// GetDataFiles returns all data files from the snapshot's manifests +func (mr *ManifestReader) GetDataFiles(ctx context.Context) ([]iceberg.DataFile, error) { + if mr.snapshot == nil { + return nil, fmt.Errorf("snapshot is nil") + } + + mr.logger.Info("Reading data files from snapshot", + zap.Int64("snapshot_id", mr.snapshot.SnapshotID), + zap.String("manifest_list", mr.snapshot.ManifestList)) + + // Get manifests from snapshot using table's IO + manifests, err := mr.snapshot.Manifests(mr.table.FS()) + if err != nil { + return nil, fmt.Errorf("failed to get manifests: %w", err) + } + + mr.logger.Info("Found manifests", + zap.Int("manifest_count", len(manifests))) + + // Collect all data files from manifests + var allDataFiles []iceberg.DataFile + + for i, manifest := range manifests { + mr.currentIndex = i + + // Get entries from manifest + entries, err := manifest.FetchEntries(mr.table.FS(), false) + if err != nil { + mr.logger.Error("Failed to fetch manifest entries", + zap.Int("manifest_index", i), + zap.Error(err)) + continue + } + + mr.logger.Debug("Processing manifest", + zap.Int("manifest_index", i), + zap.Int("entry_count", len(entries))) + + // Add data files from entries + for _, entry := range entries { + // Only include added and existing files (not deleted) + if entry.Status() != iceberg.EntryStatusDELETED { + allDataFiles = append(allDataFiles, entry.DataFile()) + } + } + } + + mr.logger.Info("Collected data files from manifests", + zap.Int("data_file_count", len(allDataFiles))) + + return allDataFiles, nil +} diff --git a/pkg/connector/sources/iceberg/nessie_catalog.go b/pkg/connector/sources/iceberg/nessie_catalog.go new file mode 100644 index 0000000..8272c91 --- /dev/null +++ b/pkg/connector/sources/iceberg/nessie_catalog.go @@ -0,0 +1,230 @@ +package iceberg + +import ( + "context" + "fmt" + "net/url" + "strings" + + "github.com/ajitpratap0/nebula/pkg/connector/core" + iceberg "github.com/shubham-tomar/iceberg-go" + "github.com/shubham-tomar/iceberg-go/catalog" + "github.com/shubham-tomar/iceberg-go/catalog/rest" + 
"github.com/shubham-tomar/iceberg-go/table" + "go.uber.org/zap" +) + +// NessieCatalog implements CatalogProvider for Nessie catalog +type NessieCatalog struct { + catalog catalog.Catalog + config CatalogConfig + logger *zap.Logger +} + +// NewNessieCatalog creates a new Nessie catalog provider +func NewNessieCatalog(logger *zap.Logger) *NessieCatalog { + return &NessieCatalog{ + logger: logger, + } +} + +// Connect establishes connection to the Nessie catalog +func (n *NessieCatalog) Connect(ctx context.Context, config CatalogConfig) error { + n.config = config + n.logger.Debug("Connecting to Nessie catalog", zap.String("uri", config.URI)) + + // Ensure URI has proper scheme + if !strings.HasPrefix(config.URI, "http://") && !strings.HasPrefix(config.URI, "https://") { + config.URI = "http://" + config.URI + } + + // Build catalog URI for Nessie + baseURI := strings.TrimSuffix(config.URI, "/api/v1") + catalogURI, err := url.JoinPath(baseURI, "iceberg", config.Branch) + if err != nil { + return fmt.Errorf("failed to build catalog URI: %w", err) + } + + n.logger.Debug("Loading Nessie Iceberg catalog", + zap.String("catalog_uri", catalogURI), + zap.String("catalog_name", config.Name), + zap.String("warehouse", config.WarehouseLocation)) + + // Build properties for catalog + props := iceberg.Properties{ + "uri": catalogURI, + } + + // Add S3 configuration + if config.Region != "" { + props["s3.region"] = config.Region + } + if config.S3Endpoint != "" { + props["s3.endpoint"] = config.S3Endpoint + } + if config.AccessKey != "" { + props["s3.access-key-id"] = config.AccessKey + } + if config.SecretKey != "" { + props["s3.secret-access-key"] = config.SecretKey + } + + // Enable path-style access for MinIO + if config.Properties != nil { + props["s3.path-style-access"] = "true" + } + + // Add custom properties + for key, value := range config.Properties { + props[key] = value + } + + n.logger.Debug("Attempting Nessie catalog.Load with properties", + zap.String("uri", catalogURI), + zap.String("catalog_name", config.Name), + zap.Any("properties", props)) + + // Load catalog + iceCatalog, err := catalog.Load(ctx, config.Name, props) + if err != nil { + n.logger.Error("Nessie catalog.Load failed", + zap.String("uri", catalogURI), + zap.String("catalog_name", config.Name), + zap.Error(err)) + return fmt.Errorf("failed to load Nessie catalog: %w", err) + } + + // Type assert to REST catalog (Nessie uses REST catalog interface) + restCatalog, ok := iceCatalog.(*rest.Catalog) + if !ok { + return fmt.Errorf("expected *rest.Catalog for Nessie, got %T", iceCatalog) + } + + // Store catalog reference + n.catalog = restCatalog + + n.logger.Info("Nessie catalog loaded successfully", + zap.String("catalog_uri", catalogURI), + zap.String("catalog_name", config.Name)) + + return nil +} + +// LoadTable loads an Iceberg table from the catalog +func (n *NessieCatalog) LoadTable(ctx context.Context, database, tableName string) (*table.Table, error) { + if n.catalog == nil { + return nil, fmt.Errorf("catalog not initialized") + } + + identifier := catalog.ToIdentifier(fmt.Sprintf("%s.%s", database, tableName)) + tbl, err := n.catalog.LoadTable(ctx, identifier, nil) + if err != nil { + return nil, fmt.Errorf("failed to load table: %w", err) + } + + n.logger.Debug("Successfully loaded table via catalog", + zap.String("table", tableName), + zap.String("location", tbl.Location()), + zap.String("identifier", fmt.Sprintf("%s.%s", database, tableName))) + + return tbl, nil +} + +// GetSchema retrieves the schema from the 
catalog +func (n *NessieCatalog) GetSchema(ctx context.Context, database, table string) (*core.Schema, error) { + if n.catalog == nil { + return nil, fmt.Errorf("catalog not initialized") + } + + identifier := catalog.ToIdentifier(fmt.Sprintf("%s.%s", database, table)) + tbl, err := n.catalog.LoadTable(ctx, identifier, nil) + if err != nil { + return nil, fmt.Errorf("failed to load table: %w", err) + } + + n.logger.Debug("Successfully loaded table via catalog", + zap.String("table", table), + zap.String("location", tbl.Location()), + zap.String("identifier", fmt.Sprintf("%s.%s", database, table))) + + // Get Iceberg schema + iceSchema := tbl.Schema() + schemaFields := iceSchema.Fields() + fields := make([]core.Field, 0, len(schemaFields)) + + n.logger.Debug("Table schema details", + zap.String("table", table), + zap.Int("schema_id", iceSchema.ID), + zap.Int("field_count", len(schemaFields))) + + // Convert Iceberg fields to core fields + for _, field := range schemaFields { + coreField := convertIcebergFieldToCore(field) + fields = append(fields, coreField) + + n.logger.Debug("Schema field details", + zap.String("table", table), + zap.Int("field_id", field.ID), + zap.String("field_name", field.Name), + zap.String("field_type", field.Type.String()), + zap.Bool("required", field.Required), + zap.String("doc", field.Doc)) + } + + return &core.Schema{ + Name: table, + Fields: fields, + }, nil +} + +// convertIcebergFieldToCore converts an Iceberg field to core field +func convertIcebergFieldToCore(field iceberg.NestedField) core.Field { + var fieldType core.FieldType + + typeStr := field.Type.String() + + // Map Iceberg types to core types + switch { + case typeStr == "string": + fieldType = core.FieldTypeString + case typeStr == "int" || typeStr == "long": + fieldType = core.FieldTypeInt + case typeStr == "float" || typeStr == "double": + fieldType = core.FieldTypeFloat + case typeStr == "boolean": + fieldType = core.FieldTypeBool + case strings.HasPrefix(typeStr, "timestamp"): + fieldType = core.FieldTypeTimestamp + case typeStr == "date": + fieldType = core.FieldTypeDate + case typeStr == "time": + fieldType = core.FieldTypeTime + default: + fieldType = core.FieldTypeString // fallback + } + + return core.Field{ + Name: field.Name, + Type: fieldType, + Nullable: !field.Required, + } +} + +// Close closes the catalog connection +func (n *NessieCatalog) Close(ctx context.Context) error { + // Nessie catalog doesn't require explicit close + return nil +} + +// Health checks the health of the catalog +func (n *NessieCatalog) Health(ctx context.Context) error { + if n.catalog == nil { + return fmt.Errorf("catalog not initialized") + } + return nil +} + +// Type returns the catalog type +func (n *NessieCatalog) Type() string { + return "nessie" +} diff --git a/pkg/connector/sources/iceberg/snapshot_manager.go b/pkg/connector/sources/iceberg/snapshot_manager.go new file mode 100644 index 0000000..c90a62f --- /dev/null +++ b/pkg/connector/sources/iceberg/snapshot_manager.go @@ -0,0 +1,58 @@ +package iceberg + +import ( + "fmt" + + "github.com/shubham-tomar/iceberg-go/table" + "go.uber.org/zap" +) + +// SnapshotManager manages Iceberg table snapshots for reading +type SnapshotManager struct { + table *table.Table + currentSnapshot *table.Snapshot + logger *zap.Logger +} + +// NewSnapshotManager creates a new snapshot manager +func NewSnapshotManager(tbl *table.Table, logger *zap.Logger) *SnapshotManager { + return &SnapshotManager{ + table: tbl, + logger: logger, + } +} + +// GetCurrentSnapshot 
returns the current snapshot of the table +func (sm *SnapshotManager) GetCurrentSnapshot() (*table.Snapshot, error) { + snapshot := sm.table.CurrentSnapshot() + if snapshot == nil { + return nil, fmt.Errorf("no current snapshot found for table") + } + + sm.currentSnapshot = snapshot + + parentID := int64(0) + if snapshot.ParentSnapshotID != nil { + parentID = *snapshot.ParentSnapshotID + } + + sm.logger.Info("Got current snapshot", + zap.Int64("snapshot_id", snapshot.SnapshotID), + zap.Int64("parent_id", parentID), + zap.Int64("sequence_number", snapshot.SequenceNumber)) + + return snapshot, nil +} + +// GetSnapshotByID returns a specific snapshot by ID +func (sm *SnapshotManager) GetSnapshotByID(snapshotID int64) (*table.Snapshot, error) { + snapshot := sm.table.SnapshotByID(snapshotID) + if snapshot == nil { + return nil, fmt.Errorf("snapshot not found: %d", snapshotID) + } + + sm.logger.Info("Found snapshot", + zap.Int64("snapshot_id", snapshotID)) + + return snapshot, nil +} diff --git a/pkg/connector/sources/iceberg/types.go b/pkg/connector/sources/iceberg/types.go new file mode 100644 index 0000000..ac731b2 --- /dev/null +++ b/pkg/connector/sources/iceberg/types.go @@ -0,0 +1,105 @@ +package iceberg + +import ( + "context" + "fmt" + + "github.com/ajitpratap0/nebula/pkg/connector/core" + "github.com/shubham-tomar/iceberg-go/table" +) + +// IcebergPosition represents a position in the Iceberg table for incremental reads +type IcebergPosition struct { + SnapshotID int64 `json:"snapshot_id"` + ManifestIndex int `json:"manifest_index"` + DataFileIndex int `json:"data_file_index"` + RowOffset int64 `json:"row_offset"` + Metadata map[string]interface{} `json:"metadata"` +} + +// String returns a string representation of the position +func (p *IcebergPosition) String() string { + return fmt.Sprintf("iceberg_snapshot_%d_manifest_%d_file_%d_offset_%d", + p.SnapshotID, p.ManifestIndex, p.DataFileIndex, p.RowOffset) +} + +// Compare compares this position with another position +func (p *IcebergPosition) Compare(other core.Position) int { + otherPos, ok := other.(*IcebergPosition) + if !ok { + return -1 + } + + if p.SnapshotID != otherPos.SnapshotID { + if p.SnapshotID < otherPos.SnapshotID { + return -1 + } + return 1 + } + + if p.ManifestIndex != otherPos.ManifestIndex { + if p.ManifestIndex < otherPos.ManifestIndex { + return -1 + } + return 1 + } + + if p.DataFileIndex != otherPos.DataFileIndex { + if p.DataFileIndex < otherPos.DataFileIndex { + return -1 + } + return 1 + } + + if p.RowOffset != otherPos.RowOffset { + if p.RowOffset < otherPos.RowOffset { + return -1 + } + return 1 + } + + return 0 +} + +// CatalogProvider defines the interface for Iceberg catalog operations +type CatalogProvider interface { + // Connect establishes connection to the catalog + Connect(ctx context.Context, config CatalogConfig) error + + // LoadTable loads an Iceberg table + LoadTable(ctx context.Context, database, tableName string) (*table.Table, error) + + // GetSchema retrieves the schema from the catalog + GetSchema(ctx context.Context, database, tableName string) (*core.Schema, error) + + // Close closes the catalog connection + Close(ctx context.Context) error + + // Health checks the health of the catalog + Health(ctx context.Context) error + + // Type returns the catalog type + Type() string +} + +// CatalogConfig contains configuration for catalog connection +type CatalogConfig struct { + Name string + URI string + WarehouseLocation string + Branch string + Region string + S3Endpoint string + 
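+	// AccessKey/SecretKey are optional static credentials for S3/MinIO; when
+	// left empty, the underlying FileIO is assumed to fall back to the
+	// environment's standard AWS credential chain (assumption, not enforced here).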
AccessKey string + SecretKey string + Properties map[string]string +} + +// S3Config contains configuration for S3/MinIO access +type S3Config struct { + Region string + Endpoint string + AccessKey string + SecretKey string + Properties map[string]string +} From 5d0c908f507a9d979a1049ad67ece5b30c95457d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 10 Nov 2025 08:44:51 +0000 Subject: [PATCH 2/2] build(deps): bump golang from 1.24.3-alpine to 1.25.4-alpine Bumps golang from 1.24.3-alpine to 1.25.4-alpine. --- updated-dependencies: - dependency-name: golang dependency-version: 1.25.4-alpine dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- Dockerfile | 2 +- Dockerfile.dev | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0c3f31b..0dd95d9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build stage -FROM golang:1.24.3-alpine AS builder +FROM golang:1.25.4-alpine AS builder # Install build dependencies RUN apk add --no-cache git make diff --git a/Dockerfile.dev b/Dockerfile.dev index 0dcfc64..975cf51 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -1,7 +1,7 @@ # Development Dockerfile for Nebula # This version includes development tools and optimizations for local development -FROM golang:1.24.3-alpine AS base +FROM golang:1.25.4-alpine AS base # Install development dependencies RUN apk add --no-cache \