Skip to content
This repository was archived by the owner on Sep 9, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,10 @@ func (f *MockFilter) UpdateWithOptions(m map[string]interface{}, options Options
return err
}

if err := validatePartitionKeys(rowKeys); err != nil {
return err
}

for _, rowKey := range rowKeys {
superColumnKeys, err := f.fieldsFromRelations(f.table.keys.ClusteringColumns)
if err != nil {
Expand Down Expand Up @@ -583,6 +587,10 @@ func (f *MockFilter) Delete() Op {
return err
}

if err := validatePartitionKeys(rowKeys); err != nil {
return err
}

f.table.mtx.Lock()
defer f.table.mtx.Unlock()
for _, rowKey := range rowKeys {
Expand Down Expand Up @@ -651,6 +659,10 @@ func (q *MockFilter) readSomeRows() ([]map[string]interface{}, error) {
return nil, err
}

if err := validatePartitionKeys(rowKeys); err != nil {
return nil, err
}

var result []map[string]interface{}
for _, rowKey := range rowKeys {
row := q.table.rows[rowKey.RowKey()]
Expand Down Expand Up @@ -694,6 +706,33 @@ func (q *MockFilter) ReadOne(out interface{}) Op {
})
}

func validatePartitionKeys(keys []key) error {
// this is not allowed for DELETE/UPDATEs, but is allowed for SELECTs, so
// we don't validate here
if len(keys) == 0 {
return nil
}

// no validation needed for composite partition keys
if len(keys) > 1 {
return nil
}

// For a single partition key of type string, check that it is not
// empty, this is same as this error from a real C* cluster-
// InvalidRequest: Error from server: code=2200 [Invalid query]
// message="Key may not be empty"
for _, k := range keys[0] {
value := k.Value
stringVal, isString := value.(string)
if isString && stringVal == "" {
return fmt.Errorf("Missing mandatory PRIMARY KEY part %s", k.Key)
}
}

return nil
}

// mockIterator takes in a slice of maps and implements a Scannable iterator
// which goes row by row within the slice.
type mockIterator struct {
Expand Down
36 changes: 36 additions & 0 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ func (s *MockSuite) TestEmptyPrimaryKey() {
s.NoError(s.embMapTbl.Set(add).Run())
s.NoError(s.addressByCountyMmTbl.Set(add).Run())

// SELECTs
selectAdd := &address{}
s.Error(s.embMapTbl.Read("", selectAdd).Run())
s.Error(s.addressByCountyMmTbl.Read("", add.Id, selectAdd).Run())
// empty clustering columns are ok though
s.NoError(s.addressByCountyMmTbl.Read(add.County, "", selectAdd).Run())

// UPDATEs
s.Error(s.embMapTbl.Update("", map[string]interface{}{}).Run())
s.Error(s.addressByCountyMmTbl.Update("", add.Id, map[string]interface{}{}).Run())
// empty clustering columns are ok though
s.NoError(s.addressByCountyMmTbl.Update(add.County, "", map[string]interface{}{}).Run())

// DELETEs
s.Error(s.embMapTbl.Delete("").Run())
s.Error(s.addressByCountyMmTbl.Delete("", add.Id).Run())
// empty clustering columns are ok though
s.NoError(s.addressByCountyMmTbl.Delete(add.County, "").Run())

add = address{
Id: "",
TownID: "",
Expand All @@ -134,6 +153,19 @@ func (s *MockSuite) TestEmptyPrimaryKey() {
// no error in writing all empty values to a table with a composite
// primary key
s.NoError(s.mmMkTable.Set(add).Run())

// can't test SELECT/UPDATE/DELETEs for mmMkTable because it doesn't support
// empty composite partition keys or clustering columns, even though C*
// does, see
// https://github.com/monzo/gocassa/blob/ac4547a19b85b0fefbc2e004288dc6f4cbe46aaa/multimap_multikey_table.go#L84
//
// TODO: uncomment the following lines if support for empty composite keys
// is added
// partitionKeys := map[string]interface{}{"Id": "", "TownID": ""}
// clusteringColumns := map[string]interface{}{"County": ""}
// s.NoError(s.mmMkTable.Read(partitionKeys, clusteringColumns, selectAdd).Run())
// s.NoError(s.mmMkTable.Update(partitionKeys, clusteringColumns, map[string]interface{}{"PostCode": "DEF"}).Run())
// s.NoError(s.mmMkTable.Delete(partitionKeys, clusteringColumns).Run())
}

func (s *MockSuite) TestTableRead() {
Expand Down Expand Up @@ -238,6 +270,10 @@ func (s *MockSuite) TestMapTableMultiRead() {
s.Len(users, 2)
s.Equal("Jane", users[0].Name)
s.Equal("Jill", users[1].Name)

var users2 []user
s.NoError(s.mapTbl.MultiRead([]interface{}{}, &users2).Run())
s.Len(users2, 0)
}

func (s *MockSuite) TestMapTableUpdate() {
Expand Down
2 changes: 2 additions & 0 deletions multimap_multikey_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ func (mm *multimapMkT) WithOptions(o Options) MultimapMkTable {
func (mm *multimapMkT) ListOfEqualRelations(fieldsToIndex, ids map[string]interface{}) []Relation {
relations := make([]Relation, 0)

// TODO: support empty string values in composite partition keys
for _, field := range mm.fieldsToIndexBy {
if value := fieldsToIndex[field]; value != nil && value != "" {
relation := Eq(field, value)
relations = append(relations, relation)
}
}

// TODO: support empty string values in clustering columns
for _, field := range mm.idField {
if value := ids[field]; value != nil && value != "" {
relation := Eq(field, value)
Expand Down