diff --git a/go.mod b/go.mod index 169ce29c1..ced8de82e 100644 --- a/go.mod +++ b/go.mod @@ -29,9 +29,13 @@ require ( github.com/authzed/authzed-go v1.4.1 github.com/authzed/consistent v0.1.0 github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b - github.com/aws/aws-sdk-go-v2 v1.36.4 + github.com/aws/aws-sdk-go v1.55.8 + github.com/aws/aws-sdk-go-v2 v1.37.1 github.com/aws/aws-sdk-go-v2/config v1.29.16 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.1 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.8.1 github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.5.12 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.45.1 github.com/benbjohnson/clock v1.3.5 github.com/bits-and-blooms/bloom/v3 v3.7.0 github.com/caio/go-tdigest/v4 v4.0.1 @@ -118,7 +122,11 @@ require ( sigs.k8s.io/controller-runtime v0.21.0 ) -require golang.org/x/vuln v1.1.4 // indirect +require ( + github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.27.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.1 // indirect + golang.org/x/vuln v1.1.4 // indirect +) // Most tools are managed in the magefiles module. These tools are just // the ones that can't run from a submodule at the moment. @@ -176,15 +184,15 @@ require ( github.com/authzed/ctxkey v0.0.0-20250226155515-d49f99185584 github.com/aws/aws-sdk-go-v2/credentials v1.17.69 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.31 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.16 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.25.4 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.2 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.33.21 // indirect - github.com/aws/smithy-go v1.22.2 // indirect + github.com/aws/smithy-go v1.22.5 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect diff --git a/go.sum b/go.sum index 280a9ed56..bcf979381 100644 --- a/go.sum +++ b/go.sum @@ -1455,24 +1455,36 @@ github.com/authzed/ctxkey v0.0.0-20250226155515-d49f99185584 h1:mP7EpcUL90EKcf/F github.com/authzed/ctxkey v0.0.0-20250226155515-d49f99185584/go.mod h1:wnimjr5RPPouIhZQ3ztDBLMUKKuUroj3U9Jy0PxeaEE= github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b h1:wbh8IK+aMLTCey9sZasO7b6BWLAJnHHvb79fvWCXwxw= github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b/go.mod h1:s3qC7V7XIbiNWERv7Lfljy/Lx25/V1Qlexb0WJuA8uQ= -github.com/aws/aws-sdk-go-v2 v1.36.4 h1:GySzjhVvx0ERP6eyfAbAuAXLtAda5TEy19E5q5W8I9E= -github.com/aws/aws-sdk-go-v2 v1.36.4/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= +github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= +github.com/aws/aws-sdk-go-v2 v1.37.1 h1:SMUxeNz3Z6nqGsXv0JuJXc8w5YMtrQMuIBmDx//bBDY= +github.com/aws/aws-sdk-go-v2 v1.37.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= github.com/aws/aws-sdk-go-v2/config v1.29.16 h1:XkruGnXX1nEZ+Nyo9v84TzsX+nj86icbFAeust6uo8A= github.com/aws/aws-sdk-go-v2/config v1.29.16/go.mod h1:uCW7PNjGwZ5cOGZ5jr8vCWrYkGIhPoTNV23Q/tpHKzg= github.com/aws/aws-sdk-go-v2/credentials v1.17.69 h1:8B8ZQboRc3uaIKjshve/XlvJ570R7BKNy3gftSbS178= github.com/aws/aws-sdk-go-v2/credentials v1.17.69/go.mod h1:gPME6I8grR1jCqBFEGthULiolzf/Sexq/Wy42ibKK9c= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.1 h1:1ToPL5M0nYwkIOTb9r+ION0ZZe9xemRe1mRMWMw5ihs= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.1/go.mod h1:dDdNpGWZdj4AxADkfM1IG1IutBmSJM7zURhUNOVv/lE= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.8.1 h1:W5pbxu4HaA56dHdCdBgDk0UccIMtYVZIZvQTXNlgOd8= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.8.1/go.mod h1:nbLpEe6EynGLqFEkvq1K2TFqxozTYC2sqIUWg0gS/ZI= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.31 h1:oQWSGexYasNpYp4epLGZxxjsDo8BMBh6iNWkTXQvkwk= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.31/go.mod h1:nc332eGUU+djP3vrMI6blS0woaCfHTe3KiSQUVTMRq0= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.5.12 h1:TGacjTPZ5GNFTltGPNH76EX9g7ZX6prlyBxc9SqHuKw= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.5.12/go.mod h1:t10ExEjdhPVHJNyzQh12+xnW8G1wb5bwnoqZ+657kp8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 h1:o1v1VFfPcDVlK3ll1L5xHsaQAFdNtZ5GXnNR7SwueC4= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35/go.mod h1:rZUQNYMNG+8uZxz9FOerQJ+FceCiodXvixpeRtdESrU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 h1:R5b82ubO2NntENm3SAm0ADME+H630HomNJdgv+yZ3xw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35/go.mod h1:FuA+nmgMRfkzVKYDNEqQadvEMxtxl9+RLT9ribCwEMs= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 h1:ksZXBYv80EFTcgc8OJO48aQ8XDWXIQL7gGasPeCoTzI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1/go.mod h1:HSksQyyJETVZS7uM54cir0IgxttTD+8aEoJMPGepHBI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 h1:+dn/xF/05utS7tUhjIcndbuaPjfll2LhbH1cCDGLYUQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1/go.mod h1:hyAGz30LHdm5KBZDI58MXx5lDVZ5CUfvfTZvMu4HCZo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.45.1 h1:gFD9BLrXox2Q5zxFwyD2OnGb40YYofQ/anaGxVP848Q= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.45.1/go.mod h1:J+qJkxNypYjDcwXldBH+ox2T7OshtP6LOq5VhU0v6hg= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.27.1 h1:H4W48E0/zjiHLlL59/Y0DpaB+krXsuarjwrquCwMtT4= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.27.1/go.mod h1:nGsqtVMMjTeFot6U+rLj+mpOcZybPoxyQPMKY4GHwQo= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.1 h1:/E4JUPMI8LRX2XpXsbmKN42l1lZPoLjGJ/Kun97pLc0= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.1/go.mod h1:qgbd/t8S8y5e87KPQ4kC0kyxZ0K6nC1QiDtFMoxlsOo= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.16 h1:/ldKrPPXTC421bTNWrUIpq3CxwHwRI/kpc+jPUTJocM= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.16/go.mod h1:5vkf/Ws0/wgIMJDQbjI4p2op86hNW6Hie5QtebrDgT8= github.com/aws/aws-sdk-go-v2/service/sso v1.25.4 h1:EU58LP8ozQDVroOEyAfcq0cGc5R/FTZjVoYJ6tvby3w= @@ -1481,8 +1493,8 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.2 h1:XB4z0hbQtpmBnb1FQYvKaCM7 github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.2/go.mod h1:hwRpqkRxnQ58J9blRDrB4IanlXCpcKmsC83EhG77upg= github.com/aws/aws-sdk-go-v2/service/sts v1.33.21 h1:nyLjs8sYJShFYj6aiyjCBI3EcLn1udWrQTjEF+SOXB0= github.com/aws/aws-sdk-go-v2/service/sts v1.33.21/go.mod h1:EhdxtZ+g84MSGrSrHzZiUm9PYiZkrADNja15wtRJSJo= -github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= -github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= +github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -2011,6 +2023,8 @@ github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjz github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jjti/go-spancheck v0.6.5 h1:lmi7pKxa37oKYIMScialXUK6hP3iY5F1gu+mLBPgYB8= github.com/jjti/go-spancheck v0.6.5/go.mod h1:aEogkeatBrbYsyW6y5TgDfihCulDYciL1B7rG2vSsrU= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= diff --git a/internal/datastore/dynamodb/caveat.go b/internal/datastore/dynamodb/caveat.go new file mode 100644 index 000000000..1f2270c3c --- /dev/null +++ b/internal/datastore/dynamodb/caveat.go @@ -0,0 +1,249 @@ +package dynamodb + +import ( + "context" + "fmt" + + sq "github.com/Masterminds/squirrel" + "github.com/authzed/spicedb/pkg/datastore" + corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbv2 "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +// WriteCaveats implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) WriteCaveats(ctx context.Context, caveats []*corev1.CaveatDefinition) error { + + wr := []types.WriteRequest{} + for _, caveat := range caveats { + serialized, err := caveat.MarshalVT() + if err != nil { + return fmt.Errorf("%d", err) + } + + kvp := KeyValues{} + + // fmt.Printf("%#t\n", caveat) + + kvp[ColCaveat] = &caveat.Name + kvp[ColCreatedXid] = aws.String(d.xid.String()) + kvp[ColDeletedXid] = aws.String("") + + wr = append(wr, types.WriteRequest{ + PutRequest: Caveat.GetPutItem(kvp, map[string]interface{}{ + ColSerialized: serialized, + }), + }) + + } + d.ds.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ + TableName: wr, + }, + }) + + return nil +} + +// ReadCaveatByName implements datastore.ReadWriteTransaction. +func (d dynamodbReader) ReadCaveatByName(ctx context.Context, name string) (caveat *corev1.CaveatDefinition, lastWritten datastore.Revision, err error) { + + key := expression.KeyEqual(expression.Key(SK), expression.Value(Caveat.SK.Build( + KeyValues{ + ColCaveat: &name, + }))) + + key = key.And(expression.KeyEqual(expression.Key(PK), expression.Value(Caveat.PK.Build( + KeyValues{ + ColEntity: &Caveat.Entity, + })))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + if err != nil { + return nil, nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + item, ok := res[0].(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("") + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, nil, fmt.Errorf("serialized attribute is not binary type") + } + + caveat = &corev1.CaveatDefinition{} + + err = caveat.UnmarshalVT(binaryAttr) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + return caveat, nil, nil +} + +// LookupCaveatsWithNames implements datastore.ReadWriteTransaction. +func (d dynamodbReader) LookupCaveatsWithNames(ctx context.Context, names []string) ([]datastore.RevisionedCaveat, error) { + if len(names) == 0 { + return nil, fmt.Errorf("names list cannot be empty") + } + + values := []expression.OperandBuilder{} + for _, name := range names { + n := Caveat.LSI1SK.Build(KeyValues{ + ColCaveat: &name, + }) + values = append(values, expression.Value(n)) + } + var filter expression.ConditionBuilder + + filter = expression.Name(LSI1SK).In(values[0], values[1:]...) + + key := expression.KeyEqual(expression.Key(PK), expression.Value(Caveat.PK.Build(KeyValues{ + ColEntity: &Caveat.Entity, + }))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + WithFilter(filter). + Build() + if err != nil { + return nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + resNs := []datastore.RevisionedCaveat{} + + for _, r := range res { + item, ok := r.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("") + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, fmt.Errorf("serialized attribute is not binary type") + } + + caveat := &corev1.CaveatDefinition{} + + err := caveat.UnmarshalVT(binaryAttr) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + resNs = append(resNs, datastore.RevisionedCaveat{ + Definition: caveat, + LastWrittenRevision: datastore.NoRevision, + }) + } + + return resNs, nil +} + +func (d dynamodbReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) { + key := expression.KeyEqual(expression.Key(PK), expression.Value(Caveat.PK.Build(KeyValues{ + ColEntity: &Caveat.Entity, + }))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + if err != nil { + return nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + resNs := []datastore.RevisionedCaveat{} + + for _, r := range res { + item, ok := r.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("") + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, fmt.Errorf("serialized attribute is not binary type") + } + + caveat := &corev1.CaveatDefinition{} + + err := caveat.UnmarshalVT(binaryAttr) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + resNs = append(resNs, datastore.RevisionedCaveat{ + Definition: caveat, + LastWrittenRevision: datastore.NoRevision, + }) + } + + return resNs, nil +} + +// DeleteCaveats implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) DeleteCaveats(ctx context.Context, names []string) error { + statements := []types.BatchStatementRequest{} + + for _, name := range names { + deleteCaveat := delete + query := deleteCaveat. + Where(sq.Eq{PK: Caveat.PK.Build(KeyValues{ColEntity: &Caveat.Entity})}). + Where(sq.Eq{SK: Caveat.SK.Build(KeyValues{ColCaveat: &name})}) + + sql, args, err := query.ToSql() + if err != nil { + return err + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + statements = append(statements, types.BatchStatementRequest{ + Statement: aws.String(sql), + Parameters: parameters, + }) + } + + res, err := d.ds.client.BatchExecuteStatement(ctx, &ddbv2.BatchExecuteStatementInput{ + Statements: statements, + }) + if err != nil { + fmt.Println(err.Error()) + return err + } + + fmt.Printf("output - %#v\n", res) + + return nil +} diff --git a/internal/datastore/dynamodb/client.go b/internal/datastore/dynamodb/client.go new file mode 100644 index 000000000..18c6b9f27 --- /dev/null +++ b/internal/datastore/dynamodb/client.go @@ -0,0 +1,251 @@ +package dynamodb + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddb "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +func (d DynamodbDatastore) CreateTable(ctx context.Context) (*types.TableDescription, error) { + var tableDesc *types.TableDescription + table, err := d.client.CreateTable(ctx, TableDefinition()) + + if err != nil { + fmt.Printf("Couldn't create table %v. Here's why: %v\n", d.tableName, err) + } else { + waiter := dynamodb.NewTableExistsWaiter(d.client) + err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{ + TableName: aws.String(d.tableName)}, 5*time.Minute) + if err != nil { + fmt.Printf("Wait for table exists failed. Here's why: %v\n", err) + } + tableDesc = table.TableDescription + fmt.Printf("Ccreating table test") + } + return tableDesc, err + +} + +func (d DynamodbDatastore) TableExists(ctx context.Context) (bool, error) { + exists := true + _, err := d.client.DescribeTable( + ctx, &ddb.DescribeTableInput{TableName: aws.String(d.tableName)}, + ) + if err != nil { + var notFoundEx *types.ResourceNotFoundException + if errors.As(err, ¬FoundEx) { + fmt.Printf("Table %v does not exist.\n", d.tableName) + err = nil + } else { + fmt.Printf("Couldn't determine existence of table %v. Here's why: %v\n", d.tableName, err) + } + exists = false + } + return exists, err +} + +func (d DynamodbDatastore) PutItem(ctx context.Context, item map[string]types.AttributeValue) error { + + _, err := d.client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(d.tableName), + Item: item, + }) + if err != nil { + fmt.Printf("Couldn't add item to table. Here's why: %v\n", err) + fmt.Printf("%#v\n", item) + } + return err +} + +func (d DynamodbDatastore) DeleteItem(ctx context.Context, namespace string) error { + _, err := d.client.DeleteItem(ctx, &dynamodb.DeleteItemInput{ + TableName: aws.String(d.tableName), + }) + if err != nil { + fmt.Printf("Couldn't delete %v from the table. Here's why: %v\n", namespace, err) + } + return err +} + +func (d DynamodbDatastore) QueryItem(ctx context.Context, expr *expression.Expression, options ...QueryOption) ([]interface{}, error) { + var err error + var response *dynamodb.QueryOutput + var items []interface{} + + opt, _ := generateQueryOpt(options) + + if err != nil { + return nil, fmt.Errorf("Couldn't build expression for query. Here's why: %v\n", err) + } + + queryInputReq := &dynamodb.QueryInput{ + TableName: aws.String(d.tableName), + ExpressionAttributeNames: expr.Names(), + ExpressionAttributeValues: expr.Values(), + KeyConditionExpression: expr.KeyCondition(), + FilterExpression: expr.Filter(), + } + + if opt.indexName != "" { + queryInputReq.IndexName = aws.String(opt.indexName) + } + + if opt.consistentReads { + queryInputReq.ConsistentRead = aws.Bool(opt.consistentReads) + } + + if opt.scanIndexForward { + queryInputReq.ScanIndexForward = aws.Bool(opt.scanIndexForward) + } + + if opt.limit != 0 { + queryInputReq.Limit = aws.Int32(opt.limit) + } + + queryPaginator := dynamodb.NewQueryPaginator(d.client, queryInputReq) + for queryPaginator.HasMorePages() { + response, err = queryPaginator.NextPage(ctx) + if err != nil { + fmt.Printf("Couldn't query for items released in %v. Here's why: %v\n", "namespace", err) + break + } + + var itemPage []interface{} + err = attributevalue.UnmarshalListOfMaps(response.Items, &itemPage) + if err != nil { + fmt.Printf("Couldn't unmarshal query response. Here's why: %v\n", err) + break + } + + items = append(items, itemPage...) + } + + return items, err +} + +func TableDefinition() *ddb.CreateTableInput { + return &ddb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String(PK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(SK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(LSI1SK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(LSI2SK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(GSI1PK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(GSI1SK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(GSI2PK), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String(GSI2SK), + AttributeType: types.ScalarAttributeTypeS, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(PK), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(SK), + KeyType: types.KeyTypeRange, + }, + }, + LocalSecondaryIndexes: []types.LocalSecondaryIndex{ + { + IndexName: aws.String(IDX_LSI1), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(PK), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(LSI1SK), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + }, + { + IndexName: aws.String(IDX_LSI2), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(PK), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(LSI2SK), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + }, + }, + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(IDX_GSI1), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(GSI1PK), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(GSI1SK), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + }, + { + IndexName: aws.String(IDX_GSI2), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(GSI2PK), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(GSI2SK), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + }, + }, + TableName: aws.String(TableName), + BillingMode: types.BillingModePayPerRequest, + } +} diff --git a/internal/datastore/dynamodb/composite.go b/internal/datastore/dynamodb/composite.go new file mode 100644 index 000000000..08253d897 --- /dev/null +++ b/internal/datastore/dynamodb/composite.go @@ -0,0 +1,134 @@ +package dynamodb + +import ( + "fmt" + "maps" + "regexp" + "slices" + "strings" +) + +var ( + replaceRe = regexp.MustCompile("\\${|}") + literalRe = regexp.MustCompile("\\${\\w+?}") +) + +type CompositeFields struct { + pattern *regexp.Regexp + fields []string +} + +type KeyValues map[string]*string + +func NewCompositeFields(prefix string, fields ...string) *CompositeFields { + pattern := "" + if prefix != "" { + pattern += prefix + "#" + } + + for i, field := range fields { + pattern += fmt.Sprintf("(?<%s>.*?)", field) + if i != (len(fields) - 1) { + pattern += "#" + } + } + + pregex := regexp.MustCompile("^" + pattern + "$") + + return &CompositeFields{ + pattern: pregex, + fields: fields, + } +} + +func NewCompositeFieldsWithoutPrefix(fields ...string) *CompositeFields { + return NewCompositeFields("", fields...) +} + +func (c CompositeFields) Build(kvp KeyValues) string { + result, _ := c.BuildFull(kvp) + return result +} + +func (c CompositeFields) BuildFull(kvp KeyValues) (result string, remainingFields []string) { + result, remainingFields, required := c.BuildPartial(kvp) + + if len(required) != 0 { + return "", slices.Collect(maps.Keys(kvp)) + } + + return +} + +func (c CompositeFields) BuildPartial(kvp map[string]*string) (result string, remainingFields []string, required []string) { + remainingFields = slices.Collect(maps.Keys(kvp)) + required = slices.Clone(c.fields) + + result = c.pattern.String() + + result = strings.TrimPrefix(result, "^") + result = strings.TrimSuffix(result, "$") + + for _, field := range c.fields { + namedGroup := fmt.Sprintf("(?<%s>.*?)", field) + + if value, exists := kvp[field]; exists && value != nil { + result = strings.ReplaceAll(result, namedGroup, *value) + } else { + i := strings.Index(result, "(?<") + result = result[:i] + break + } + + iu := slices.Index(required, field) + if iu != -1 { + required = append(required[:iu], required[iu+1:]...) + } + + i := slices.Index(remainingFields, field) + if i != -1 { + remainingFields = append(remainingFields[:i], remainingFields[i+1:]...) + } + } + + return +} + +func (c CompositeFields) Check(availables []string) (unused []string, required []string) { + unused = slices.Clone(availables) + required = slices.Clone(c.fields) + + for _, field := range c.fields { + if !slices.Contains(availables, field) { + break + } + + i := slices.Index(required, field) + if i != -1 { + required = append(required[:i], required[i+1:]...) + } + + iu := slices.Index(unused, field) + if iu != -1 { + unused = append(unused[:iu], unused[iu+1:]...) + } + } + + return +} + +func (c CompositeFields) Extract(str string, kvp KeyValues) { + matches := c.pattern.FindStringSubmatch(str) + if matches == nil { + return + } + + fields := c.pattern.SubexpNames() + + for i, field := range fields { + _, exists := kvp[field] + if i > 0 && field != "" && i < len(matches) && exists { + *kvp[field] = matches[i] + } + } +} diff --git a/internal/datastore/dynamodb/composite_test.go b/internal/datastore/dynamodb/composite_test.go new file mode 100644 index 000000000..1c4cfd32f --- /dev/null +++ b/internal/datastore/dynamodb/composite_test.go @@ -0,0 +1,112 @@ +package dynamodb + +import ( + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/stretchr/testify/require" +) + +func TestNewCompositeField(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id") + require.ElementsMatch(t, comp.fields, []string{"namespace", "object_id"}) + require.Equal(t, comp.pattern.String(), "^REL#(?.*?)#(?.*?)$") +} + +func TestNewCompositeFieldBuild(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id") + ns, oid := "user", "u1" + kvp := map[string]*string{ + "namespace": &ns, + "object_id": &oid, + } + res := comp.Build(kvp) + + require.Equal(t, res, "REL#user#u1") +} + +func TestNewCompositeFieldBuildPartials(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id", "subject") + ns, sub := "user", "sub" + kvp := map[string]*string{ + "namespace": &ns, + "subject": &sub, + } + res, remaining, required := comp.BuildPartial(kvp) + + require.Equal(t, res, "REL#user#") + require.ElementsMatch(t, remaining, []string{"subject"}) + require.ElementsMatch(t, required, []string{"subject", "object_id"}) +} +func TestNewCompositeFieldBuildFull(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id", "subject") + ns, sub, oid := "user", "sub", "u1" + kvp := map[string]*string{ + "namespace": &ns, + "subject": &sub, + "object_id": &oid, + } + res, remaining := comp.BuildFull(kvp) + + require.Equal(t, res, "REL#user#u1#sub") + require.ElementsMatch(t, remaining, []string{}) +} + +func TestNewCompositeFieldBuildFullRemainingFields(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id", "subject") + ns, sub, oid := "user", "sub", "u1" + kvp := map[string]*string{ + "namespace": &ns, + "subject": &sub, + "object_id": &oid, + "relation": aws.String("writer"), + } + res, remaining := comp.BuildFull(kvp) + + require.Equal(t, res, "REL#user#u1#sub") + require.ElementsMatch(t, remaining, []string{"relation"}) +} + +func TestNewCompositeFieldBuildFullMissingOrder(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id", "subject") + ns, sub := "user", "sub" + kvp := map[string]*string{ + "namespace": &ns, + "subject": &sub, + "relation": aws.String("writer"), + } + res, remaining := comp.BuildFull(kvp) + + require.Equal(t, res, "") + require.ElementsMatch(t, remaining, []string{"relation", "namespace", "subject"}) +} + +func TestNewCompositeFieldCheck(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id", "subject") + availables := []string{ + "namespace", + "subject", + } + unused, required := comp.Check(availables) + + require.ElementsMatch(t, unused, []string{"subject"}) + require.ElementsMatch(t, required, []string{"object_id", "subject"}) +} + +func TestNewCompositeFieldExtract(t *testing.T) { + comp := NewCompositeFields("REL", "namespace", "object_id", "relation") + + var ns, rel, obj string + + kvp := map[string]*string{ + "namespace": &ns, + "relation": &rel, + "object_id": &obj, + } + + comp.Extract("REL#user#u1#writer", kvp) + + require.Equal(t, ns, "user") + require.Equal(t, obj, "u1") + require.Equal(t, rel, "writer") +} diff --git a/internal/datastore/dynamodb/dynamodb.go b/internal/datastore/dynamodb/dynamodb.go new file mode 100644 index 000000000..eddf9b64c --- /dev/null +++ b/internal/datastore/dynamodb/dynamodb.go @@ -0,0 +1,268 @@ +package dynamodb + +import ( + "context" + "fmt" + "time" + + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" + "go.opentelemetry.io/otel" + + sq "github.com/Masterminds/squirrel" + datastoreinternal "github.com/authzed/spicedb/internal/datastore" + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" + ddb "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go/aws/awserr" +) + +var ( + tracer = otel.Tracer("spicedb/internal/datastore/dynamodb") + + partiql = sq.StatementBuilder.PlaceholderFormat(sq.Question) +) + +const ( + Engine = "dynamodb" + TableName = "spicedb" + + liveDeletedTxnID = uint64(9223372036854775807) +) + +func init() { + datastore.Engines = append(datastore.Engines, Engine) +} + +func NewDynamodbDatastore(ctx context.Context, options ...Option) (datastore.Datastore, error) { + ds, err := newDynamodbDatastore(ctx, options...) + if err != nil { + return nil, err + } + return datastoreinternal.NewSeparatingContextDatastoreProxy(ds), nil +} + +func newDynamodbDatastore(ctx context.Context, options ...Option) (datastore.Datastore, error) { + config, err := generateConfig(options) + if err != nil { + fmt.Println("error") + } + + fmt.Printf("aws url %s", config.awsUrl) + + awscfg, err := awsconfig.LoadDefaultConfig( + ctx, + ) + + if config.awsRegion != "" { + awscfg.Region = config.awsRegion + } + + if config.awsUrl != "" { + awscfg.BaseEndpoint = &config.awsUrl + } + + // if config.awsScreteAccessKey != "" { + // awscfg.Credentials + // } + + // if credentialsProvider != nil { + // // awscfg.CredentialsProvider = credentialsProvider + // // awscfg.Credentials = credentialsProvider + // } + + client := ddb.NewFromConfig(awscfg) + + d := &DynamodbDatastore{ + hlc: NewHybridLogicalClock(), + client: client, + tableName: TableName, + url: config.awsUrl, + } + + exist, err := d.TableExists(ctx) + if err != nil { + fmt.Println(err.Error()) + } + if !exist { + _, err = d.CreateTable(ctx) + if err != nil { + fmt.Println(err.Error()) + } + } + + return d, nil + +} + +type DynamodbDatastore struct { + client *ddb.Client + tableName string + url string + hlc *HybridLogicalClock +} + +// CheckRevision implements datastore.Datastore. +func (d *DynamodbDatastore) CheckRevision(ctx context.Context, revision datastore.Revision) error { + panic("unimplemented") +} + +// Close implements datastore.Datastore. +func (d DynamodbDatastore) Close() error { + return nil +} + +// Features implements datastore.Datastore. +func (d DynamodbDatastore) Features(ctx context.Context) (*datastore.Features, error) { + return d.OfflineFeatures() +} + +// HeadRevision implements datastore.Datastore. +func (d DynamodbDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { + return d.hlc.Now(), nil +} + +// MetricsID implements datastore.Datastore. +func (d DynamodbDatastore) MetricsID() (string, error) { + return common.MetricsIDFromURL(d.url) +} + +// OfflineFeatures implements datastore.Datastore. +func (d DynamodbDatastore) OfflineFeatures() (*datastore.Features, error) { + return &datastore.Features{ + Watch: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, + IntegrityData: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, + ContinuousCheckpointing: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, + }, nil +} + +// OptimizedRevision implements datastore.Datastore. +func (d DynamodbDatastore) OptimizedRevision(ctx context.Context) (datastore.Revision, error) { + return d.hlc.Now(), nil +} + +// ReadWriteTx implements datastore.Datastore. +func (d DynamodbDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, options ...options.RWTOptionsOption) (datastore.Revision, error) { + xid := d.CreateNewTransaction(ctx).(revisions.HLCRevision) + + rwt := NewDynamodbReadWriterTx(d, xid) + + fn(ctx, rwt) + + return xid, nil +} + +// ReadyState implements datastore.Datastore. +func (d DynamodbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { + return datastore.ReadyState{ + IsReady: true, + }, nil +} + +// RevisionFromString implements datastore.Datastore. +func (d DynamodbDatastore) RevisionFromString(serialized string) (datastore.Revision, error) { + return revisions.NewHLCForTime(time.Now()), nil +} + +// SnapshotReader implements datastore.Datastore. +func (d DynamodbDatastore) SnapshotReader(datastore.Revision) datastore.Reader { + return NewDynamodbReader(d) +} + +// Statistics implements datastore.Datastore. +func (d DynamodbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { + return datastore.Stats{}, nil +} + +// Watch implements datastore.Datastore. +func (d DynamodbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { + panic("unimplemented") +} + +func (d DynamodbDatastore) CreateNewTransaction(ctx context.Context) datastore.Revision { + latestXid, err := d.getLatestTransaction(ctx) + if err != nil { + return d.hlc.Now() + } + + d.hlc.Update(latestXid.(revisions.HLCRevision)) + + newXid := d.hlc.Now() + + for { + kvp := KeyValues{ + ColCreatedXid: aws.String(newXid.String()), + } + + putRequest := Transaction.GetPutItem(kvp, nil) + + out, err := d.client.PutItem(ctx, &ddb.PutItemInput{ + TableName: &d.tableName, + Item: putRequest.Item, + ConditionExpression: aws.String(fmt.Sprintf("attribute_not_exists(%s)", SK)), + }) + + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ConditionalCheckFailedException" { + newXid = d.hlc.Now() + continue + } + } + + fmt.Println(err) + } + fmt.Printf("output - %#v\n", out) + break + } + + return newXid +} + +func (d DynamodbDatastore) getLatestTransaction(ctx context.Context) (datastore.Revision, error) { + key := expression.KeyEqual(expression.Key(PK), expression.Value(Transaction.PK.Build(KeyValues{}))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + if err != nil { + return nil, fmt.Errorf(err.Error()) + } + + res, err := d.QueryItem(ctx, &expr, + WithLimit(1), + WithConsistentReads(true), + WithScanIndexForward(true), + ) + + if err != nil { + return nil, fmt.Errorf(err.Error()) + } + + if len(res) == 0 { + return d.hlc.Now(), nil + } + + item, ok := res[0].(map[string]interface{}) + if !ok { + return d.hlc.Now(), nil + } + + if v, ok := item[SK].(string); ok { + return revisions.HLCRevisionFromString(v) + } + + return d.hlc.Now(), nil +} diff --git a/internal/datastore/dynamodb/entity.go b/internal/datastore/dynamodb/entity.go new file mode 100644 index 000000000..9ee950d0c --- /dev/null +++ b/internal/datastore/dynamodb/entity.go @@ -0,0 +1,349 @@ +package dynamodb + +import ( + "slices" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +type Attr struct { + Name string + Type string + Value *CompositeFields +} + +type EntityAttr struct { + PK *CompositeFields + SK *CompositeFields + LSI1SK *CompositeFields + LSI2SK *CompositeFields + GSI1PK *CompositeFields + GSI1SK *CompositeFields + GSI2PK *CompositeFields + GSI2SK *CompositeFields + Entity string + ExtraAttrs []Attr +} + +const ( + EntityNamespace = "namespace" + EntityCaveat = "caveat" + EntityRelationTuple = "relation_tuple" + EntityRelationTupleTransaction = "relation_tuple_transaction" + EntityCounter = "relationship_counter" + ColEntity = "entity" + + ColCreatedXid = "created_xid" + ColDeletedXid = "deleted_xid" + ColNamespace = "namespace" + ColCaveat = "caveat" + ColSerialized = "serialized" + ColCreatedTxn = "created_transaction" + ColDeletedTxn = "deleted_transaction" + + ColResourceType = "namespace" + ColObjectID = "object_id" + ColRelation = "relation" + ColUsersetNamespace = "userset_namespace" + ColUsersetObjectID = "userset_object_id" + ColUsersetRelation = "userset_relation" + ColCaveatContextName = "caveat_name" + ColCaveatContext = "caveat_context" + ColExpiration = "expiration" + + ColSnapshot = "snapshot" + ColMetadata = "Metadata" + ColCounterName = "name" + ColCurrentCount = "current_count" + // ColCounterFilter = "serialized_filter" + ColCounterCurrentCount = "current_count" + ColCounterSnapshot = "updated_revision_snapshot" + + PREFIX_NS = "NS" + PREFIX_CAV = "CAV" + PREFIX_REL = "REL" + PREFIX_TXN = "REL_TXN" + PREFIX_COUNTER = "COUNT" + + TYPE_BINARY = "B" + TYPE_STRING = "S" + TYPE_COMPOSITE = "C" + TYPE_NUMBER = "N" + + PK = "PK" + SK = "SK" + LSI1SK = "LSI1SK" + LSI2SK = "LSI2SK" + GSI1PK = "GSI1PK" + GSI1SK = "GSI1SK" + GSI2PK = "GSI2PK" + GSI2SK = "GSI2SK" + + IDX_LSI1 = "LSI1" + IDX_LSI2 = "LSI2" + IDX_GSI1 = "GSI1" + IDX_GSI2 = "GSI2" +) + +var ( + Namespace = EntityAttr{ + PK: NewCompositeFields(PREFIX_NS, ColEntity), + SK: NewCompositeFields(PREFIX_NS, ColNamespace), + LSI1SK: NewCompositeFields(PREFIX_NS, ColNamespace), + Entity: EntityNamespace, + ExtraAttrs: []Attr{ + { + Name: ColSerialized, + Type: TYPE_BINARY, + }, + { + Name: ColCreatedXid, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColCreatedXid), + }, + }, + } + + Caveat = EntityAttr{ + PK: NewCompositeFields(PREFIX_CAV, ColEntity), + SK: NewCompositeFields(PREFIX_CAV, ColCaveat), + LSI1SK: NewCompositeFields(PREFIX_CAV, ColCaveat), + Entity: EntityCaveat, + ExtraAttrs: []Attr{ + { + Name: ColSerialized, + Type: TYPE_BINARY, + }, + { + Name: ColCreatedTxn, + Type: TYPE_STRING, + }, + { + Name: ColDeletedTxn, + Type: TYPE_STRING, + }, + { + Name: ColCreatedXid, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColCreatedXid), + }, + }, + } + + RelationTuple = EntityAttr{ + PK: NewCompositeFields( + PREFIX_REL, + ColResourceType, + ColRelation, + ColObjectID, + ), + SK: NewCompositeFields( + PREFIX_REL, + ColUsersetNamespace, + ColUsersetObjectID, + ColUsersetRelation, + ), + LSI1SK: NewCompositeFields( + PREFIX_REL, + ColUsersetObjectID, + ColUsersetNamespace, + ColUsersetRelation, + ), + GSI1PK: NewCompositeFields( + PREFIX_REL, + ColResourceType, + ), + GSI1SK: NewCompositeFields( + PREFIX_REL, + ColRelation, + ColUsersetNamespace, + ColObjectID, + ColUsersetObjectID, + ), + GSI2PK: NewCompositeFields( + PREFIX_REL, + ColUsersetNamespace, + ), + GSI2SK: NewCompositeFields( + PREFIX_REL, + ColResourceType, + ColUsersetRelation, + ColRelation, + ColObjectID, + ), + Entity: EntityRelationTuple, + ExtraAttrs: []Attr{ + { + Name: ColCaveatContextName, + Type: TYPE_STRING, + }, + { + Name: ColCaveatContext, + Type: TYPE_STRING, + }, + { + Name: ColExpiration, + Type: TYPE_STRING, + }, + { + Name: ColResourceType, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColResourceType), + }, + + { + Name: ColRelation, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColRelation), + }, + { + Name: ColObjectID, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColObjectID), + }, + { + Name: ColUsersetNamespace, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColUsersetNamespace), + }, + { + Name: ColUsersetObjectID, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColUsersetObjectID), + }, + { + Name: ColUsersetRelation, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColUsersetRelation), + }, + { + Name: ColCreatedXid, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColCreatedXid), + }, + { + Name: EntityRelationTuple, + Type: TYPE_COMPOSITE, + Value: NewCompositeFields( + PREFIX_REL, + ColNamespace, + ColRelation, + ColObjectID, + ColUsersetNamespace, + ColUsersetRelation, + ColUsersetObjectID, + ), + }, + }, + } + + Transaction = EntityAttr{ + PK: NewCompositeFields(PREFIX_TXN), + SK: NewCompositeFieldsWithoutPrefix(ColCreatedXid), + Entity: EntityRelationTupleTransaction, + ExtraAttrs: []Attr{ + { + Name: ColMetadata, + Type: TYPE_STRING, + }, + }, + } + + Counter = EntityAttr{ + PK: NewCompositeFields(PREFIX_COUNTER), + SK: NewCompositeFieldsWithoutPrefix(ColCounterName), + Entity: EntityCounter, + ExtraAttrs: []Attr{ + { + Name: ColSerialized, + Type: TYPE_BINARY, + }, + { + Name: ColCurrentCount, + Type: TYPE_NUMBER, + }, + { + Name: ColCreatedXid, + Type: TYPE_COMPOSITE, + Value: NewCompositeFieldsWithoutPrefix(ColCreatedXid), + }, + }, + } +) + +func (ea EntityAttr) GetExtraAttrField(field string) Attr { + i := slices.IndexFunc[[]Attr, Attr](ea.ExtraAttrs, func(e Attr) bool { + return e.Name == field + }) + + if i != -1 { + return ea.ExtraAttrs[i] + } + + return Attr{} +} + +type Item map[string]types.AttributeValue + +func addValueToItem(item Item, key string, comp *CompositeFields, kvp KeyValues) { + if comp != nil && len(comp.Build(kvp)) > 0 { + (item)[key] = &types.AttributeValueMemberS{ + Value: comp.Build(kvp), + } + } +} + +func (ea EntityAttr) GetPutItem(kvp KeyValues, extraAttr map[string]interface{}) *types.PutRequest { + + kvp[ColEntity] = &ea.Entity + + item := Item{} + + addValueToItem(item, PK, ea.PK, kvp) + addValueToItem(item, SK, ea.SK, kvp) + addValueToItem(item, LSI1SK, ea.LSI1SK, kvp) + addValueToItem(item, LSI2SK, ea.LSI2SK, kvp) + addValueToItem(item, GSI1PK, ea.GSI1PK, kvp) + addValueToItem(item, GSI1SK, ea.GSI1SK, kvp) + + for _, v := range ea.ExtraAttrs { + if v.Type == TYPE_COMPOSITE { + addValueToItem(item, v.Name, v.Value, kvp) + } + + if extraAttr == nil { + continue + } + + if value, exists := extraAttr[v.Name]; exists { + switch v.Type { + case TYPE_STRING: + { + item[v.Name] = &types.AttributeValueMemberS{ + Value: value.(string), + } + } + case TYPE_NUMBER: + { + item[v.Name] = &types.AttributeValueMemberN{ + Value: value.(string), + } + } + case TYPE_BINARY: + { + item[v.Name] = &types.AttributeValueMemberB{ + Value: value.([]byte), + } + } + } + } + } + + item[ColEntity] = &types.AttributeValueMemberS{ + Value: ea.Entity, + } + + return &types.PutRequest{ + Item: item, + } +} diff --git a/internal/datastore/dynamodb/entity_test.go b/internal/datastore/dynamodb/entity_test.go new file mode 100644 index 000000000..ec0712e4e --- /dev/null +++ b/internal/datastore/dynamodb/entity_test.go @@ -0,0 +1,24 @@ +package dynamodb + +import ( + "fmt" + "testing" + + "github.com/aws/aws-sdk-go/aws" +) + +func TestAddValueToItem(t *testing.T) { + fmt.Println("in testing") + comp := NewCompositeFields("REL", "namespace", "subject") + kvp := map[string]*string{ + "namespace": aws.String("user"), + "subject": aws.String("sub"), + } + // res, remaining := comp.BuildFull(kvp) + + item := Item{} + + addValueToItem(item, "pk", comp, kvp) + + fmt.Printf("%#v \n", item) +} diff --git a/internal/datastore/dynamodb/hlc.go b/internal/datastore/dynamodb/hlc.go new file mode 100644 index 000000000..ec452d23c --- /dev/null +++ b/internal/datastore/dynamodb/hlc.go @@ -0,0 +1,59 @@ +package dynamodb + +import ( + "sync" + "time" + + "github.com/authzed/spicedb/internal/datastore/revisions" +) + +type HybridLogicalClock struct { + mu sync.Mutex + physical int64 + logical uint32 +} + +func NewHybridLogicalClock() *HybridLogicalClock { + return &HybridLogicalClock{ + physical: time.Now().UnixNano(), + logical: 0, + } +} + +func (hlc *HybridLogicalClock) Now() revisions.HLCRevision { + hlc.mu.Lock() + defer hlc.mu.Unlock() + + currentPhysical := time.Now().UnixNano() + + if currentPhysical > hlc.physical { + hlc.physical = currentPhysical + hlc.logical = 0 + } else { + hlc.logical++ + } + + return revisions.NewHLC(hlc.physical, hlc.logical) +} + +func (hlc *HybridLogicalClock) Update(receivedTimestamp revisions.HLCRevision) revisions.HLCRevision { + hlc.mu.Lock() + defer hlc.mu.Unlock() + + currentPhysical := time.Now().UnixNano() + + maxPhysical := max(currentPhysical, receivedTimestamp.TimestampNanoSec()) + + if maxPhysical > hlc.physical { + hlc.physical = maxPhysical + hlc.logical = 0 + } else { + hlc.logical++ + } + + if hlc.physical == receivedTimestamp.TimestampNanoSec() { + hlc.logical = max(hlc.logical, receivedTimestamp.LogicalClock()+1) + } + + return revisions.NewHLC(hlc.physical, hlc.logical) +} diff --git a/internal/datastore/dynamodb/indexes.go b/internal/datastore/dynamodb/indexes.go new file mode 100644 index 000000000..8c276b831 --- /dev/null +++ b/internal/datastore/dynamodb/indexes.go @@ -0,0 +1,69 @@ +package dynamodb + +import "github.com/authzed/spicedb/pkg/datastore/queryshape" + +type DynamodbIndex struct { + pkKey string + skKey string + indexName string + queryShape []queryshape.Shape + pk *CompositeFields + sk *CompositeFields +} + +var ( + primary = &DynamodbIndex{ + PK, + SK, + "", + []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + queryshape.AllSubjectsForResources, + queryshape.FindResourceRelationForSubjectRelation, + // queryshape.FindResourceOfTypeAndRelation, + }, + RelationTuple.PK, + RelationTuple.SK, + } + lsi = &DynamodbIndex{ + PK, + LSI1SK, + IDX_LSI1, + []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.MatchingResourcesForSubject, + }, + RelationTuple.PK, + RelationTuple.LSI1SK, + } + gsi1 = &DynamodbIndex{ + GSI1PK, + GSI1SK, + IDX_GSI1, + []queryshape.Shape{ + queryshape.FindResourceOfType, + // queryshape.FindResourceOfTypeAndRelation, + }, + RelationTuple.GSI1PK, + RelationTuple.GSI1SK, + } + gsi2 = &DynamodbIndex{ + GSI2SK, + GSI2PK, + IDX_GSI2, + []queryshape.Shape{ + queryshape.FindResourceRelationForSubjectRelation, + queryshape.FindSubjectOfTypeAndRelation, + }, + RelationTuple.GSI2PK, + RelationTuple.GSI2SK, + } +) + +var Indexes = []*DynamodbIndex{ + primary, + lsi, + gsi1, + gsi2, +} diff --git a/internal/datastore/dynamodb/options.go b/internal/datastore/dynamodb/options.go new file mode 100644 index 000000000..75119c2b3 --- /dev/null +++ b/internal/datastore/dynamodb/options.go @@ -0,0 +1,111 @@ +package dynamodb + +import ( + "time" +) + +type dynamodbOption struct { + awsUrl string + awsAccessKeyId string + awsScreteAccessKey string + awsRegion string + awsSharedProfile string + gcWindow time.Duration + followerReadDelay time.Duration + revisionQuantization time.Duration + maxRevisionStalenessPercent float64 +} + +type Option func(*dynamodbOption) + +func generateConfig(options []Option) (dynamodbOption, error) { + computed := dynamodbOption{ + awsUrl: "", + awsAccessKeyId: "", + awsScreteAccessKey: "", + awsRegion: "", + awsSharedProfile: "", + } + + for _, option := range options { + option(&computed) + } + + return computed, nil +} + +func AwsUrl(awsurl string) Option { + return func(so *dynamodbOption) { + so.awsUrl = awsurl + } +} + +func AwsAccessKeyId(awsAccessKeyId string) Option { + return func(so *dynamodbOption) { + so.awsAccessKeyId = awsAccessKeyId + } +} + +func AwsScreteAccessKey(awsScreteAccessKey string) Option { + return func(so *dynamodbOption) { + so.awsScreteAccessKey = awsScreteAccessKey + } +} + +func AwsRegion(awsRegion string) Option { + return func(so *dynamodbOption) { + so.awsRegion = awsRegion + } +} + +func AwsSharedProfile(awsSharedProfile string) Option { + return func(so *dynamodbOption) { + so.awsSharedProfile = awsSharedProfile + } +} + +type queryOpt struct { + indexName string + limit int32 + consistentReads bool + scanIndexForward bool +} + +type QueryOption func(*queryOpt) + +func generateQueryOpt(options []QueryOption) (queryOpt, error) { + computed := queryOpt{ + indexName: "", + limit: 0, + } + + for _, option := range options { + option(&computed) + } + + return computed, nil +} + +func WithIndexName(indexName string) QueryOption { + return func(so *queryOpt) { + so.indexName = indexName + } +} + +func WithLimit(limit int32) QueryOption { + return func(so *queryOpt) { + so.limit = limit + } +} + +func WithConsistentReads(consistentReads bool) QueryOption { + return func(so *queryOpt) { + so.consistentReads = consistentReads + } +} + +func WithScanIndexForward(scanIndexForward bool) QueryOption { + return func(so *queryOpt) { + so.scanIndexForward = scanIndexForward + } +} diff --git a/internal/datastore/dynamodb/reader.go b/internal/datastore/dynamodb/reader.go new file mode 100644 index 000000000..163f87a9d --- /dev/null +++ b/internal/datastore/dynamodb/reader.go @@ -0,0 +1,648 @@ +package dynamodb + +import ( + "context" + "fmt" + "math" + + sq "github.com/Masterminds/squirrel" + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" + corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/tuple" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" + ddbv2 "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +type dynamodbReader struct { + ds DynamodbDatastore +} + +func NewDynamodbReader(ds DynamodbDatastore) datastore.Reader { + + return dynamodbReader{ + ds, + } +} + +// CountRelationships implements datastore.Reader. +func (d dynamodbReader) CountRelationships(ctx context.Context, name string) (int, error) { + // get the counter by name + key := expression.KeyEqual(expression.Key(PK), expression.Value(Counter.PK.Build(KeyValues{ + ColEntity: &Namespace.Entity, + }))) + + key = key.And(expression.KeyEqual(expression.Key(SK), expression.Value(Counter.SK.Build(KeyValues{ + ColCounterName: &name, + })))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + + if err != nil { + return 0, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + if len(res) < 1 { + return 0, fmt.Errorf("No Counter for given name %s", name) + } + + item, ok := res[0].(map[string]interface{}) + if !ok { + return 0, fmt.Errorf("") + } + + count := 0 + + if v, exists := item[ColCounterCurrentCount]; exists { + count = v.(int) + } + + return count, nil +} + +// ListAllNamespaces implements datastore.Reader. +func (d dynamodbReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) { + key := expression. + KeyEqual( + expression.Key(PK), + expression.Value(Namespace. + PK.Build(KeyValues{ + ColEntity: &Namespace.Entity, + }))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + if err != nil { + return nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + resNs := []datastore.RevisionedNamespace{} + + for _, r := range res { + item, ok := r.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("") + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, fmt.Errorf("serialized attribute is not binary type") + } + + namespace := &corev1.NamespaceDefinition{} + + err := namespace.UnmarshalVT(binaryAttr) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + resNs = append(resNs, datastore.RevisionedNamespace{ + Definition: namespace, + LastWrittenRevision: datastore.NoRevision, + }) + } + + return resNs, nil +} + +// LookupCounters implements datastore.Reader. +func (d dynamodbReader) LookupCounters(ctx context.Context) ([]datastore.RelationshipCounter, error) { + // return all counters + key := expression.KeyEqual(expression.Key(PK), expression.Value(Counter.PK.Build(KeyValues{ + ColEntity: &Counter.Entity, + }))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + if err != nil { + return nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + resCounters := []datastore.RelationshipCounter{} + + for _, r := range res { + item, ok := r.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("") + } + + name := "" + count := 0 + + if v, exists := item[ColCounterName]; exists { + name = v.(string) + } + if v, exists := item[ColCounterCurrentCount]; exists { + count = v.(int) + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, fmt.Errorf("serialized attribute is not binary type") + } + + filter := &corev1.RelationshipFilter{} + + err := filter.UnmarshalVT(binaryAttr) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + resCounters = append(resCounters, datastore.RelationshipCounter{ + Name: name, + Filter: filter, + Count: count, + ComputedAtRevision: datastore.NoRevision, + }) + } + + return resCounters, nil +} + +// LookupNamespacesWithNames implements datastore.Reader. +func (d dynamodbReader) LookupNamespacesWithNames(ctx context.Context, nsNames []string) ([]datastore.RevisionedNamespace, error) { + + if len(nsNames) == 0 { + return nil, fmt.Errorf("nsNames list cannot be empty") + } + + values := []expression.OperandBuilder{} + for _, name := range nsNames { + ns := Namespace.LSI1SK.Build(KeyValues{ + ColNamespace: &name, + }) + values = append(values, expression.Value(ns)) + } + var filter expression.ConditionBuilder + + key := expression.KeyEqual(expression.Key(PK), expression.Value(Namespace.PK.Build(KeyValues{ + ColEntity: &Namespace.Entity, + }))) + + filter = expression.Name(LSI1SK).In(values[0], values[1:]...) + + expr, err := expression.NewBuilder(). + WithFilter(filter). + WithKeyCondition(key). + Build() + if err != nil { + return nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + if err != nil { + return nil, err + } + + resNs := []datastore.RevisionedNamespace{} + + for _, r := range res { + item, ok := r.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("") + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, fmt.Errorf("serialized attribute is not binary type") + } + + namespace := &corev1.NamespaceDefinition{} + + err := namespace.UnmarshalVT(binaryAttr) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + resNs = append(resNs, datastore.RevisionedNamespace{ + Definition: namespace, + LastWrittenRevision: datastore.NoRevision, + }) + } + + return resNs, nil +} + +// QueryRelationships implements datastore.Reader. +func (d dynamodbReader) QueryRelationships(ctx context.Context, filter datastore.RelationshipsFilter, opts ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) { + query := Query(filter) + + query = query.Column(EntityRelationTuple) + + sql, args, err := query.ToSql() + if err != nil { + return nil, err + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + fmt.Printf("%s, \t %s\n", sql, args) + + // for _, item := range res.Items { + // item[EntityRelationTuple] + // } + + // var out interface{} + + // err = attributevalue.UnmarshalListOfMaps(res.Items, &out) + // if err != nil { + // fmt.Printf("%#v\n ", err) + // return nil, err + // } + + // fmt.Printf("%#v", out[0][EntityRelationTuple]) + + var resourceObjectType string + var resourceObjectID string + var resourceRelation string + var subjectObjectType string + var subjectObjectID string + var subjectRelation string + // var caveatName sql.NullString + // var caveatCtx C + // var expiration *time.Time + + // var integrityKeyID string + // var integrityHash []byte + // var timestamp time.Time + + return func(yield func(tuple.Relationship, error) bool) { + fmt.Println("In side yield function") + res, err := d.ds.client.ExecuteStatement(ctx, &ddbv2.ExecuteStatementInput{ + Statement: aws.String(sql), + Parameters: parameters, + }) + if err != nil { + fmt.Println(err.Error()) + if !yield(tuple.Relationship{}, err) { + return + } + } + + kvp := KeyValues{} + kvp[ColNamespace] = &resourceObjectType + kvp[ColObjectID] = &resourceObjectID + kvp[ColRelation] = &resourceRelation + kvp[ColUsersetNamespace] = &subjectObjectType + kvp[ColUsersetObjectID] = &subjectObjectID + kvp[ColUsersetRelation] = &subjectRelation + kvp[ColEntity] = &RelationTuple.Entity + + rtAttr := RelationTuple.GetExtraAttrField(EntityRelationTuple) + + for _, item := range res.Items { + var out map[string]string + attributevalue.UnmarshalMap(item, &out) + + // if !exists { + // if !yield(tuple.Relationship{}, err) { + // return + // } + // } + + rt, exists := out[EntityRelationTuple] + + if !exists { + fmt.Println(rt) + if !yield(tuple.Relationship{}, err) { + return + } + } + + rtAttr.Value.Extract(rt, kvp) + + fmt.Printf("%#v\n", out) + + fmt.Printf("%#v\n", kvp) + + if !yield(tuple.Relationship{ + RelationshipReference: tuple.RelationshipReference{ + Resource: tuple.ObjectAndRelation{ + ObjectType: resourceObjectType, + ObjectID: resourceObjectID, + Relation: resourceRelation, + }, + Subject: tuple.ObjectAndRelation{ + ObjectType: subjectObjectType, + ObjectID: subjectObjectID, + Relation: subjectRelation, + }, + }, + // OptionalCaveat: caveat, + // OptionalExpiration: expiration, + // OptionalIntegrity: integrity, + }, nil) { + return + } + } + + }, nil +} + +// ReadNamespaceByName implements datastore.Reader. +func (d dynamodbReader) ReadNamespaceByName(ctx context.Context, nsName string) (ns *corev1.NamespaceDefinition, lastWritten datastore.Revision, err error) { + + n := Namespace.SK.Build(KeyValues{ + ColNamespace: &nsName, + }) + + key := expression.KeyEqual(expression.Key(PK), expression.Value(Namespace.PK.Build(KeyValues{ + ColEntity: &Namespace.Entity, + }))) + + key = key.And(expression.KeyEqual(expression.Key(SK), expression.Value(n))) + + expr, err := expression.NewBuilder(). + WithKeyCondition(key). + Build() + if err != nil { + return nil, nil, err + } + + res, err := d.ds.QueryItem(ctx, &expr) + + if len(res) < 1 { + return nil, nil, fmt.Errorf("No namespace for given name %s", nsName) + } + + item, ok := res[0].(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("") + } + + serializedAttr, exists := item[ColSerialized] + if !exists { + return nil, nil, fmt.Errorf("serialized attribute not found") + } + + binaryAttr, ok := serializedAttr.([]byte) + if !ok { + return nil, nil, fmt.Errorf("serialized attribute is not binary type") + } + + namespace := &corev1.NamespaceDefinition{} + + err = namespace.UnmarshalVT(binaryAttr) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal protobuf: %w", err) + } + + return namespace, nil, nil +} + +// ReverseQueryRelationships implements datastore.Reader. +func (d dynamodbReader) ReverseQueryRelationships(ctx context.Context, subjectsFilter datastore.SubjectsFilter, options ...options.ReverseQueryOptionsOption) (datastore.RelationshipIterator, error) { + filter := datastore.RelationshipsFilter{ + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + subjectsFilter.AsSelector(), + }, + } + + query := Query(filter) + + query = query.Column(EntityRelationTuple) + + sql, args, err := query.ToSql() + if err != nil { + return nil, err + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + fmt.Printf("%s, \t %s\n", sql, args) + + var resourceObjectType string + var resourceObjectID string + var resourceRelation string + var subjectObjectType string + var subjectObjectID string + var subjectRelation string + // var caveatName sql.NullString + // var caveatCtx C + // var expiration *time.Time + + // var integrityKeyID string + // var integrityHash []byte + // var timestamp time.Time + + return func(yield func(tuple.Relationship, error) bool) { + fmt.Println("In side yield function") + res, err := d.ds.client.ExecuteStatement(ctx, &ddbv2.ExecuteStatementInput{ + Statement: aws.String(sql), + Parameters: parameters, + }) + if err != nil { + fmt.Println(err.Error()) + if !yield(tuple.Relationship{}, err) { + return + } + } + + kvp := KeyValues{} + kvp[ColNamespace] = &resourceObjectType + kvp[ColObjectID] = &resourceObjectID + kvp[ColRelation] = &resourceRelation + kvp[ColUsersetNamespace] = &subjectObjectType + kvp[ColUsersetObjectID] = &subjectObjectID + kvp[ColUsersetRelation] = &subjectRelation + kvp[ColEntity] = &RelationTuple.Entity + + rtAttr := RelationTuple.GetExtraAttrField(EntityRelationTuple) + + for _, item := range res.Items { + var out map[string]string + attributevalue.UnmarshalMap(item, &out) + + // if !exists { + // if !yield(tuple.Relationship{}, err) { + // return + // } + // } + + rt, exists := out[EntityRelationTuple] + + if !exists { + fmt.Println(rt) + if !yield(tuple.Relationship{}, err) { + return + } + } + + rtAttr.Value.Extract(rt, kvp) + + fmt.Printf("%#v\n", out) + + fmt.Printf("%#v\n", kvp) + + if !yield(tuple.Relationship{ + RelationshipReference: tuple.RelationshipReference{ + Resource: tuple.ObjectAndRelation{ + ObjectType: resourceObjectType, + ObjectID: resourceObjectID, + Relation: resourceRelation, + }, + Subject: tuple.ObjectAndRelation{ + ObjectType: subjectObjectType, + ObjectID: subjectObjectID, + Relation: subjectRelation, + }, + }, + // OptionalCaveat: caveat, + // OptionalExpiration: expiration, + // OptionalIntegrity: integrity, + }, nil) { + return + } + } + + }, nil +} + +var _ datastore.Reader = &dynamodbReader{} + +func QueryHint(filter datastore.RelationshipsFilter) (*DynamodbIndex, KeyValues) { + avaiableFields := []string{} + + kvp := KeyValues{} + + if filter.OptionalResourceType != "" { + avaiableFields = append(avaiableFields, ColResourceType) + kvp[ColResourceType] = &filter.OptionalResourceType + } + + if len(filter.OptionalResourceIds) > 0 { + avaiableFields = append(avaiableFields, ColObjectID) + kvp[ColObjectID] = &filter.OptionalResourceIds[0] + } + + if filter.OptionalResourceRelation != "" { + avaiableFields = append(avaiableFields, ColRelation) + kvp[ColRelation] = &filter.OptionalResourceRelation + } + + if len(filter.OptionalSubjectsSelectors) > 0 { + hasST := false + hasSOid := false + hasRel := false + for _, ss := range filter.OptionalSubjectsSelectors { + if ss.OptionalSubjectType != "" { + hasST = true + } + + if len(ss.OptionalSubjectIds) > 0 { + hasSOid = true + } + + if !ss.RelationFilter.IsEmpty() { + hasRel = true + } + } + + if hasST { + avaiableFields = append(avaiableFields, ColUsersetNamespace) + kvp[ColUsersetNamespace] = &filter.OptionalSubjectsSelectors[0].OptionalSubjectType + } + + if hasSOid { + avaiableFields = append(avaiableFields, ColUsersetObjectID) + kvp[ColUsersetObjectID] = &filter.OptionalSubjectsSelectors[0].OptionalSubjectIds[0] + } + + if hasRel { + avaiableFields = append(avaiableFields, ColUsersetRelation) + kvp[ColUsersetRelation] = &filter.OptionalSubjectsSelectors[0].RelationFilter.NonEllipsisRelation + } + } + + bestScore := math.MaxInt64 + var bestIndex *DynamodbIndex = primary + for _, i := range Indexes { + unused, required := i.pk.Check(avaiableFields) + if len(required) != 0 { + continue + } + + unused, _ = i.sk.Check(unused) + + if bestScore > len(unused) { + bestIndex = i + bestScore = len(unused) + } + } + + return bestIndex, kvp +} + +func Query(filter datastore.RelationshipsFilter) sq.SelectBuilder { + query := sq.StatementBuilder.PlaceholderFormat(sq.Question).Select() + + idx, kvp := QueryHint(filter) + + pk, remaining := idx.pk.BuildFull(kvp) + + remainingKvp := KeyValues{} + for _, r := range remaining { + if v, exists := kvp[r]; exists { + remainingKvp[r] = v + } + } + sk, remaining, _ := idx.sk.BuildPartial(remainingKvp) + + tableName := TableName + + if idx.indexName != "" { + tableName += "." + idx.indexName + } + + query = query.From(tableName) + + if pk != "" { + query = query.Where(sq.Eq{idx.pkKey: pk}) + } + + if sk != "" { + query = query.Where(sq.Expr(fmt.Sprintf("begins_with(%s, ?)", idx.skKey), sk)) + } + + for _, r := range remaining { + query = query.Where(sq.Eq{r: kvp[r]}) + } + + return query +} diff --git a/internal/datastore/dynamodb/reader_test.go b/internal/datastore/dynamodb/reader_test.go new file mode 100644 index 000000000..3b9ba3781 --- /dev/null +++ b/internal/datastore/dynamodb/reader_test.go @@ -0,0 +1,56 @@ +package dynamodb + +import ( + "fmt" + "testing" + + "github.com/authzed/spicedb/pkg/datastore" +) + +func TestQueryHint(t *testing.T) { + filter := datastore.RelationshipsFilter{ + OptionalResourceType: "document", + // OptionalResourceIds: []string{"d1", "d2"}, + OptionalResourceRelation: "writer", + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + { + // OptionalSubjectIds: []string{ + // "u1", + // }, + OptionalSubjectType: "user", + RelationFilter: datastore.SubjectRelationFilter{ + NonEllipsisRelation: "...", + }, + }, + + // { + // }, + }, + } + + idx, _ := QueryHint(filter) + + fmt.Printf("%#v\n", idx) +} + +func TestQuery(t *testing.T) { + filter := datastore.RelationshipsFilter{ + // OptionalResourceType: "document", + // OptionalResourceIds: []string{"d1", "d2"}, + // OptionalResourceRelation: "writer", + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + { + OptionalSubjectIds: []string{ + "u1", + }, + OptionalSubjectType: "user", + RelationFilter: datastore.SubjectRelationFilter{ + NonEllipsisRelation: "...", + }, + }, + }, + } + + Query(filter) + +} diff --git a/internal/datastore/dynamodb/readwriter.go b/internal/datastore/dynamodb/readwriter.go new file mode 100644 index 000000000..329c04237 --- /dev/null +++ b/internal/datastore/dynamodb/readwriter.go @@ -0,0 +1,378 @@ +package dynamodb + +import ( + "context" + "fmt" + + sq "github.com/Masterminds/squirrel" + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/authzed/spicedb/internal/datastore/revisions" + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" + corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/tuple" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbv2 "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +type DynamodbReadWriterTx struct { + datastore.Reader + ds DynamodbDatastore + xid revisions.HLCRevision +} + +var ( + update = partiql.Update(TableName) + delete = partiql.Delete(TableName) +) + +func NewDynamodbReadWriterTx(ds DynamodbDatastore, xid revisions.HLCRevision) datastore.ReadWriteTransaction { + return DynamodbReadWriterTx{ + &dynamodbReader{ + ds, + }, + ds, + xid, + } +} + +// BulkLoad implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) { + panic("unimplemented") +} + +// DeleteNamespaces implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) DeleteNamespaces(ctx context.Context, nsNames ...string) error { + statements := []types.BatchStatementRequest{} + + for _, name := range nsNames { + deleteNamespace := delete + + query := deleteNamespace. + Where(sq.Eq{PK: Namespace.PK.Build(KeyValues{ColEntity: &Namespace.Entity})}). + Where(sq.Eq{SK: Namespace.SK.Build(KeyValues{ColNamespace: &name})}) + + sql, args, err := query.ToSql() + if err != nil { + return err + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + statements = append(statements, types.BatchStatementRequest{ + Statement: aws.String(sql), + Parameters: parameters, + }) + } + + res, err := d.ds.client.BatchExecuteStatement(ctx, &ddbv2.BatchExecuteStatementInput{ + Statements: statements, + }) + if err != nil { + fmt.Println(err.Error()) + return err + } + + fmt.Printf("output - %#v\n", res) + + return nil +} + +// DeleteRelationships implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (uint64, bool, error) { + dsFilter, err := datastore.RelationshipsFilterFromPublicFilter(filter) + if err != nil { + return 0, false, fmt.Errorf("unable to translate relationship filter: %w", err) + } + + iter, err := d.QueryRelationships(ctx, dsFilter) + if err != nil { + return 0, false, err + } + + statements := []types.BatchStatementRequest{} + + for rel, err := range iter { + if err != nil { + fmt.Println(err) + continue + } + + kvp := KeyValues{} + kvp[ColResourceType] = &rel.Resource.ObjectType + kvp[ColRelation] = &rel.Resource.Relation + kvp[ColObjectID] = &rel.Resource.ObjectID + kvp[ColUsersetNamespace] = &rel.Subject.ObjectType + kvp[ColUsersetRelation] = &rel.Subject.Relation + kvp[ColUsersetObjectID] = &rel.Subject.ObjectID + kvp[ColEntity] = &RelationTuple.Entity + + deleteRelationship := delete + + query := deleteRelationship. + Where(sq.Eq{PK: RelationTuple.PK.Build(kvp)}). + Where(sq.Eq{SK: RelationTuple.SK.Build(kvp)}) + + sql, args, err := query.ToSql() + if err != nil { + fmt.Println(err) + continue + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + statements = append(statements, types.BatchStatementRequest{ + Statement: aws.String(sql), + Parameters: parameters, + }) + } + + res, err := d.ds.client.BatchExecuteStatement(ctx, &ddbv2.BatchExecuteStatementInput{ + Statements: statements, + }) + if err != nil { + fmt.Println(err.Error()) + return 0, false, err + } + + fmt.Printf("output - %#v\n", res) + return 0, false, nil +} + +// StoreCounterValue implements datastore.ReadWriteTransaction. +// TODO: computedAtRevision +func (d DynamodbReadWriterTx) StoreCounterValue(ctx context.Context, name string, value int, computedAtRevision datastore.Revision) error { + // update the counter with value + updateCounter := update + query := updateCounter. + Where(sq.Eq{PK: Counter.PK.Build(KeyValues{ColEntity: &Counter.Entity})}). + Where(sq.Eq{SK: Counter.SK.Build(KeyValues{ColCounterName: &name})}). + Set(ColCurrentCount, value) + + sql, args, err := query.ToSql() + if err != nil { + return err + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + res, err := d.ds.client.ExecuteStatement(ctx, &ddbv2.ExecuteStatementInput{ + Statement: aws.String(sql), + Parameters: parameters, + }) + if err != nil { + fmt.Println(err.Error()) + return err + } + + fmt.Printf("output - %#v\n", res) + + return nil +} + +// RegisterCounter implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) RegisterCounter(ctx context.Context, name string, filter *corev1.RelationshipFilter) error { + // create new counter + + kvp := KeyValues{} + + serialized, err := filter.MarshalVT() + if err != nil { + return fmt.Errorf("%d", err) + } + + kvp[ColCounterName] = &name + kvp[ColCreatedXid] = aws.String(d.xid.String()) + kvp[ColDeletedXid] = aws.String("") + + putRequest := Counter.GetPutItem(kvp, map[string]interface{}{ + ColSerialized: serialized, + }) + + out, err := d.ds.client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: &d.ds.tableName, + Item: putRequest.Item, + }) + + if err != nil { + fmt.Println(err) + } + + fmt.Printf("output - %#v\n", out) + return nil +} + +// UnregisterCounter implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) UnregisterCounter(ctx context.Context, name string) error { + // delete the counter + deleteCounter := delete + + query := deleteCounter. + Where(sq.Eq{PK: Counter.PK.Build(KeyValues{ColEntity: &Counter.Entity})}). + Where(sq.Eq{SK: Counter.SK.Build(KeyValues{ColCounterName: &name})}) + + sql, args, err := query.ToSql() + if err != nil { + return err + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + + res, err := d.ds.client.ExecuteStatement(ctx, &ddbv2.ExecuteStatementInput{ + Statement: aws.String(sql), + Parameters: parameters, + }) + if err != nil { + fmt.Println(err.Error()) + return err + } + + fmt.Printf("output - %#v\n", res) + + return nil +} + +// WriteNamespaces implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) WriteNamespaces(ctx context.Context, newConfigs ...*corev1.NamespaceDefinition) error { + wr := []types.WriteRequest{} + + for _, newNamespace := range newConfigs { + serialized, err := newNamespace.MarshalVT() + if err != nil { + return fmt.Errorf("%d", err) + } + + kvp := KeyValues{} + + kvp[ColNamespace] = &newNamespace.Name + kvp[ColCreatedXid] = aws.String(d.xid.String()) + kvp[ColDeletedXid] = aws.String("") + + wr = append(wr, types.WriteRequest{ + PutRequest: Namespace.GetPutItem(kvp, map[string]interface{}{ + ColSerialized: serialized, + }), + }) + + } + + out, err := d.ds.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ + TableName: wr, + }, + }) + + if err != nil { + fmt.Println(err) + } + + fmt.Printf("output - %#v\n", out) + return nil + +} + +// WriteRelationships implements datastore.ReadWriteTransaction. +func (d DynamodbReadWriterTx) WriteRelationships(ctx context.Context, mutations []tuple.RelationshipUpdate) error { + wr := []types.WriteRequest{} + statements := []types.BatchStatementRequest{} + + for _, mut := range mutations { + + kvp := KeyValues{} + kvp[ColResourceType] = &mut.Relationship.Resource.ObjectType + kvp[ColObjectID] = &mut.Relationship.Resource.ObjectID + kvp[ColRelation] = &mut.Relationship.Resource.Relation + kvp[ColUsersetNamespace] = &mut.Relationship.Subject.ObjectType + kvp[ColUsersetObjectID] = &mut.Relationship.Subject.ObjectID + kvp[ColUsersetRelation] = &mut.Relationship.Subject.Relation + kvp[ColEntity] = &RelationTuple.Entity + + kvp[ColCreatedXid] = aws.String(d.xid.String()) + kvp[ColDeletedXid] = aws.String("0") + + extraAttr := map[string]interface{}{} + + if mut.Relationship.OptionalCaveat != nil { + extraAttr[ColCaveatContextName] = mut.Relationship.OptionalCaveat.CaveatName + extraAttr[ColCaveatContext] = mut.Relationship.OptionalCaveat.Context.String() + } + + switch mut.Operation { + case tuple.UpdateOperationCreate, tuple.UpdateOperationTouch: + wr = append(wr, types.WriteRequest{ + PutRequest: RelationTuple.GetPutItem(kvp, extraAttr), + }) + case tuple.UpdateOperationDelete: + deleteRelation := delete + + query := deleteRelation. + Where(sq.Eq{PK: RelationTuple.PK.Build(kvp)}). + Where(sq.Eq{SK: RelationTuple.SK.Build(kvp)}) + + sql, args, err := query.ToSql() + if err != nil { + fmt.Println(err) + continue + } + parameters := []types.AttributeValue{} + for _, v := range args { + parameters = append(parameters, &types.AttributeValueMemberS{ + Value: v.(string), + }) + } + statements = append(statements, types.BatchStatementRequest{ + Statement: aws.String(sql), + Parameters: parameters, + }) + default: + return spiceerrors.MustBugf("unknown tuple mutation: %v", mut) + } + } + + if len(wr) != 0 { + _, err := d.ds.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ + TableName: wr, + }, + }) + + if err != nil { + return err + } + } + + if len(statements) != 0 { + res, err := d.ds.client.BatchExecuteStatement(ctx, &ddbv2.BatchExecuteStatementInput{ + Statements: statements, + }) + if err != nil { + fmt.Println(err.Error()) + return err + } + + fmt.Printf("output - %#v\n", res) + } + + return nil +} + +var _ datastore.ReadWriteTransaction = DynamodbReadWriterTx{} diff --git a/internal/datastore/revisions/hlcrevision.go b/internal/datastore/revisions/hlcrevision.go index e4f7fc650..b28f7b76c 100644 --- a/internal/datastore/revisions/hlcrevision.go +++ b/internal/datastore/revisions/hlcrevision.go @@ -98,6 +98,11 @@ func NewHLCForTime(time time.Time) HLCRevision { return HLCRevision{time.UnixNano(), logicalClockOffset} } +// NewHLCForTime creates a new revision for the given time. +func NewHLC(time int64, logicalclock uint32) HLCRevision { + return HLCRevision{time, logicalClockOffset + logicalclock} +} + func (hlc HLCRevision) ByteSortable() bool { return true } @@ -138,6 +143,10 @@ func (hlc HLCRevision) TimestampNanoSec() int64 { return hlc.time } +func (hlc HLCRevision) LogicalClock() uint32 { + return hlc.logicalclock +} + func (hlc HLCRevision) InexactFloat64() float64 { return float64(hlc.time) + float64(hlc.logicalclock-logicalClockOffset)/math.Pow10(logicalClockLength) } diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index 56b0e491b..5b3ef97a6 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -13,6 +13,7 @@ import ( "github.com/spf13/pflag" "github.com/authzed/spicedb/internal/datastore/crdb" + "github.com/authzed/spicedb/internal/datastore/dynamodb" "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/datastore/mysql" "github.com/authzed/spicedb/internal/datastore/postgres" @@ -37,6 +38,7 @@ const ( CockroachEngine = "cockroachdb" SpannerEngine = "spanner" MySQLEngine = "mysql" + DynamoEngine = "dynamodb" ) var BuilderForEngine = map[string]engineBuilderFunc{ @@ -45,6 +47,7 @@ var BuilderForEngine = map[string]engineBuilderFunc{ MemoryEngine: newMemoryDatstore, SpannerEngine: newSpannerDatastore, MySQLEngine: newMySQLDatastore, + DynamoEngine: newDynamodbDatastore, } //go:generate go run github.com/ecordell/optgen -output zz_generated.connpool.options.go . ConnPoolConfig @@ -162,6 +165,11 @@ type Config struct { // MySQL TablePrefix string `debugmap:"visible"` + // DynamoDB + AwsRegion string + AwsAccessKeyId string + AwsScreteAccessKey string + // Relationship Integrity RelationshipIntegrityEnabled bool `debugmap:"visible"` RelationshipIntegrityCurrentKey RelIntegrityKey `debugmap:"visible"` @@ -287,6 +295,10 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.StringVar(&opts.RelationshipIntegrityCurrentKey.KeyFilename, flagName("datastore-relationship-integrity-current-key-filename"), "", "current key filename for relationship integrity checks") flagSet.StringArrayVar(&opts.RelationshipIntegrityExpiredKeys, flagName("datastore-relationship-integrity-expired-keys"), []string{}, "config for expired keys for relationship integrity checks") + flagSet.StringVar(&opts.AwsRegion, flagName("aws-region"), "", "config for AWS region for dynamodb") + flagSet.StringVar(&opts.AwsAccessKeyId, flagName("aws-access-key-id"), "", "config for AWS Access key Id") + flagSet.StringVar(&opts.AwsScreteAccessKey, flagName("aws-screte-access-key"), "", "config for AWS Screte Access Key") + // disabling stats is only for tests flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table") if err := flagSet.MarkHidden(flagName("datastore-disable-stats")); err != nil { @@ -569,6 +581,19 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er ) } +func newDynamodbDatastore(ctx context.Context, opts Config) (datastore.Datastore, error) { + if len(opts.ReadReplicaURIs) > 0 { + return nil, errors.New("read replicas are not supported for the dynamodb datastore engine") + } + + return dynamodb.NewDynamodbDatastore(ctx, + dynamodb.AwsUrl(opts.URI), + dynamodb.AwsAccessKeyId(opts.AwsAccessKeyId), + dynamodb.AwsScreteAccessKey(opts.AwsScreteAccessKey), + dynamodb.AwsRegion(opts.AwsRegion), + ) +} + func newPostgresDatastore(ctx context.Context, opts Config) (datastore.Datastore, error) { primary, err := newPostgresPrimaryDatastore(ctx, opts) if err != nil {