
Commit ea58a8d

adding process implementation to handle unique keys by batches
1 parent 13f4dd7 · commit ea58a8d

File tree

8 files changed (+178, -88 lines)


go.mod

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ require (
     github.com/gin-gonic/gin v1.10.0
     github.com/google/uuid v1.3.0
     github.com/splitio/gincache v1.0.1
-    github.com/splitio/go-split-commons/v6 v6.1.0
+    github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414224340-9a5c36389db1
     github.com/splitio/go-toolkit/v5 v5.4.0
     github.com/stretchr/testify v1.9.0
     go.etcd.io/bbolt v1.3.6

go.sum

Lines changed: 2 additions & 2 deletions
@@ -93,8 +93,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
 github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
 github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU=
 github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
-github.com/splitio/go-split-commons/v6 v6.1.0 h1:k3mwr12DF6gbEaV8XXU/tSAQlPkIEuzIgTEneYhGg2I=
-github.com/splitio/go-split-commons/v6 v6.1.0/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
+github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414224340-9a5c36389db1 h1:le2tNqtYsipMP5PQgJWAtu3x5DbPzquYT36RUeiiVfY=
+github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414224340-9a5c36389db1/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
 github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
 github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

splitio/producer/conf/sections.go

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ type AdvancedSync struct {
     EventsPostConcurrency int `json:"eventsPostConcurrency" s-cli:"events-post-concurrency" s-def:"0" s-desc:"#concurrent imp post threads"`
     EventsPostSize int `json:"eventsPostSize" s-cli:"events-post-size" s-def:"0" s-desc:"Max #impressions to send per POST"`
     EventsAccumWaitMs int64 `json:"eventsAccumWaitMs" s-cli:"events-accum-wait-ms" s-def:"0" s-desc:"Max ms to wait to close an events bulk"`
-    UniqueKeysFetchSize int64 `json:"uniqueKeysFetchSize" s-cli:"unique-keys-fetch-size" s-def:"0" s-desc:"How many unique keys to pop from storage at once"`
+    UniqueKeysFetchSize int64 `json:"uniqueKeysFetchSize" s-cli:"unique-keys-fetch-size" s-def:"1000" s-desc:"How many unique keys to pop from storage at once"`
     UniqueKeysProcessConcurrency int `json:"uniqueKeysProcessConcurrency" s-cli:"unique-keys-process-concurrency" s-def:"0" s-desc:"#Threads for processing uniques"`
     UniqueKeysProcessBatchSize int `json:"uniqueKeysProcessBatchSize" s-cli:"unique-keys-process-batch-size" s-def:"0" s-desc:"Size of uniques processing batchs"`
     UniqueKeysPostConcurrency int `json:"uniqueKeysPostConcurrency" s-cli:"unique-keys-post-concurrency" s-def:"0" s-desc:"#concurrent uniques post threads"`

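Note: the only change here bumps the s-def default for uniqueKeysFetchSize from 0 to 1000, so the producer pops a sensible batch size even when the option is not configured. As a minimal, standalone sketch of how the json tag maps a config file onto this field (advancedSync is a local mirror type for illustration, not the actual synchronizer config loader):

package main

import (
	"encoding/json"
	"fmt"
)

// advancedSync mirrors only the field touched above; the json tag is copied
// from AdvancedSync, everything else here is illustrative.
type advancedSync struct {
	UniqueKeysFetchSize int64 `json:"uniqueKeysFetchSize"`
}

func main() {
	cfg := advancedSync{UniqueKeysFetchSize: 1000} // assumed default, per s-def:"1000"
	_ = json.Unmarshal([]byte(`{"uniqueKeysFetchSize": 500}`), &cfg)
	fmt.Println(cfg.UniqueKeysFetchSize) // prints 500; stays 1000 if the key is absent
}
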
splitio/producer/initialization.go

Lines changed: 8 additions & 9 deletions
@@ -132,7 +132,7 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error {
             metadata, logger, syncTelemetryStorage),
         // local telemetry
         TelemetryRecorder: telemetry.NewTelemetrySynchronizer(syncTelemetryStorage, splitAPI.TelemetryRecorder,
-            storages.SplitStorage, storages.SegmentStorage, logger, metadata, syncTelemetryStorage),
+            storages.SplitStorage, storages.SegmentStorage, logger, metadata, syncTelemetryStorage, nil),
     }
     splitTasks := synchronizer.SplitTasks{
         SplitSyncTask: tasks.NewFetchSplitsTask(workers.SplitUpdater, int(cfg.Sync.SplitRefreshRateMs)/1000, logger),
@@ -216,15 +216,14 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error {
     }

     filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability)
-    uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter)
+
     uniquesWorker := task.NewUniqueKeysWorker(&task.UniqueWorkerConfig{
-        Logger:            logger,
-        Storage:           storages.UniqueKeysStorage,
-        UniqueKeysTracker: uniqueKeysTracker,
-        URL:               advanced.TelemetryServiceURL,
-        Apikey:            cfg.Apikey,
-        FetchSize:         int(cfg.Sync.Advanced.UniqueKeysFetchSize),
-        Metadata:          metadata,
+        Logger:    logger,
+        Storage:   storages.UniqueKeysStorage,
+        URL:       advanced.TelemetryServiceURL,
+        Apikey:    cfg.Apikey,
+        FetchSize: int(cfg.Sync.Advanced.UniqueKeysFetchSize),
+        Metadata:  metadata,
     })

     uniquesTask, err := task.NewPipelinedTask(&task.Config{

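Note: the diff above drops the UniqueKeysTracker from the worker wiring; deduplication and batching now live inside the worker's Process step (see uniquekeys.go below). A rough sketch of the fetch/process shape the pipelined task drives, using the signatures visible in this commit (worker, runOnce, and echoWorker are hypothetical stand-ins, not part of the codebase):

package main

import "fmt"

// worker is a stand-in for the pipelined task's Worker contract as seen in
// this commit: Fetch pops raw payloads, Process pushes postable bulks to sink.
type worker interface {
	Fetch() ([]string, error)
	Process(raws [][]byte, sink chan<- interface{}) error
}

// runOnce drives a single fetch/process cycle, roughly what one pipeline
// thread would do per iteration.
func runOnce(w worker, sink chan interface{}) error {
	raws, err := w.Fetch()
	if err != nil {
		return err
	}
	payloads := make([][]byte, 0, len(raws))
	for _, r := range raws {
		payloads = append(payloads, []byte(r))
	}
	return w.Process(payloads, sink)
}

type echoWorker struct{}

func (e *echoWorker) Fetch() ([]string, error) {
	return []string{`{"f":"feat","ks":["k1"]}`}, nil
}

func (e *echoWorker) Process(raws [][]byte, sink chan<- interface{}) error {
	for _, r := range raws {
		sink <- string(r) // the real worker parses, dedupes, and batches here
	}
	return nil
}

func main() {
	sink := make(chan interface{}, 8)
	_ = runOnce(&echoWorker{}, sink)
	close(sink)
	for bulk := range sink {
		fmt.Println(bulk)
	}
}
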
splitio/producer/task/uniquekeys.go

Lines changed: 88 additions & 37 deletions
@@ -7,48 +7,44 @@ import (
     "net/http"

     "github.com/splitio/go-split-commons/v6/dtos"
-    "github.com/splitio/go-split-commons/v6/provisional/strategy"
     "github.com/splitio/go-split-commons/v6/storage"
+    "github.com/splitio/go-toolkit/v5/datastructures/set"
     "github.com/splitio/go-toolkit/v5/logging"
 )

 // UniqueWorkerConfig bundles options
 type UniqueWorkerConfig struct {
-    Logger            logging.LoggerInterface
-    Storage           storage.UniqueKeysMultiSdkConsumer
-    UniqueKeysTracker strategy.UniqueKeysTracker
-    URL               string
-    Apikey            string
-    FetchSize         int
-    Metadata          dtos.Metadata
+    Logger    logging.LoggerInterface
+    Storage   storage.UniqueKeysMultiSdkConsumer
+    URL       string
+    Apikey    string
+    FetchSize int
+    Metadata  dtos.Metadata
 }

 // UniqueKeysPipelineWorker implements all the required methods to work with a pipelined task
 type UniqueKeysPipelineWorker struct {
-    logger            logging.LoggerInterface
-    storage           storage.UniqueKeysMultiSdkConsumer
-    uniqueKeysTracker strategy.UniqueKeysTracker
-
+    logger  logging.LoggerInterface
+    storage storage.UniqueKeysMultiSdkConsumer
     url string
     apikey string
-    fetchSize int64
+    fetchSize int
     metadata dtos.Metadata
 }

 func NewUniqueKeysWorker(cfg *UniqueWorkerConfig) Worker {
     return &UniqueKeysPipelineWorker{
-        logger: cfg.Logger,
-        storage: cfg.Storage,
-        uniqueKeysTracker: cfg.UniqueKeysTracker,
-        url: cfg.URL + "/keys/ss",
-        apikey: cfg.Apikey,
-        fetchSize: int64(cfg.FetchSize),
-        metadata: cfg.Metadata,
+        logger:    cfg.Logger,
+        storage:   cfg.Storage,
+        url:       cfg.URL + "/keys/ss",
+        apikey:    cfg.Apikey,
+        fetchSize: cfg.FetchSize,
+        metadata:  cfg.Metadata,
     }
 }

 func (u *UniqueKeysPipelineWorker) Fetch() ([]string, error) {
-    raw, _, err := u.storage.PopNRaw(u.fetchSize)
+    raw, _, err := u.storage.PopNRaw(int64(u.fetchSize))
     if err != nil {
         return nil, fmt.Errorf("error fetching raw unique keys: %w", err)
     }
@@ -57,31 +53,30 @@ func (u *UniqueKeysPipelineWorker) Fetch() ([]string, error) {
 }

 func (u *UniqueKeysPipelineWorker) Process(raws [][]byte, sink chan<- interface{}) error {
+    rawKeys := make([]dtos.Key, 0)
     for _, raw := range raws {
-        err, value := parseToObj(raw)
+        value, err := parseToObj(raw)
         if err == nil {
             u.logger.Debug("Unique Keys parsed to Dto.")
         }

         if err != nil {
-            err, value = parseToArray(raw)
+            value, err = parseToArray(raw)
             if err != nil {
                 u.logger.Error("error deserializing fetched uniqueKeys: ", err.Error())
                 continue
             }
             u.logger.Debug("Unique Keys parsed to Array.")
         }

-        for _, unique := range value {
-            for _, key := range unique.Keys {
-                u.uniqueKeysTracker.Track(unique.Feature, key)
-            }
-        }
+        rawKeys = append(rawKeys, value...)
     }

-    uniques := u.uniqueKeysTracker.PopAll()
-    if len(uniques.Keys) > 0 {
-        sink <- uniques
+    filtered := cleanUp(rawKeys)
+    groups := batches(filtered, u.fetchSize)
+
+    for index := range groups {
+        sink <- groups[index]
     }

     return nil
@@ -108,22 +103,78 @@ func (u *UniqueKeysPipelineWorker) BuildRequest(data interface{}) (*http.Request
     return req, nil
 }

-func parseToArray(raw []byte) (error, []dtos.Key) {
+func parseToArray(raw []byte) ([]dtos.Key, error) {
     var queueObj []dtos.Key
     err := json.Unmarshal(raw, &queueObj)
     if err != nil {
-        return err, nil
+        return nil, err
     }

-    return nil, queueObj
+    return queueObj, nil
 }

-func parseToObj(raw []byte) (error, []dtos.Key) {
+func parseToObj(raw []byte) ([]dtos.Key, error) {
     var queueObj dtos.Key
     err := json.Unmarshal(raw, &queueObj)
     if err != nil {
-        return err, nil
+        return nil, err
+    }
+
+    return []dtos.Key{queueObj}, nil
+}
+
+func cleanUp(keys []dtos.Key) map[string]*set.ThreadUnsafeSet {
+    filtered := make(map[string]*set.ThreadUnsafeSet)
+    for _, key := range keys {
+        _, exists := filtered[key.Feature]
+        if !exists {
+            filtered[key.Feature] = set.NewSet()
+        }
+        filtered[key.Feature].Add(key.Keys...)
+    }
+
+    return filtered
+}
+
+func batches(filtered map[string]*set.ThreadUnsafeSet, maxSize int) []dtos.Uniques {
+    groups := make([]dtos.Uniques, 0)
+    currentBatch := dtos.Uniques{Keys: []dtos.Key{}}
+    currentBatchSize := 0
+
+    for name, keys := range filtered {
+        keyList := keys.List()
+        totalKeys := len(keyList)
+        start := 0
+
+        for start < totalKeys {
+            end := start + maxSize - currentBatchSize
+            if end > totalKeys {
+                end = totalKeys
+            }
+
+            // Add keys to the current batch
+            keyDto := dtos.Key{
+                Feature: name,
+                Keys:    keyList[start:end],
+            }
+            currentBatch.Keys = append(currentBatch.Keys, keyDto)
+            currentBatchSize += len(keyList[start:end])
+
+            // If the current batch reaches maxSize, finalize it and start a new one
+            if currentBatchSize >= maxSize {
+                groups = append(groups, currentBatch)
+                currentBatch = dtos.Uniques{Keys: []dtos.Key{}}
+                currentBatchSize = 0
+            }
+
+            start = end
+        }
+    }
+
+    // Add the remaining batch if it has any keys
+    if currentBatchSize > 0 {
+        groups = append(groups, currentBatch)
     }

-    return nil, []dtos.Key{queueObj}
+    return groups
 }

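Note: the heart of this commit is the new cleanUp plus batches pair. Process no longer funnels keys through the UniqueKeysTracker; it collects the parsed dtos.Key values, deduplicates them per feature with a set, and slices the result into bulks of at most fetchSize keys, so each item sent to the sink stays bounded. To illustrate that same group-then-split logic, here is a standalone sketch using plain maps and a local key type instead of the dtos and set packages (dedupe and split are illustrative names for the cleanUp and batches steps):

package main

import "fmt"

// key mirrors the shape of dtos.Key: one feature name plus the keys seen for it.
type key struct {
	Feature string
	Keys    []string
}

// dedupe groups keys per feature and drops duplicates (the cleanUp step).
func dedupe(keys []key) map[string]map[string]struct{} {
	filtered := make(map[string]map[string]struct{})
	for _, k := range keys {
		if filtered[k.Feature] == nil {
			filtered[k.Feature] = make(map[string]struct{})
		}
		for _, v := range k.Keys {
			filtered[k.Feature][v] = struct{}{}
		}
	}
	return filtered
}

// split emits bulks holding at most maxSize keys in total, slicing one
// feature's key list across bulks when needed (the batches step).
func split(filtered map[string]map[string]struct{}, maxSize int) [][]key {
	groups := make([][]key, 0)
	current := []key{}
	size := 0
	for name, uniques := range filtered {
		list := make([]string, 0, len(uniques))
		for v := range uniques {
			list = append(list, v)
		}
		for start := 0; start < len(list); {
			end := start + maxSize - size
			if end > len(list) {
				end = len(list)
			}
			current = append(current, key{Feature: name, Keys: list[start:end]})
			size += end - start
			if size >= maxSize {
				groups = append(groups, current)
				current, size = []key{}, 0
			}
			start = end
		}
	}
	if size > 0 {
		groups = append(groups, current) // flush the trailing partial bulk
	}
	return groups
}

func main() {
	in := []key{
		{Feature: "f1", Keys: []string{"a", "b", "a"}}, // duplicate "a" is dropped
		{Feature: "f2", Keys: []string{"c", "d", "e"}},
	}
	for _, g := range split(dedupe(in), 3) {
		fmt.Println(g) // two bulks: 3 unique keys, then the remaining 2
	}
}
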