Skip to content

Commit 4916d95

Browse files
authored
Merge pull request #97 from makasim/state-ctx-datas
Deal with data as part of StateCtx
2 parents 724c6f8 + a5bc178 commit 4916d95

37 files changed

+1151
-1073
lines changed

badgerdriver/driver.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,28 @@ func (d *Driver) Shutdown(_ context.Context) error {
7979

8080
func (d *Driver) GetData(cmd *flowstate.GetDataCommand) error {
8181
return d.db.View(func(txn *badger.Txn) error {
82-
return getData(txn, cmd.Data)
82+
data := cmd.StateCtx.MustData(cmd.Alias)
83+
if err := getData(txn, data); err != nil {
84+
return err
85+
}
86+
return nil
8387
})
84-
}
8588

86-
func (d *Driver) StoreData(cmd *flowstate.AttachDataCommand) error {
87-
nextRev, err := d.dataRevSeq.Next()
88-
if err != nil {
89-
return fmt.Errorf("get next sequence: %w", err)
90-
}
91-
92-
data := cmd.Data
93-
data.Rev = int64(nextRev)
89+
}
9490

91+
func (d *Driver) StoreData(cmd *flowstate.StoreDataCommand) error {
9592
return d.db.Update(func(txn *badger.Txn) error {
96-
return setData(txn, data)
93+
data := cmd.StateCtx.MustData(cmd.Alias)
94+
nextRev, err := d.dataRevSeq.Next()
95+
if err != nil {
96+
return fmt.Errorf("get next sequence: %w", err)
97+
}
98+
99+
data.Rev = int64(nextRev)
100+
if err := setData(txn, data); err != nil {
101+
return err
102+
}
103+
return nil
97104
})
98105
}
99106

badgerdriver/op.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package badgerdriver
22

33
import (
44
"encoding/binary"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"time"
@@ -130,46 +131,62 @@ func committedAtIndexPrefix() []byte {
130131
return []byte("flowstate.index.committed_at.")
131132
}
132133

133-
func dataBKey(data *flowstate.Data) []byte {
134-
return []byte(fmt.Sprintf(`flowstate.data.blob.%020d.%s`, data.Rev, data.ID))
134+
func dataBlobKey(data *flowstate.Data) []byte {
135+
return []byte(fmt.Sprintf(`flowstate.data.blob.%020d`, data.Rev))
135136
}
136137

137-
func dataBinaryKey(data *flowstate.Data) []byte {
138-
return []byte(fmt.Sprintf(`flowstate.data.binary.%020d.%s`, data.Rev, data.ID))
138+
func dataAnnotationsKey(data *flowstate.Data) []byte {
139+
return []byte(fmt.Sprintf(`flowstate.data.annotations.%020d`, data.Rev))
139140
}
140141

141142
func setData(txn *badger.Txn, data *flowstate.Data) error {
142-
if err := txn.Set(dataBKey(data), data.B); err != nil {
143-
return fmt.Errorf("set data.B: %w", err)
143+
if err := txn.Set(dataBlobKey(data), data.Blob); err != nil {
144+
return fmt.Errorf("set data.Blob: %w", err)
144145
}
145146

146-
var dataBinary []byte
147-
if data.Binary {
148-
dataBinary = append(dataBinary, 1)
149-
}
150-
151-
if err := txn.Set(dataBinaryKey(data), dataBinary); err != nil {
152-
return fmt.Errorf("set data.Binary: %w", err)
147+
if len(data.Annotations) > 0 {
148+
annotationsJSON, err := json.Marshal(data.Annotations)
149+
if err != nil {
150+
return fmt.Errorf("marshal data.Annotations: %w", err)
151+
}
152+
if err := txn.Set(dataAnnotationsKey(data), annotationsJSON); err != nil {
153+
return fmt.Errorf("set data.Annotations: %w", err)
154+
}
153155
}
154156

155157
return nil
156158
}
157159

158160
func getData(txn *badger.Txn, data *flowstate.Data) error {
159-
item, err := txn.Get(dataBKey(data))
161+
blobItem, err := txn.Get(dataBlobKey(data))
160162
if err != nil {
161-
return fmt.Errorf("get data.Bd: %w", err)
163+
return fmt.Errorf("get data.Blob: %w", err)
162164
}
163-
data.B, err = item.ValueCopy(data.B)
165+
data.Blob, err = blobItem.ValueCopy(data.Blob)
164166
if err != nil {
165-
return fmt.Errorf("copy data.B: %w", err)
167+
return fmt.Errorf("copy data.Blob: %w", err)
166168
}
167169

168-
dataBinary, err := txn.Get(dataBinaryKey(data))
169-
if err != nil {
170-
return fmt.Errorf("get data.Binary: %w", err)
170+
if annotationsItem, err := txn.Get(dataAnnotationsKey(data)); errors.Is(err, badger.ErrKeyNotFound) {
171+
// ok
172+
} else if err != nil {
173+
return fmt.Errorf("get data.Annotations: %w", err)
174+
} else {
175+
if err := annotationsItem.Value(func(val []byte) error {
176+
if len(val) == 0 {
177+
return nil
178+
}
179+
annotations := make(map[string]string)
180+
if err := json.Unmarshal(val, &annotations); err != nil {
181+
return fmt.Errorf("unmarshal annotations: %w", err)
182+
}
183+
184+
data.Annotations = annotations
185+
return nil
186+
}); err != nil {
187+
return fmt.Errorf("copy data.Annotations: %w", err)
188+
}
171189
}
172-
data.Binary = dataBinary.ValueSize() > 0
173190

174191
return nil
175192
}

cmd.go

Lines changed: 38 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@ package flowstate
33
import (
44
"encoding/base64"
55
"fmt"
6-
"strconv"
7-
"strings"
86
"time"
9-
10-
"github.com/oklog/ulid/v2"
117
)
128

139
var _ Command = &TransitCommand{}
@@ -30,10 +26,6 @@ var _ Command = &GetStatesCommand{}
3026

3127
var _ Command = &GetDelayedStatesCommand{}
3228

33-
var _ Command = &AttachDataCommand{}
34-
35-
var _ Command = &GetDataCommand{}
36-
3729
var _ Command = &CommitCommand{}
3830

3931
var _ Command = &ExecuteCommand{}
@@ -394,60 +386,47 @@ func (cmd *UnstackCommand) Do() error {
394386
return nil
395387
}
396388

397-
func AttachData(stateCtx *StateCtx, data *Data, alias string) *AttachDataCommand {
398-
return &AttachDataCommand{
389+
func StoreData(stateCtx *StateCtx, alias string) *StoreDataCommand {
390+
return &StoreDataCommand{
399391
StateCtx: stateCtx,
400-
Data: data,
401392
Alias: alias,
402-
403-
Store: true,
404393
}
405394
}
406395

407-
type AttachDataCommand struct {
396+
type StoreDataCommand struct {
408397
command
409398
StateCtx *StateCtx
410-
Data *Data
411399
Alias string
412-
Store bool
413-
}
414-
415-
func (cmd *AttachDataCommand) WithoutStore() *AttachDataCommand {
416-
cmd.Store = false
417-
return cmd
418400
}
419401

420-
func (cmd *AttachDataCommand) Prepare() error {
421-
if cmd.Alias == "" {
422-
return fmt.Errorf("alias is empty")
423-
}
424-
if cmd.Data.ID == "" {
425-
cmd.Data.ID = DataID(ulid.Make().String())
426-
}
427-
if cmd.Data.Rev < 0 {
428-
return fmt.Errorf("Data.Rev is negative")
402+
func (cmd *StoreDataCommand) Prepare() (bool, error) {
403+
d, err := cmd.StateCtx.Data(cmd.Alias)
404+
if err != nil {
405+
return false, err
429406
}
430-
if cmd.Data.B == nil || len(cmd.Data.B) == 0 {
431-
return fmt.Errorf("Data.B is empty")
407+
408+
if d.Rev < 0 {
409+
return false, fmt.Errorf("data rev is negative")
432410
}
433-
if cmd.Data.Rev == 0 && !cmd.Store {
434-
return fmt.Errorf("Data.Rev is zero, but Store is false; this would lead to data loss")
411+
412+
if !d.isDirty() {
413+
referenceData(cmd.StateCtx, cmd.Alias, d.Rev)
414+
return false, nil
435415
}
436416

437-
return nil
417+
d.checksum()
418+
419+
return true, nil
438420
}
439421

440-
func (cmd *AttachDataCommand) Do() {
441-
cmd.StateCtx.Current.SetAnnotation(
442-
dataAnnotation(cmd.Alias),
443-
string(cmd.Data.ID)+":"+strconv.FormatInt(cmd.Data.Rev, 10),
444-
)
422+
func (cmd *StoreDataCommand) post() {
423+
d := cmd.StateCtx.MustData(cmd.Alias)
424+
referenceData(cmd.StateCtx, cmd.Alias, d.Rev)
445425
}
446426

447-
func GetData(stateCtx *StateCtx, data *Data, alias string) *GetDataCommand {
427+
func GetData(stateCtx *StateCtx, alias string) *GetDataCommand {
448428
return &GetDataCommand{
449429
StateCtx: stateCtx,
450-
Data: data,
451430
Alias: alias,
452431
}
453432

@@ -456,43 +435,32 @@ func GetData(stateCtx *StateCtx, data *Data, alias string) *GetDataCommand {
456435
type GetDataCommand struct {
457436
command
458437
StateCtx *StateCtx
459-
Data *Data
460438
Alias string
461439
}
462440

463-
func (cmd *GetDataCommand) Prepare() error {
464-
if cmd.Data == nil {
465-
return fmt.Errorf("data is nil")
466-
}
467-
if cmd.Alias == "" {
468-
return fmt.Errorf("alias is empty")
469-
}
470-
471-
annotKey := dataAnnotation(cmd.Alias)
472-
idRevStr := cmd.StateCtx.Current.Annotations[annotKey]
473-
if idRevStr == "" {
474-
return fmt.Errorf("annotation %q is not set", annotKey)
475-
}
476-
477-
sepIdx := strings.LastIndexAny(idRevStr, ":")
478-
if sepIdx < 1 || sepIdx+1 == len(idRevStr) {
479-
return fmt.Errorf("annotation %q contains invalid data reference; got %q", annotKey, idRevStr)
441+
func (cmd *GetDataCommand) Prepare() (bool, error) {
442+
rev, err := dereferenceData(cmd.StateCtx, cmd.Alias)
443+
if err != nil {
444+
return false, err
480445
}
481446

482-
id := DataID(idRevStr[:sepIdx])
483-
rev, err := strconv.ParseInt(idRevStr[sepIdx+1:], 10, 64)
447+
d, err := cmd.StateCtx.Data(cmd.Alias)
484448
if err != nil {
485-
return fmt.Errorf("annotation %q contains invalid data revision; got %q: %w", annotKey, idRevStr[sepIdx+1:], err)
449+
cmd.StateCtx.SetData(cmd.Alias, &Data{
450+
Rev: rev,
451+
})
452+
return true, nil
453+
} else if d.Rev == rev {
454+
return false, nil
486455
}
487456

488-
cmd.Data.ID = id
489-
cmd.Data.Rev = rev
490-
491-
return nil
492-
}
457+
d.Rev = rev
458+
d.Blob = d.Blob[:0]
459+
for k := range d.Annotations {
460+
delete(d.Annotations, k)
461+
}
493462

494-
func dataAnnotation(alias string) string {
495-
return "flowstate.data." + string(alias)
463+
return true, nil
496464
}
497465

498466
func nextTransitionOrCurrent(stateCtx *StateCtx, to FlowID) Transition {

0 commit comments

Comments
 (0)