Skip to content
This repository was archived by the owner on Sep 9, 2024. It is now read-only.

Commit 9a0641b

Browse files
committed
mock: validate empty partition keys for all operations
The mock currently only validates empty partition keys for the Set operation, leading to unexpected success when empty partition keys are mistakenly used for Delete/Update/Read operations in tests. When testing against a real C* or Keyspaces instance, a `internal_service.cassandra_execution_error: Key may not be empty` error will appear, code `0x2200`. This PR copies the logic used for the Set operation over to the other operations. I kept the same error message, even though it doesn't match the C* one. For clarity, note that - * a single partition key cannot be an empty string * a composite partition key can have one or all empty string values * a single or composite clustering columns can have one or all empty string values I've added tests for these for the Map and MultiMap table types, but couldn't for the MultiMapMultiKey table type because there's a bug with the implementation there that eliminates empty key values even when they're allowed by C* and Keyspaces. Changing that would change app behaviour, so I've left that as-is, with a few added comments.
1 parent ac4547a commit 9a0641b

File tree

3 files changed

+71
-0
lines changed

3 files changed

+71
-0
lines changed

mock.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,10 @@ func (f *MockFilter) UpdateWithOptions(m map[string]interface{}, options Options
544544
return err
545545
}
546546

547+
if err := validatePartitionKeys(rowKeys); err != nil {
548+
return err
549+
}
550+
547551
for _, rowKey := range rowKeys {
548552
superColumnKeys, err := f.fieldsFromRelations(f.table.keys.ClusteringColumns)
549553
if err != nil {
@@ -583,6 +587,10 @@ func (f *MockFilter) Delete() Op {
583587
return err
584588
}
585589

590+
if err := validatePartitionKeys(rowKeys); err != nil {
591+
return err
592+
}
593+
586594
f.table.mtx.Lock()
587595
defer f.table.mtx.Unlock()
588596
for _, rowKey := range rowKeys {
@@ -651,6 +659,10 @@ func (q *MockFilter) readSomeRows() ([]map[string]interface{}, error) {
651659
return nil, err
652660
}
653661

662+
if err := validatePartitionKeys(rowKeys); err != nil {
663+
return nil, err
664+
}
665+
654666
var result []map[string]interface{}
655667
for _, rowKey := range rowKeys {
656668
row := q.table.rows[rowKey.RowKey()]
@@ -694,6 +706,31 @@ func (q *MockFilter) ReadOne(out interface{}) Op {
694706
})
695707
}
696708

709+
func validatePartitionKeys(keys []key) error {
710+
if len(keys) == 0 {
711+
return fmt.Errorf("Missing all mandatory PRIMARY KEYs")
712+
}
713+
714+
// no validation needed for composite partition keys
715+
if len(keys) > 1 {
716+
return nil
717+
}
718+
719+
// For a single partition key of type string, check that it is not
720+
// empty, this is same as this error from a real C* cluster-
721+
// InvalidRequest: Error from server: code=2200 [Invalid query]
722+
// message="Key may not be empty"
723+
for _, k := range keys[0] {
724+
value := k.Value
725+
stringVal, isString := value.(string)
726+
if isString && stringVal == "" {
727+
return fmt.Errorf("Missing mandatory PRIMARY KEY part %s", k.Key)
728+
}
729+
}
730+
731+
return nil
732+
}
733+
697734
// mockIterator takes in a slice of maps and implements a Scannable iterator
698735
// which goes row by row within the slice.
699736
type mockIterator struct {

mock_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,25 @@ func (s *MockSuite) TestEmptyPrimaryKey() {
122122
s.NoError(s.embMapTbl.Set(add).Run())
123123
s.NoError(s.addressByCountyMmTbl.Set(add).Run())
124124

125+
// SELECTs
126+
selectAdd := &address{}
127+
s.Error(s.embMapTbl.Read("", selectAdd).Run())
128+
s.Error(s.addressByCountyMmTbl.Read("", add.Id, selectAdd).Run())
129+
// empty clustering columns are ok though
130+
s.NoError(s.addressByCountyMmTbl.Read(add.County, "", selectAdd).Run())
131+
132+
// UPDATEs
133+
s.Error(s.embMapTbl.Update("", map[string]interface{}{}).Run())
134+
s.Error(s.addressByCountyMmTbl.Update("", add.Id, map[string]interface{}{}).Run())
135+
// empty clustering columns are ok though
136+
s.NoError(s.addressByCountyMmTbl.Update(add.County, "", map[string]interface{}{}).Run())
137+
138+
// DELETEs
139+
s.Error(s.embMapTbl.Delete("").Run())
140+
s.Error(s.addressByCountyMmTbl.Delete("", add.Id).Run())
141+
// empty clustering columns are ok though
142+
s.NoError(s.addressByCountyMmTbl.Delete(add.County, "").Run())
143+
125144
add = address{
126145
Id: "",
127146
TownID: "",
@@ -134,6 +153,19 @@ func (s *MockSuite) TestEmptyPrimaryKey() {
134153
// no error in writing all empty values to a table with a composite
135154
// primary key
136155
s.NoError(s.mmMkTable.Set(add).Run())
156+
157+
// can't test SELECT/UPDATE/DELETEs for mmMkTable because it doesn't support
158+
// empty composite partition keys or clustering columns, even though C*
159+
// does, see
160+
// https://github.com/monzo/gocassa/blob/ac4547a19b85b0fefbc2e004288dc6f4cbe46aaa/multimap_multikey_table.go#L84
161+
//
162+
// TODO: uncomment the following lines if support for empty composite keys
163+
// is added
164+
// partitionKeys := map[string]interface{}{"Id": "", "TownID": ""}
165+
// clusteringColumns := map[string]interface{}{"County": ""}
166+
// s.NoError(s.mmMkTable.Read(partitionKeys, clusteringColumns, selectAdd).Run())
167+
// s.NoError(s.mmMkTable.Update(partitionKeys, clusteringColumns, map[string]interface{}{"PostCode": "DEF"}).Run())
168+
// s.NoError(s.mmMkTable.Delete(partitionKeys, clusteringColumns).Run())
137169
}
138170

139171
func (s *MockSuite) TestTableRead() {

multimap_multikey_table.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,15 @@ func (mm *multimapMkT) WithOptions(o Options) MultimapMkTable {
8080
func (mm *multimapMkT) ListOfEqualRelations(fieldsToIndex, ids map[string]interface{}) []Relation {
8181
relations := make([]Relation, 0)
8282

83+
// TODO: support empty values in composite partition keys
8384
for _, field := range mm.fieldsToIndexBy {
8485
if value := fieldsToIndex[field]; value != nil && value != "" {
8586
relation := Eq(field, value)
8687
relations = append(relations, relation)
8788
}
8889
}
8990

91+
// TODO: support empty values in composite clustering keys
9092
for _, field := range mm.idField {
9193
if value := ids[field]; value != nil && value != "" {
9294
relation := Eq(field, value)

0 commit comments

Comments
 (0)