Skip to content

Commit 282e589

Browse files
committed
Deal with data as part of StateCtx
Previously, Data was managed separately from StateCtx, which caused several issues: - No way to pass in-memory-only data (e.g. API request/response bodies). Storing them was odd, but passing into flows wasn’t possible either. - All commands operated on states except `load` and `store` for Data → inconsistent API. - Each flow in a chain had to fetch Data separately, making StateCtx useless as a runtime context. - Data can be heavy compared to State, requiring lazy loading, which was hard to fit into the old interface. This change unifies Data handling inside StateCtx and makes the design cleaner. - Data is part of StateCtx via Datas property - Data could be passed with StateCtx completly in-meory - Data could be (lazy-)loaded. If the rev of loaded Data match requested we do not load - Data could be (lazy-)stored. If the data was previously stored and has a checksum and it match the current one we do not store.
1 parent 724c6f8 commit 282e589

31 files changed

+1019
-930
lines changed

badgerdriver/driver.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,28 @@ func (d *Driver) Shutdown(_ context.Context) error {
7979

8080
func (d *Driver) GetData(cmd *flowstate.GetDataCommand) error {
8181
return d.db.View(func(txn *badger.Txn) error {
82-
return getData(txn, cmd.Data)
82+
data := cmd.StateCtx.MustData(cmd.Alias)
83+
if err := getData(txn, data); err != nil {
84+
return err
85+
}
86+
return nil
8387
})
84-
}
8588

86-
func (d *Driver) StoreData(cmd *flowstate.AttachDataCommand) error {
87-
nextRev, err := d.dataRevSeq.Next()
88-
if err != nil {
89-
return fmt.Errorf("get next sequence: %w", err)
90-
}
91-
92-
data := cmd.Data
93-
data.Rev = int64(nextRev)
89+
}
9490

91+
func (d *Driver) StoreData(cmd *flowstate.StoreDataCommand) error {
9592
return d.db.Update(func(txn *badger.Txn) error {
96-
return setData(txn, data)
93+
data := cmd.StateCtx.MustData(cmd.Alias)
94+
nextRev, err := d.dataRevSeq.Next()
95+
if err != nil {
96+
return fmt.Errorf("get next sequence: %w", err)
97+
}
98+
99+
data.Rev = int64(nextRev)
100+
if err := setData(txn, data); err != nil {
101+
return err
102+
}
103+
return nil
97104
})
98105
}
99106

badgerdriver/op.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package badgerdriver
22

33
import (
44
"encoding/binary"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"time"
@@ -130,46 +131,62 @@ func committedAtIndexPrefix() []byte {
130131
return []byte("flowstate.index.committed_at.")
131132
}
132133

133-
func dataBKey(data *flowstate.Data) []byte {
134-
return []byte(fmt.Sprintf(`flowstate.data.blob.%020d.%s`, data.Rev, data.ID))
134+
func dataBlobKey(data *flowstate.Data) []byte {
135+
return []byte(fmt.Sprintf(`flowstate.data.blob.%020d`, data.Rev))
135136
}
136137

137-
func dataBinaryKey(data *flowstate.Data) []byte {
138-
return []byte(fmt.Sprintf(`flowstate.data.binary.%020d.%s`, data.Rev, data.ID))
138+
func dataAnnotationsKey(data *flowstate.Data) []byte {
139+
return []byte(fmt.Sprintf(`flowstate.data.annotations.%020d`, data.Rev))
139140
}
140141

141142
func setData(txn *badger.Txn, data *flowstate.Data) error {
142-
if err := txn.Set(dataBKey(data), data.B); err != nil {
143-
return fmt.Errorf("set data.B: %w", err)
143+
if err := txn.Set(dataBlobKey(data), data.Blob); err != nil {
144+
return fmt.Errorf("set data.Blob: %w", err)
144145
}
145146

146-
var dataBinary []byte
147-
if data.Binary {
148-
dataBinary = append(dataBinary, 1)
149-
}
150-
151-
if err := txn.Set(dataBinaryKey(data), dataBinary); err != nil {
152-
return fmt.Errorf("set data.Binary: %w", err)
147+
if len(data.Annotations) > 0 {
148+
annotationsJSON, err := json.Marshal(data.Annotations)
149+
if err != nil {
150+
return fmt.Errorf("marshal data.Annotations: %w", err)
151+
}
152+
if err := txn.Set(dataAnnotationsKey(data), annotationsJSON); err != nil {
153+
return fmt.Errorf("set data.Annotations: %w", err)
154+
}
153155
}
154156

155157
return nil
156158
}
157159

158160
func getData(txn *badger.Txn, data *flowstate.Data) error {
159-
item, err := txn.Get(dataBKey(data))
161+
blobItem, err := txn.Get(dataBlobKey(data))
160162
if err != nil {
161-
return fmt.Errorf("get data.Bd: %w", err)
163+
return fmt.Errorf("get data.Blob: %w", err)
162164
}
163-
data.B, err = item.ValueCopy(data.B)
165+
data.Blob, err = blobItem.ValueCopy(data.Blob)
164166
if err != nil {
165-
return fmt.Errorf("copy data.B: %w", err)
167+
return fmt.Errorf("copy data.Blob: %w", err)
166168
}
167169

168-
dataBinary, err := txn.Get(dataBinaryKey(data))
169-
if err != nil {
170-
return fmt.Errorf("get data.Binary: %w", err)
170+
if annotationsItem, err := txn.Get(dataAnnotationsKey(data)); errors.Is(err, badger.ErrKeyNotFound) {
171+
// ok
172+
} else if err != nil {
173+
return fmt.Errorf("get data.Annotations: %w", err)
174+
} else {
175+
if err := annotationsItem.Value(func(val []byte) error {
176+
if len(val) == 0 {
177+
return nil
178+
}
179+
annotations := make(map[string]string)
180+
if err := json.Unmarshal(val, &annotations); err != nil {
181+
return fmt.Errorf("unmarshal annotations: %w", err)
182+
}
183+
184+
data.Annotations = annotations
185+
return nil
186+
}); err != nil {
187+
return fmt.Errorf("copy data.Annotations: %w", err)
188+
}
171189
}
172-
data.Binary = dataBinary.ValueSize() > 0
173190

174191
return nil
175192
}

cmd.go

Lines changed: 38 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@ package flowstate
33
import (
44
"encoding/base64"
55
"fmt"
6-
"strconv"
7-
"strings"
86
"time"
9-
10-
"github.com/oklog/ulid/v2"
117
)
128

139
var _ Command = &TransitCommand{}
@@ -30,10 +26,6 @@ var _ Command = &GetStatesCommand{}
3026

3127
var _ Command = &GetDelayedStatesCommand{}
3228

33-
var _ Command = &AttachDataCommand{}
34-
35-
var _ Command = &GetDataCommand{}
36-
3729
var _ Command = &CommitCommand{}
3830

3931
var _ Command = &ExecuteCommand{}
@@ -394,60 +386,47 @@ func (cmd *UnstackCommand) Do() error {
394386
return nil
395387
}
396388

397-
func AttachData(stateCtx *StateCtx, data *Data, alias string) *AttachDataCommand {
398-
return &AttachDataCommand{
389+
func StoreData(stateCtx *StateCtx, alias string) *StoreDataCommand {
390+
return &StoreDataCommand{
399391
StateCtx: stateCtx,
400-
Data: data,
401392
Alias: alias,
402-
403-
Store: true,
404393
}
405394
}
406395

407-
type AttachDataCommand struct {
396+
type StoreDataCommand struct {
408397
command
409398
StateCtx *StateCtx
410-
Data *Data
411399
Alias string
412-
Store bool
413-
}
414-
415-
func (cmd *AttachDataCommand) WithoutStore() *AttachDataCommand {
416-
cmd.Store = false
417-
return cmd
418400
}
419401

420-
func (cmd *AttachDataCommand) Prepare() error {
421-
if cmd.Alias == "" {
422-
return fmt.Errorf("alias is empty")
423-
}
424-
if cmd.Data.ID == "" {
425-
cmd.Data.ID = DataID(ulid.Make().String())
426-
}
427-
if cmd.Data.Rev < 0 {
428-
return fmt.Errorf("Data.Rev is negative")
402+
func (cmd *StoreDataCommand) Prepare() (bool, error) {
403+
d, err := cmd.StateCtx.Data(cmd.Alias)
404+
if err != nil {
405+
return false, err
429406
}
430-
if cmd.Data.B == nil || len(cmd.Data.B) == 0 {
431-
return fmt.Errorf("Data.B is empty")
407+
408+
if d.Rev < 0 {
409+
return false, fmt.Errorf("Data.Rev is negative")
432410
}
433-
if cmd.Data.Rev == 0 && !cmd.Store {
434-
return fmt.Errorf("Data.Rev is zero, but Store is false; this would lead to data loss")
411+
412+
if !d.isDirty() {
413+
referenceData(cmd.StateCtx, cmd.Alias, d.Rev)
414+
return false, nil
435415
}
436416

437-
return nil
417+
d.checksum()
418+
419+
return true, nil
438420
}
439421

440-
func (cmd *AttachDataCommand) Do() {
441-
cmd.StateCtx.Current.SetAnnotation(
442-
dataAnnotation(cmd.Alias),
443-
string(cmd.Data.ID)+":"+strconv.FormatInt(cmd.Data.Rev, 10),
444-
)
422+
func (cmd *StoreDataCommand) post() {
423+
d := cmd.StateCtx.MustData(cmd.Alias)
424+
referenceData(cmd.StateCtx, cmd.Alias, d.Rev)
445425
}
446426

447-
func GetData(stateCtx *StateCtx, data *Data, alias string) *GetDataCommand {
427+
func GetData(stateCtx *StateCtx, alias string) *GetDataCommand {
448428
return &GetDataCommand{
449429
StateCtx: stateCtx,
450-
Data: data,
451430
Alias: alias,
452431
}
453432

@@ -456,43 +435,32 @@ func GetData(stateCtx *StateCtx, data *Data, alias string) *GetDataCommand {
456435
type GetDataCommand struct {
457436
command
458437
StateCtx *StateCtx
459-
Data *Data
460438
Alias string
461439
}
462440

463-
func (cmd *GetDataCommand) Prepare() error {
464-
if cmd.Data == nil {
465-
return fmt.Errorf("data is nil")
466-
}
467-
if cmd.Alias == "" {
468-
return fmt.Errorf("alias is empty")
469-
}
470-
471-
annotKey := dataAnnotation(cmd.Alias)
472-
idRevStr := cmd.StateCtx.Current.Annotations[annotKey]
473-
if idRevStr == "" {
474-
return fmt.Errorf("annotation %q is not set", annotKey)
475-
}
476-
477-
sepIdx := strings.LastIndexAny(idRevStr, ":")
478-
if sepIdx < 1 || sepIdx+1 == len(idRevStr) {
479-
return fmt.Errorf("annotation %q contains invalid data reference; got %q", annotKey, idRevStr)
441+
func (cmd *GetDataCommand) Prepare() (bool, error) {
442+
rev, err := dereferenceData(cmd.StateCtx, cmd.Alias)
443+
if err != nil {
444+
return false, err
480445
}
481446

482-
id := DataID(idRevStr[:sepIdx])
483-
rev, err := strconv.ParseInt(idRevStr[sepIdx+1:], 10, 64)
447+
d, err := cmd.StateCtx.Data(cmd.Alias)
484448
if err != nil {
485-
return fmt.Errorf("annotation %q contains invalid data revision; got %q: %w", annotKey, idRevStr[sepIdx+1:], err)
449+
cmd.StateCtx.SetData(cmd.Alias, &Data{
450+
Rev: rev,
451+
})
452+
return true, nil
453+
} else if d.Rev == rev {
454+
return false, nil
486455
}
487456

488-
cmd.Data.ID = id
489-
cmd.Data.Rev = rev
490-
491-
return nil
492-
}
457+
d.Rev = rev
458+
d.Blob = d.Blob[:0]
459+
for k := range d.Annotations {
460+
delete(d.Annotations, k)
461+
}
493462

494-
func dataAnnotation(alias string) string {
495-
return "flowstate.data." + string(alias)
463+
return true, nil
496464
}
497465

498466
func nextTransitionOrCurrent(stateCtx *StateCtx, to FlowID) Transition {

0 commit comments

Comments
 (0)