diff --git a/docs/api/README.md b/docs/api/README.md index 805cb9762..1367463fa 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -3130,6 +3130,7 @@ Accept: application/json ```json { "name": "string", + "query": {}, "accountIDs": [ "string" ] @@ -3214,6 +3215,8 @@ Accept: application/json "id": "string", "name": "string", "createdAt": "2019-08-24T14:15:22Z", + "type": "string", + "query": {}, "poolAccounts": [ "string" ] @@ -3266,6 +3269,8 @@ Accept: application/json "id": "string", "name": "string", "createdAt": "2019-08-24T14:15:22Z", + "type": "string", + "query": {}, "poolAccounts": [ "string" ] @@ -6335,6 +6340,7 @@ None ( Scopes: payments:read ) ```json { "name": "string", + "query": {}, "accountIDs": [ "string" ] @@ -6347,7 +6353,8 @@ None ( Scopes: payments:read ) |Name|Type|Required|Restrictions|Description| |---|---|---|---|---| |name|string|true|none|none| -|accountIDs|[string]|true|none|none| +|query|object|false|none|none| +|accountIDs|[string]|false|none|none|

V3CreatePoolResponse

@@ -6388,6 +6395,8 @@ None ( Scopes: payments:read ) "id": "string", "name": "string", "createdAt": "2019-08-24T14:15:22Z", + "type": "string", + "query": {}, "poolAccounts": [ "string" ] @@ -6422,6 +6431,8 @@ None ( Scopes: payments:read ) "id": "string", "name": "string", "createdAt": "2019-08-24T14:15:22Z", + "type": "string", + "query": {}, "poolAccounts": [ "string" ] @@ -6476,6 +6487,8 @@ None ( Scopes: payments:read ) "id": "string", "name": "string", "createdAt": "2019-08-24T14:15:22Z", + "type": "string", + "query": {}, "poolAccounts": [ "string" ] @@ -6490,6 +6503,8 @@ None ( Scopes: payments:read ) |id|string|true|none|none| |name|string|true|none|none| |createdAt|string(date-time)|true|none|none| +|type|string|true|none|none| +|query|object|false|none|none| |poolAccounts|[[V3AccountID](#schemav3accountid)]|true|none|none|

V3PoolBalances

diff --git a/internal/api/services/pools_balances.go b/internal/api/services/pools_balances.go index b4f36e340..17b7c6dbd 100644 --- a/internal/api/services/pools_balances.go +++ b/internal/api/services/pools_balances.go @@ -21,6 +21,15 @@ func (s *Service) PoolsBalances( if err != nil { return nil, newStorageError(err, "cannot get pool") } + + if pool.Type == models.POOL_TYPE_DYNAMIC { + // populate the pool accounts from the query + pool.PoolAccounts, err = s.populatePoolAccounts(ctx, pool) + if err != nil { + return nil, newStorageError(err, "cannot populate pool accounts") + } + } + res := make(map[string]*aggregatedBalance) for i := range pool.PoolAccounts { balances, err := s.storage.BalancesGetLatest(ctx, pool.PoolAccounts[i]) diff --git a/internal/api/services/pools_balances_at.go b/internal/api/services/pools_balances_at.go index d86ca353e..4322fde5e 100644 --- a/internal/api/services/pools_balances_at.go +++ b/internal/api/services/pools_balances_at.go @@ -2,10 +2,14 @@ package services import ( "context" + "encoding/json" "math/big" "time" + "github.com/formancehq/go-libs/v3/bun/bunpaginate" + "github.com/formancehq/go-libs/v3/query" "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/internal/storage" "github.com/google/uuid" ) @@ -18,6 +22,15 @@ func (s *Service) PoolsBalancesAt( if err != nil { return nil, newStorageError(err, "cannot get pool") } + + if pool.Type == models.POOL_TYPE_DYNAMIC { + // populate the pool accounts from the query + pool.PoolAccounts, err = s.populatePoolAccounts(ctx, pool) + if err != nil { + return nil, newStorageError(err, "cannot populate pool accounts") + } + } + res := make(map[string]*aggregatedBalance) for i := range pool.PoolAccounts { balances, err := s.storage.BalancesGetAt(ctx, pool.PoolAccounts[i], at) @@ -51,3 +64,44 @@ func (s *Service) PoolsBalancesAt( return balances, nil } + +func (s *Service) populatePoolAccounts(ctx context.Context, pool *models.Pool) ([]models.AccountID, error) { + queryJSON, err := json.Marshal(pool.Query) + if err != nil { + return nil, newStorageError(err, "cannot marshal pool query") + } + + qb, err := query.ParseJSON(string(queryJSON)) + if err != nil { + return nil, newStorageError(err, "cannot parse pool query") + } + + q := storage.NewListAccountsQuery( + bunpaginate.NewPaginatedQueryOptions(storage.AccountQuery{}). + WithPageSize(100). + WithQueryBuilder(qb), + ) + + res := make([]models.AccountID, 0) + for { + cursor, err := s.storage.AccountsList(ctx, q) + if err != nil { + return nil, newStorageError(err, "cannot list accounts") + } + + for _, account := range cursor.Data { + res = append(res, account.ID) + } + + if !cursor.HasMore { + break + } + + err = bunpaginate.UnmarshalCursor(cursor.Next, &q) + if err != nil { + return nil, newStorageError(err, "cannot unmarshal cursor") + } + } + + return res, nil +} diff --git a/internal/api/services/pools_balances_at_test.go b/internal/api/services/pools_balances_at_test.go index a070f9dc2..fcf1f6fd4 100644 --- a/internal/api/services/pools_balances_at_test.go +++ b/internal/api/services/pools_balances_at_test.go @@ -93,6 +93,7 @@ func TestPoolsBalancesAt(t *testing.T) { store.EXPECT().PoolsGet(gomock.Any(), id).Return(&models.Pool{ ID: id, Name: "test", + Type: models.POOL_TYPE_STATIC, CreatedAt: at, PoolAccounts: poolsAccount, }, test.poolsGetStorageErr) diff --git a/internal/api/services/pools_balances_test.go b/internal/api/services/pools_balances_test.go index 6fb705c38..e00d7f1fb 100644 --- a/internal/api/services/pools_balances_test.go +++ b/internal/api/services/pools_balances_test.go @@ -93,6 +93,7 @@ func TestPoolsBalancesLatest(t *testing.T) { ID: id, Name: "test", CreatedAt: time.Now().Add(-time.Hour), + Type: models.POOL_TYPE_STATIC, PoolAccounts: poolsAccount, }, test.poolsGetStorageErr) if test.poolsGetStorageErr == nil { diff --git a/internal/api/services/pools_get.go b/internal/api/services/pools_get.go index 895634897..5d3c7d4ff 100644 --- a/internal/api/services/pools_get.go +++ b/internal/api/services/pools_get.go @@ -13,5 +13,13 @@ func (s *Service) PoolsGet(ctx context.Context, id uuid.UUID) (*models.Pool, err return nil, newStorageError(err, "cannot get pool") } + if p.Type == models.POOL_TYPE_DYNAMIC { + // populate the pool accounts from the query + p.PoolAccounts, err = s.populatePoolAccounts(ctx, p) + if err != nil { + return nil, newStorageError(err, "cannot populate pool accounts") + } + } + return p, nil } diff --git a/internal/api/services/pools_list.go b/internal/api/services/pools_list.go index 86c020c92..5f4ca46ad 100644 --- a/internal/api/services/pools_list.go +++ b/internal/api/services/pools_list.go @@ -6,6 +6,7 @@ import ( "github.com/formancehq/go-libs/v3/bun/bunpaginate" "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/storage" + "golang.org/x/sync/errgroup" ) func (s *Service) PoolsList(ctx context.Context, query storage.ListPoolsQuery) (*bunpaginate.Cursor[models.Pool], error) { @@ -14,5 +15,23 @@ func (s *Service) PoolsList(ctx context.Context, query storage.ListPoolsQuery) ( return nil, newStorageError(err, "cannot list pools") } + errGroup, egCtx := errgroup.WithContext(ctx) + for i := range ps.Data { + index := i + errGroup.Go(func() error { + if ps.Data[index].Type == models.POOL_TYPE_DYNAMIC { + ps.Data[index].PoolAccounts, err = s.populatePoolAccounts(egCtx, &ps.Data[index]) + if err != nil { + return newStorageError(err, "cannot populate pool accounts") + } + } + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, newStorageError(err, "cannot populate pool accounts") + } + return ps, nil } diff --git a/internal/api/services/pools_list_test.go b/internal/api/services/pools_list_test.go index c02ca4d2a..65975643b 100644 --- a/internal/api/services/pools_list_test.go +++ b/internal/api/services/pools_list_test.go @@ -5,7 +5,9 @@ import ( "fmt" "testing" + "github.com/formancehq/go-libs/v3/bun/bunpaginate" "github.com/formancehq/payments/internal/connectors/engine" + "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/storage" "github.com/stretchr/testify/require" gomock "go.uber.org/mock/gomock" @@ -46,7 +48,7 @@ func TestPoolsList(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { query := storage.ListPoolsQuery{} - store.EXPECT().PoolsList(gomock.Any(), query).Return(nil, test.err) + store.EXPECT().PoolsList(gomock.Any(), query).Return(&bunpaginate.Cursor[models.Pool]{}, test.err) _, err := s.PoolsList(context.Background(), query) if test.expectedError == nil { require.NoError(t, err) diff --git a/internal/api/v2/handler_pools_create.go b/internal/api/v2/handler_pools_create.go index 571d7116c..2fc7219d7 100644 --- a/internal/api/v2/handler_pools_create.go +++ b/internal/api/v2/handler_pools_create.go @@ -18,14 +18,17 @@ import ( ) type CreatePoolRequest struct { - Name string `json:"name" validate:"required"` - AccountIDs []string `json:"accountIDs" validate:"min=1,dive,accountID"` + Name string `json:"name" validate:"required"` + Query map[string]any `json:"query" validate:"required_without=AccountIDs"` + AccountIDs []string `json:"accountIDs" validate:"required_without=Query,dive,accountID"` } type PoolResponse struct { - ID string `json:"id"` - Name string `json:"name"` - Accounts []string `json:"accounts"` + ID string `json:"id"` + Name string `json:"name"` + Type models.PoolType `json:"type"` + Query map[string]any `json:"query"` + Accounts []string `json:"accounts"` } func poolsCreate(backend backend.Backend, validator *validation.Validator) http.HandlerFunc { @@ -55,18 +58,24 @@ func poolsCreate(backend backend.Backend, validator *validation.Validator) http. CreatedAt: time.Now().UTC(), } - accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs)) - for i, accountID := range CreatePoolRequest.AccountIDs { - aID, err := models.AccountIDFromString(accountID) - if err != nil { - otel.RecordError(span, err) - api.BadRequest(w, ErrValidation, err) - return - } + if len(CreatePoolRequest.Query) > 0 { + pool.Type = models.POOL_TYPE_DYNAMIC + pool.Query = CreatePoolRequest.Query + } else { + pool.Type = models.POOL_TYPE_STATIC + accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs)) + for i, accountID := range CreatePoolRequest.AccountIDs { + aID, err := models.AccountIDFromString(accountID) + if err != nil { + otel.RecordError(span, err) + api.BadRequest(w, ErrValidation, err) + return + } - accounts[i] = aID + accounts[i] = aID + } + pool.PoolAccounts = accounts } - pool.PoolAccounts = accounts err = backend.PoolsCreate(ctx, pool) if err != nil { @@ -78,6 +87,8 @@ func poolsCreate(backend backend.Backend, validator *validation.Validator) http. data := &PoolResponse{ ID: pool.ID.String(), Name: pool.Name, + Type: pool.Type, + Query: pool.Query, Accounts: CreatePoolRequest.AccountIDs, } diff --git a/internal/api/v2/handler_pools_create_test.go b/internal/api/v2/handler_pools_create_test.go index 712d93e5f..1ca5b4333 100644 --- a/internal/api/v2/handler_pools_create_test.go +++ b/internal/api/v2/handler_pools_create_test.go @@ -75,5 +75,19 @@ var _ = Describe("API v2 Pools Create", func() { handlerFn(w, prepareJSONRequest(http.MethodPost, &cpr)) assertExpectedResponse(w.Result(), http.StatusOK, "data") }) + + It("should return status ok on success with a query", func(ctx SpecContext) { + m.EXPECT().PoolsCreate(gomock.Any(), gomock.Any()).Return(nil) + cpr = CreatePoolRequest{ + Name: "name", + Query: map[string]any{ + "$match": map[string]any{ + "account_id": accID.String(), + }, + }, + } + handlerFn(w, prepareJSONRequest(http.MethodPost, &cpr)) + assertExpectedResponse(w.Result(), http.StatusOK, "data") + }) }) }) diff --git a/internal/api/v2/handler_pools_get.go b/internal/api/v2/handler_pools_get.go index 569e14f5b..268ff3e85 100644 --- a/internal/api/v2/handler_pools_get.go +++ b/internal/api/v2/handler_pools_get.go @@ -33,8 +33,10 @@ func poolsGet(backend backend.Backend) http.HandlerFunc { } data := &PoolResponse{ - ID: pool.ID.String(), - Name: pool.Name, + ID: pool.ID.String(), + Name: pool.Name, + Type: pool.Type, + Query: pool.Query, } accounts := make([]string, len(pool.PoolAccounts)) diff --git a/internal/api/v2/handler_pools_list.go b/internal/api/v2/handler_pools_list.go index 5a6268a15..63423d8c4 100644 --- a/internal/api/v2/handler_pools_list.go +++ b/internal/api/v2/handler_pools_list.go @@ -41,8 +41,10 @@ func poolsList(backend backend.Backend) http.HandlerFunc { data := make([]*PoolResponse, len(cursor.Data)) for i := range cursor.Data { data[i] = &PoolResponse{ - ID: cursor.Data[i].ID.String(), - Name: cursor.Data[i].Name, + ID: cursor.Data[i].ID.String(), + Type: cursor.Data[i].Type, + Query: cursor.Data[i].Query, + Name: cursor.Data[i].Name, } accounts := make([]string, len(cursor.Data[i].PoolAccounts)) diff --git a/internal/api/v3/handler_pools_create.go b/internal/api/v3/handler_pools_create.go index 9d10dd2a2..c3a41d1e0 100644 --- a/internal/api/v3/handler_pools_create.go +++ b/internal/api/v3/handler_pools_create.go @@ -17,8 +17,9 @@ import ( ) type CreatePoolRequest struct { - Name string `json:"name" validate:"required"` - AccountIDs []string `json:"accountIDs" validate:"min=1,dive,accountID"` + Name string `json:"name" validate:"required"` + Query map[string]any `json:"query" validate:"required_without=AccountIDs"` + AccountIDs []string `json:"accountIDs" validate:"required_without=Query,dive,accountID"` } func poolsCreate(backend backend.Backend, validator *validation.Validator) http.HandlerFunc { @@ -48,18 +49,24 @@ func poolsCreate(backend backend.Backend, validator *validation.Validator) http. CreatedAt: time.Now().UTC(), } - accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs)) - for i, accountID := range CreatePoolRequest.AccountIDs { - aID, err := models.AccountIDFromString(accountID) - if err != nil { - otel.RecordError(span, err) - api.BadRequest(w, ErrValidation, err) - return - } + if len(CreatePoolRequest.Query) > 0 { + pool.Type = models.POOL_TYPE_DYNAMIC + pool.Query = CreatePoolRequest.Query + } else { + pool.Type = models.POOL_TYPE_STATIC + accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs)) + for i, accountID := range CreatePoolRequest.AccountIDs { + aID, err := models.AccountIDFromString(accountID) + if err != nil { + otel.RecordError(span, err) + api.BadRequest(w, ErrValidation, err) + return + } - accounts[i] = aID + accounts[i] = aID + } + pool.PoolAccounts = accounts } - pool.PoolAccounts = accounts err = backend.PoolsCreate(ctx, pool) if err != nil { diff --git a/internal/api/v3/handler_pools_create_test.go b/internal/api/v3/handler_pools_create_test.go index 8f080f472..541e16a47 100644 --- a/internal/api/v3/handler_pools_create_test.go +++ b/internal/api/v3/handler_pools_create_test.go @@ -75,5 +75,19 @@ var _ = Describe("API v3 Pools Create", func() { handlerFn(w, prepareJSONRequest(http.MethodPost, &cpr)) assertExpectedResponse(w.Result(), http.StatusCreated, "data") }) + + It("should return status created on success with a query", func(ctx SpecContext) { + m.EXPECT().PoolsCreate(gomock.Any(), gomock.Any()).Return(nil) + cpr = CreatePoolRequest{ + Name: "name", + Query: map[string]any{ + "$match": map[string]any{ + "account_id": accID.String(), + }, + }, + } + handlerFn(w, prepareJSONRequest(http.MethodPost, &cpr)) + assertExpectedResponse(w.Result(), http.StatusCreated, "data") + }) }) }) diff --git a/internal/connectors/engine/engine.go b/internal/connectors/engine/engine.go index 33febc5b3..ec0b69a1e 100644 --- a/internal/connectors/engine/engine.go +++ b/internal/connectors/engine/engine.go @@ -1459,6 +1459,18 @@ func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID m return fmt.Errorf("account %s is not an internal account: %w", accountID, ErrValidation) } + // Check next if the pool is dynamic, in that case, we send an error to the + // user telling him to use the pool query update endpoint. + pool, err := e.storage.PoolsGet(ctx, id) + if err != nil { + otel.RecordError(span, err) + return err + } + + if pool.Type == models.POOL_TYPE_DYNAMIC { + return fmt.Errorf("pool %s is a dynamic pool, use the pool query update endpoint to add an account: %w", id, ErrValidation) + } + if err := e.storage.PoolsAddAccount(ctx, id, accountID); err != nil { otel.RecordError(span, err) return err @@ -1470,12 +1482,6 @@ func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID m e.wg.Add(1) defer e.wg.Done() - pool, err := e.storage.PoolsGet(detachedCtx, id) - if err != nil { - otel.RecordError(span, err) - return err - } - // Do not wait for sending of events _, err = e.temporalClient.ExecuteWorkflow( detachedCtx, @@ -1505,6 +1511,16 @@ func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accoun ctx, span := otel.Tracer().Start(ctx, "engine.RemoveAccountFromPool") defer span.End() + pool, err := e.storage.PoolsGet(ctx, id) + if err != nil { + otel.RecordError(span, err) + return err + } + + if pool.Type == models.POOL_TYPE_DYNAMIC { + return fmt.Errorf("pool %s is a dynamic pool, use the pool query update endpoint to remove an account: %w", id, ErrValidation) + } + if err := e.storage.PoolsRemoveAccount(ctx, id, accountID); err != nil { otel.RecordError(span, err) return err @@ -1516,12 +1532,6 @@ func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accoun e.wg.Add(1) defer e.wg.Done() - pool, err := e.storage.PoolsGet(detachedCtx, id) - if err != nil { - otel.RecordError(span, err) - return err - } - // Do not wait for sending of events _, err = e.temporalClient.ExecuteWorkflow( detachedCtx, diff --git a/internal/connectors/engine/engine_test.go b/internal/connectors/engine/engine_test.go index de5619e8a..5957ae1af 100644 --- a/internal/connectors/engine/engine_test.go +++ b/internal/connectors/engine/engine_test.go @@ -597,6 +597,7 @@ var _ = Describe("Engine Tests", func() { ID: accountID, Type: models.ACCOUNT_TYPE_INTERNAL, }, nil) + store.EXPECT().PoolsGet(gomock.Any(), poolID).Return(&models.Pool{}, nil) store.EXPECT().PoolsAddAccount(gomock.Any(), poolID, accountID).Return(fmt.Errorf("failed to add account to pool")) err := eng.AddAccountToPool(ctx, poolID, accountID) Expect(err).ToNot(BeNil()) @@ -608,8 +609,8 @@ var _ = Describe("Engine Tests", func() { ID: accountID, Type: models.ACCOUNT_TYPE_INTERNAL, }, nil) - store.EXPECT().PoolsAddAccount(gomock.Any(), poolID, accountID).Return(nil) store.EXPECT().PoolsGet(gomock.Any(), poolID).Return(&models.Pool{}, nil) + store.EXPECT().PoolsAddAccount(gomock.Any(), poolID, accountID).Return(nil) cl.EXPECT().ExecuteWorkflow(gomock.Any(), WithWorkflowOptions("pools-add-account", defaultTaskQueue), workflow.RunSendEvents, gomock.AssignableToTypeOf(workflow.SendEvents{}), diff --git a/internal/models/pools.go b/internal/models/pools.go index ccd195569..bda032875 100644 --- a/internal/models/pools.go +++ b/internal/models/pools.go @@ -7,10 +7,25 @@ import ( "github.com/google/uuid" ) +type PoolType string + +const ( + // Pools created with a list of accounts. No dynamic changes, the user can + // add/delete accounts from the pool via specific endpoints. + POOL_TYPE_STATIC PoolType = "STATIC" + // Pools created with an account list query. The user cannot add/delete + // accounts from the pool directly from endpoints, but can change the query + // to match the right accounts. + POOL_TYPE_DYNAMIC PoolType = "DYNAMIC" +) + type Pool struct { ID uuid.UUID `json:"id"` Name string `json:"name"` CreatedAt time.Time `json:"createdAt"` + Type PoolType `json:"type"` + + Query map[string]any `json:"query"` PoolAccounts []AccountID `json:"poolAccounts"` } @@ -20,12 +35,14 @@ func (p Pool) MarshalJSON() ([]byte, error) { ID string `json:"id"` Name string `json:"name"` CreatedAt time.Time `json:"createdAt"` + Type PoolType `json:"type"` PoolAccounts []string `json:"poolAccounts"` } aux.ID = p.ID.String() aux.Name = p.Name aux.CreatedAt = p.CreatedAt + aux.Type = p.Type aux.PoolAccounts = make([]string, len(p.PoolAccounts)) for i := range p.PoolAccounts { @@ -40,6 +57,7 @@ func (p *Pool) UnmarshalJSON(data []byte) error { ID string `json:"id"` Name string `json:"name"` CreatedAt time.Time `json:"createdAt"` + Type PoolType `json:"type"` PoolAccounts []string `json:"poolAccounts"` } @@ -64,6 +82,7 @@ func (p *Pool) UnmarshalJSON(data []byte) error { p.ID = id p.Name = aux.Name p.CreatedAt = aux.CreatedAt + p.Type = aux.Type p.PoolAccounts = poolAccounts return nil diff --git a/internal/storage/migrations/24-dynamic-pools.sql b/internal/storage/migrations/24-dynamic-pools.sql new file mode 100644 index 000000000..f1e732699 --- /dev/null +++ b/internal/storage/migrations/24-dynamic-pools.sql @@ -0,0 +1,4 @@ +-- This is not a critical table, so we can add a default and locking the table +-- without worries. +ALTER TABLE pools ADD COLUMN IF NOT EXISTS type text NOT NULL DEFAULT 'STATIC'; +ALTER TABLE pools ADD COLUMN IF NOT EXISTS query jsonb; \ No newline at end of file diff --git a/internal/storage/migrations/migrations.go b/internal/storage/migrations/migrations.go index 06d51f9cb..ae003fdac 100644 --- a/internal/storage/migrations/migrations.go +++ b/internal/storage/migrations/migrations.go @@ -49,6 +49,9 @@ var psuBankBridgeConnectionUpdatedAt string //go:embed 22-rename-bank-bridges-open-banking.sql var renameBankBridgesOpenBanking string +//go:embed 24-dynamic-pools.sql +var dynamicPools string + func registerMigrations(logger logging.Logger, migrator *migrations.Migrator, encryptionKey string) { migrator.RegisterMigrations( migrations.Migration{ @@ -348,6 +351,17 @@ func registerMigrations(logger logging.Logger, migrator *migrations.Migrator, en return err }, }, + migrations.Migration{ + Name: "add dynamic pools", + Up: func(ctx context.Context, db bun.IDB) error { + return db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error { + logger.Info("running add dynamic pools migration...") + _, err := tx.ExecContext(ctx, dynamicPools) + logger.WithField("error", err).Info("finished running add dynamic pools migration") + return err + }) + }, + }, ) } diff --git a/internal/storage/pools.go b/internal/storage/pools.go index 04e01d9dd..385b1e7de 100644 --- a/internal/storage/pools.go +++ b/internal/storage/pools.go @@ -17,9 +17,11 @@ type pool struct { bun.BaseModel `bun:"table:pools"` // Mandatory fields - ID uuid.UUID `bun:"id,pk,type:uuid,notnull"` - Name string `bun:"name,type:text,notnull"` - CreatedAt time.Time `bun:"created_at,type:timestamp without time zone,notnull"` + ID uuid.UUID `bun:"id,pk,type:uuid,notnull"` + Name string `bun:"name,type:text,notnull"` + CreatedAt time.Time `bun:"created_at,type:timestamp without time zone,notnull"` + Type models.PoolType `bun:"type,type:text,notnull"` + Query map[string]any `bun:"query,type:jsonb,nullzero"` PoolAccounts []*poolAccounts `bun:"rel:has-many,join:id=pool_id"` } @@ -64,12 +66,14 @@ func (s *store) PoolsUpsert(ctx context.Context, pool models.Pool) error { return e("insert pool: %w", err) } - _, err = tx.NewInsert(). - Model(&accountsToInsert). - On("CONFLICT (pool_id, account_id) DO NOTHING"). - Exec(ctx) - if err != nil { - return e("insert pool accounts: %w", err) + if len(accountsToInsert) > 0 { + _, err = tx.NewInsert(). + Model(&accountsToInsert). + On("CONFLICT (pool_id, account_id) DO NOTHING"). + Exec(ctx) + if err != nil { + return e("insert pool accounts: %w", err) + } } return e("commit transaction: %w", tx.Commit()) @@ -271,6 +275,8 @@ func fromPoolModel(from models.Pool) (pool, []poolAccounts) { ID: from.ID, Name: from.Name, CreatedAt: time.New(from.CreatedAt), + Type: from.Type, + Query: from.Query, } var accounts []poolAccounts @@ -295,6 +301,8 @@ func toPoolModel(from pool) models.Pool { ID: from.ID, Name: from.Name, CreatedAt: from.CreatedAt.Time, + Type: from.Type, + Query: from.Query, PoolAccounts: accounts, } } diff --git a/internal/storage/pools_test.go b/internal/storage/pools_test.go index 730c0e951..4a861c97c 100644 --- a/internal/storage/pools_test.go +++ b/internal/storage/pools_test.go @@ -28,19 +28,26 @@ func defaultPools() []models.Pool { ID: poolID1, Name: "test1", CreatedAt: now.Add(-60 * time.Minute).UTC().Time, + Type: models.POOL_TYPE_STATIC, PoolAccounts: []models.AccountID{defaultAccounts[0].ID, defaultAccounts[1].ID}, }, { ID: poolID2, Name: "test2", CreatedAt: now.Add(-30 * time.Minute).UTC().Time, + Type: models.POOL_TYPE_STATIC, PoolAccounts: []models.AccountID{defaultAccounts[2].ID}, }, { - ID: poolID3, - Name: "test3", - CreatedAt: now.Add(-55 * time.Minute).UTC().Time, - PoolAccounts: []models.AccountID{defaultAccounts[2].ID}, + ID: poolID3, + Name: "test3", + CreatedAt: now.Add(-55 * time.Minute).UTC().Time, + Type: models.POOL_TYPE_DYNAMIC, + Query: map[string]any{ + "$match": map[string]any{ + "account_id": "test3", + }, + }, }, } } @@ -66,6 +73,7 @@ func TestPoolsUpsert(t *testing.T) { p := models.Pool{ ID: poolID3, Name: "test1", + Type: models.POOL_TYPE_STATIC, CreatedAt: now.Add(-30 * time.Minute).UTC().Time, PoolAccounts: []models.AccountID{defaultAccounts()[2].ID}, } @@ -390,11 +398,11 @@ func TestPoolsList(t *testing.T) { cursor, err := store.PoolsList(ctx, q) require.NoError(t, err) - require.Len(t, cursor.Data, 2) + require.Len(t, cursor.Data, 1) require.False(t, cursor.HasMore) require.Empty(t, cursor.Previous) require.Empty(t, cursor.Next) - require.Equal(t, []models.Pool{defaultPools()[1], defaultPools()[2]}, cursor.Data) + require.Equal(t, []models.Pool{defaultPools()[1]}, cursor.Data) }) t.Run("list pools by unknown account id", func(t *testing.T) { diff --git a/openapi.yaml b/openapi.yaml index 499fbd1c4..44233208c 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -3440,12 +3440,18 @@ components: required: - id - name + - type - accounts properties: id: type: string name: type: string + type: + type: string + query: + type: object + additionalProperties: true accounts: type: array items: @@ -3783,10 +3789,12 @@ components: type: object required: - name - - accountIDs properties: name: type: string + query: + type: object + additionalProperties: true accountIDs: type: array items: @@ -5502,10 +5510,12 @@ components: type: object required: - name - - accountIDs properties: name: type: string + query: + type: object + additionalProperties: true accountIDs: type: array items: @@ -5568,6 +5578,7 @@ components: - id - name - createdAt + - type - poolAccounts properties: id: @@ -5577,6 +5588,11 @@ components: createdAt: type: string format: date-time + type: + type: string + query: + type: object + additionalProperties: true poolAccounts: type: array items: diff --git a/openapi/v1-2/v1-2.yaml b/openapi/v1-2/v1-2.yaml index 958dde746..214abe67b 100644 --- a/openapi/v1-2/v1-2.yaml +++ b/openapi/v1-2/v1-2.yaml @@ -2088,12 +2088,18 @@ components: required: - id - name + - type - accounts properties: id: type: string name: type: string + type: + type: string + query: + type: object + additionalProperties: true accounts: type: array items: @@ -2431,10 +2437,12 @@ components: type: object required: - name - - accountIDs properties: name: type: string + query: + type: object + additionalProperties: true accountIDs: type: array items: diff --git a/openapi/v3/v3-schemas.yaml b/openapi/v3/v3-schemas.yaml index 69b4310fe..49f79047a 100644 --- a/openapi/v3/v3-schemas.yaml +++ b/openapi/v3/v3-schemas.yaml @@ -1569,10 +1569,12 @@ components: type: object required: - name - - accountIDs properties: name: type: string + query: + type: object + additionalProperties: true accountIDs: type: array items: @@ -1640,6 +1642,7 @@ components: - id - name - createdAt + - type - poolAccounts properties: id: @@ -1649,6 +1652,11 @@ components: createdAt: type: string format: date-time + type: + type: string + query: + type: object + additionalProperties: true poolAccounts: type: array items: diff --git a/pkg/client/.speakeasy/gen.lock b/pkg/client/.speakeasy/gen.lock index c37b86b70..c6b0b6bf7 100644 --- a/pkg/client/.speakeasy/gen.lock +++ b/pkg/client/.speakeasy/gen.lock @@ -1,7 +1,7 @@ lockVersion: 2.0.0 id: 1fa8a26f-45d9-44b7-8b97-fbeebcdcd8b1 management: - docChecksum: 7500724fcec6eeb19f88f8804752f23d + docChecksum: 9cf724eb1c77c99eb8fe9281b7a5acb2 docVersion: v1 speakeasyVersion: 1.525.0 generationVersion: 2.562.2 @@ -910,7 +910,7 @@ examples: sort: ["date:asc", "status:desc"] responses: "200": - application/json: {"cursor": {"pageSize": 15, "hasMore": false, "previous": "YXVsdCBhbmQgYSBtYXhpbXVtIG1heF9yZXN1bHRzLol=", "next": "", "data": [{"id": "", "name": "", "accounts": ["", ""]}, {"id": "", "name": "", "accounts": [""]}]}} + application/json: {"cursor": {"pageSize": 15, "hasMore": false, "previous": "YXVsdCBhbmQgYSBtYXhpbXVtIG1heF9yZXN1bHRzLol=", "next": "", "data": [{"id": "", "name": "", "type": "", "accounts": ["", ""]}, {"id": "", "name": "", "type": "", "accounts": [""]}]}} default: application/json: {"errorCode": "VALIDATION", "errorMessage": "[VALIDATION] missing reference"} createPool: @@ -919,7 +919,7 @@ examples: application/json: {"name": "", "accountIDs": ["", "", ""]} responses: "200": - application/json: {"data": {"id": "", "name": "", "accounts": []}} + application/json: {"data": {"id": "", "name": "", "type": "", "accounts": []}} default: application/json: {"errorCode": "VALIDATION", "errorMessage": "[VALIDATION] missing reference"} getPool: @@ -929,7 +929,7 @@ examples: poolId: "XXX" responses: "200": - application/json: {"data": {"id": "", "name": "", "accounts": []}} + application/json: {"data": {"id": "", "name": "", "type": "", "accounts": []}} default: application/json: {"errorCode": "VALIDATION", "errorMessage": "[VALIDATION] missing reference"} deletePool: @@ -1626,7 +1626,7 @@ examples: cursor: "aHR0cHM6Ly9nLnBhZ2UvTmVrby1SYW1lbj9zaGFyZQ==" responses: "200": - application/json: {"cursor": {"pageSize": 15, "hasMore": false, "previous": "YXVsdCBhbmQgYSBtYXhpbXVtIG1heF9yZXN1bHRzLol=", "next": "", "data": [{"id": "", "name": "", "createdAt": "2025-02-08T15:23:10.325Z", "poolAccounts": []}, {"id": "", "name": "", "createdAt": "2024-04-12T20:16:24.210Z", "poolAccounts": [""]}]}} + application/json: {"cursor": {"pageSize": 15, "hasMore": false, "previous": "YXVsdCBhbmQgYSBtYXhpbXVtIG1heF9yZXN1bHRzLol=", "next": "", "data": [{"id": "", "name": "", "createdAt": "2025-02-08T15:23:10.325Z", "type": "", "poolAccounts": []}, {"id": "", "name": "", "createdAt": "2024-04-12T20:16:24.210Z", "type": "", "poolAccounts": [""]}]}} default: application/json: {"errorCode": "VALIDATION", "errorMessage": "[VALIDATION] missing required config field: pollingPeriod"} v3GetPool: @@ -1636,7 +1636,7 @@ examples: poolID: "" responses: "200": - application/json: {"data": {"id": "", "name": "", "createdAt": "2024-04-23T06:52:21.825Z", "poolAccounts": [""]}} + application/json: {"data": {"id": "", "name": "", "createdAt": "2024-04-23T06:52:21.825Z", "type": "", "poolAccounts": [""]}} default: application/json: {"errorCode": "VALIDATION", "errorMessage": "[VALIDATION] missing required config field: pollingPeriod"} v3DeletePool: diff --git a/pkg/client/docs/models/components/pool.md b/pkg/client/docs/models/components/pool.md index 64e0433a6..341c73049 100644 --- a/pkg/client/docs/models/components/pool.md +++ b/pkg/client/docs/models/components/pool.md @@ -7,4 +7,6 @@ | ------------------ | ------------------ | ------------------ | ------------------ | | `ID` | *string* | :heavy_check_mark: | N/A | | `Name` | *string* | :heavy_check_mark: | N/A | +| `Type` | *string* | :heavy_check_mark: | N/A | +| `Query` | map[string]*any* | :heavy_minus_sign: | N/A | | `Accounts` | []*string* | :heavy_check_mark: | N/A | \ No newline at end of file diff --git a/pkg/client/docs/models/components/poolrequest.md b/pkg/client/docs/models/components/poolrequest.md index 126027315..b86b72e29 100644 --- a/pkg/client/docs/models/components/poolrequest.md +++ b/pkg/client/docs/models/components/poolrequest.md @@ -6,4 +6,5 @@ | Field | Type | Required | Description | | ------------------ | ------------------ | ------------------ | ------------------ | | `Name` | *string* | :heavy_check_mark: | N/A | -| `AccountIDs` | []*string* | :heavy_check_mark: | N/A | \ No newline at end of file +| `Query` | map[string]*any* | :heavy_minus_sign: | N/A | +| `AccountIDs` | []*string* | :heavy_minus_sign: | N/A | \ No newline at end of file diff --git a/pkg/client/docs/models/components/v3createpoolrequest.md b/pkg/client/docs/models/components/v3createpoolrequest.md index fa4d7b9a1..76a990d68 100644 --- a/pkg/client/docs/models/components/v3createpoolrequest.md +++ b/pkg/client/docs/models/components/v3createpoolrequest.md @@ -6,4 +6,5 @@ | Field | Type | Required | Description | | ------------------ | ------------------ | ------------------ | ------------------ | | `Name` | *string* | :heavy_check_mark: | N/A | -| `AccountIDs` | []*string* | :heavy_check_mark: | N/A | \ No newline at end of file +| `Query` | map[string]*any* | :heavy_minus_sign: | N/A | +| `AccountIDs` | []*string* | :heavy_minus_sign: | N/A | \ No newline at end of file diff --git a/pkg/client/docs/models/components/v3pool.md b/pkg/client/docs/models/components/v3pool.md index 840204a6e..58b903463 100644 --- a/pkg/client/docs/models/components/v3pool.md +++ b/pkg/client/docs/models/components/v3pool.md @@ -8,4 +8,6 @@ | `ID` | *string* | :heavy_check_mark: | N/A | | `Name` | *string* | :heavy_check_mark: | N/A | | `CreatedAt` | [time.Time](https://pkg.go.dev/time#Time) | :heavy_check_mark: | N/A | +| `Type` | *string* | :heavy_check_mark: | N/A | +| `Query` | map[string]*any* | :heavy_minus_sign: | N/A | | `PoolAccounts` | []*string* | :heavy_check_mark: | N/A | \ No newline at end of file diff --git a/pkg/client/models/components/pool.go b/pkg/client/models/components/pool.go index 0f6a588de..a4924da4e 100644 --- a/pkg/client/models/components/pool.go +++ b/pkg/client/models/components/pool.go @@ -3,9 +3,11 @@ package components type Pool struct { - ID string `json:"id"` - Name string `json:"name"` - Accounts []string `json:"accounts"` + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + Query map[string]any `json:"query,omitempty"` + Accounts []string `json:"accounts"` } func (o *Pool) GetID() string { @@ -22,6 +24,20 @@ func (o *Pool) GetName() string { return o.Name } +func (o *Pool) GetType() string { + if o == nil { + return "" + } + return o.Type +} + +func (o *Pool) GetQuery() map[string]any { + if o == nil { + return nil + } + return o.Query +} + func (o *Pool) GetAccounts() []string { if o == nil { return []string{} diff --git a/pkg/client/models/components/poolrequest.go b/pkg/client/models/components/poolrequest.go index 1edb8f95a..d4bff40a5 100644 --- a/pkg/client/models/components/poolrequest.go +++ b/pkg/client/models/components/poolrequest.go @@ -3,8 +3,9 @@ package components type PoolRequest struct { - Name string `json:"name"` - AccountIDs []string `json:"accountIDs"` + Name string `json:"name"` + Query map[string]any `json:"query,omitempty"` + AccountIDs []string `json:"accountIDs,omitempty"` } func (o *PoolRequest) GetName() string { @@ -14,9 +15,16 @@ func (o *PoolRequest) GetName() string { return o.Name } +func (o *PoolRequest) GetQuery() map[string]any { + if o == nil { + return nil + } + return o.Query +} + func (o *PoolRequest) GetAccountIDs() []string { if o == nil { - return []string{} + return nil } return o.AccountIDs } diff --git a/pkg/client/models/components/v3createpoolrequest.go b/pkg/client/models/components/v3createpoolrequest.go index a3974caa1..1a96f47fb 100644 --- a/pkg/client/models/components/v3createpoolrequest.go +++ b/pkg/client/models/components/v3createpoolrequest.go @@ -3,8 +3,9 @@ package components type V3CreatePoolRequest struct { - Name string `json:"name"` - AccountIDs []string `json:"accountIDs"` + Name string `json:"name"` + Query map[string]any `json:"query,omitempty"` + AccountIDs []string `json:"accountIDs,omitempty"` } func (o *V3CreatePoolRequest) GetName() string { @@ -14,9 +15,16 @@ func (o *V3CreatePoolRequest) GetName() string { return o.Name } +func (o *V3CreatePoolRequest) GetQuery() map[string]any { + if o == nil { + return nil + } + return o.Query +} + func (o *V3CreatePoolRequest) GetAccountIDs() []string { if o == nil { - return []string{} + return nil } return o.AccountIDs } diff --git a/pkg/client/models/components/v3pool.go b/pkg/client/models/components/v3pool.go index 4d7344ec0..6badb611e 100644 --- a/pkg/client/models/components/v3pool.go +++ b/pkg/client/models/components/v3pool.go @@ -8,10 +8,12 @@ import ( ) type V3Pool struct { - ID string `json:"id"` - Name string `json:"name"` - CreatedAt time.Time `json:"createdAt"` - PoolAccounts []string `json:"poolAccounts"` + ID string `json:"id"` + Name string `json:"name"` + CreatedAt time.Time `json:"createdAt"` + Type string `json:"type"` + Query map[string]any `json:"query,omitempty"` + PoolAccounts []string `json:"poolAccounts"` } func (v V3Pool) MarshalJSON() ([]byte, error) { @@ -46,6 +48,20 @@ func (o *V3Pool) GetCreatedAt() time.Time { return o.CreatedAt } +func (o *V3Pool) GetType() string { + if o == nil { + return "" + } + return o.Type +} + +func (o *V3Pool) GetQuery() map[string]any { + if o == nil { + return nil + } + return o.Query +} + func (o *V3Pool) GetPoolAccounts() []string { if o == nil { return []string{} diff --git a/test/e2e/api_pools_test.go b/test/e2e/api_pools_test.go index 9bbd0f28d..5f2ea366c 100644 --- a/test/e2e/api_pools_test.go +++ b/test/e2e/api_pools_test.go @@ -80,6 +80,28 @@ var _ = Context("Payments API Pools", Serial, func() { Expect(getResponse.GetV3GetPoolResponse().Data.PoolAccounts).To(HaveLen(len(accountIDs))) }) + It("should be ok with a query", func() { + accountIDs := setupV3PoolAccounts(ctx, app.GetValue(), e, connectorID, 5) + createResponse, err := app.GetValue().SDK().Payments.V3.CreatePool(ctx, &components.V3CreatePoolRequest{ + Name: "some-pool", + Query: map[string]any{ + "$match": map[string]any{ + "id": accountIDs[0], + }, + }, + }) + Expect(err).To(BeNil()) + + poolID := createResponse.GetV3CreatePoolResponse().Data + var msg = GenericEventPayload{ID: poolID} + Eventually(e).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(msg)))) + + getResponse, err := app.GetValue().SDK().Payments.V3.GetPool(ctx, poolID) + Expect(err).To(BeNil()) + Expect(getResponse.GetV3GetPoolResponse().Data.PoolAccounts).To(HaveLen(1)) + Expect(getResponse.GetV3GetPoolResponse().Data.PoolAccounts[0]).To(Equal(accountIDs[0])) + }) + It("should fail when underlying accounts don't exist", func() { accountID := models.AccountID{ Reference: "v3blahblahblah", @@ -151,6 +173,28 @@ var _ = Context("Payments API Pools", Serial, func() { Expect(getResponse.GetPoolResponse().Data.Accounts).To(HaveLen(len(accountIDs))) }) + It("should be ok when underlying accounts exist with a query", func() { + accountIDs := setupV2PoolAccounts(ctx, app.GetValue(), e, connectorID, 5) + createResponse, err := app.GetValue().SDK().Payments.V1.CreatePool(ctx, components.PoolRequest{ + Name: "some-pool", + Query: map[string]any{ + "$match": map[string]any{ + "id": accountIDs[0], + }, + }, + }) + Expect(err).To(BeNil()) + + poolID := createResponse.GetPoolResponse().Data.ID + var msg = GenericEventPayload{ID: poolID} + Eventually(e).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(msg)))) + + getResponse, err := app.GetValue().SDK().Payments.V1.GetPool(ctx, poolID) + Expect(err).To(BeNil()) + Expect(getResponse.GetPoolResponse().Data.Accounts).To(HaveLen(1)) + Expect(getResponse.GetPoolResponse().Data.Accounts[0]).To(Equal(accountIDs[0])) + }) + It("should fail when underlying accounts don't exist", func() { accountID := models.AccountID{ Reference: "blahblahblah", @@ -225,7 +269,7 @@ var _ = Context("Payments API Pools", Serial, func() { It("should be possible to remove account from pool", func() { _, err := app.GetValue().SDK().Payments.V3.RemoveAccountFromPool(ctx, poolID, accountIDs[0]) Expect(err).To(BeNil()) - Eventually(e).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) + Eventually(e).WithTimeout(2 * time.Second).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) getResponse, err := app.GetValue().SDK().Payments.V3.GetPool(ctx, poolID) Expect(err).To(BeNil()) @@ -241,7 +285,7 @@ var _ = Context("Payments API Pools", Serial, func() { It("should be possible to add account to pool", func() { _, err := app.GetValue().SDK().Payments.V3.AddAccountToPool(ctx, poolID, extraAccountIDs[0]) Expect(err).To(BeNil()) - Eventually(e).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) + Eventually(e).WithTimeout(2 * time.Second).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) getResponse, err := app.GetValue().SDK().Payments.V3.GetPool(ctx, poolID) Expect(err).To(BeNil()) @@ -291,7 +335,7 @@ var _ = Context("Payments API Pools", Serial, func() { It("should be possible to remove account from pool", func() { _, err := app.GetValue().SDK().Payments.V1.RemoveAccountFromPool(ctx, poolID, accountIDs[0]) Expect(err).To(BeNil()) - Eventually(e).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) + Eventually(e).WithTimeout(2 * time.Second).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) getResponse, err := app.GetValue().SDK().Payments.V1.GetPool(ctx, poolID) Expect(err).To(BeNil()) @@ -309,7 +353,7 @@ var _ = Context("Payments API Pools", Serial, func() { AccountID: extraAccountIDs[0], }) Expect(err).To(BeNil()) - Eventually(e).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) + Eventually(e).WithTimeout(2 * time.Second).Should(Receive(Event(evts.EventTypeSavedPool, WithPayloadSubset(eventPayload)))) getResponse, err := app.GetValue().SDK().Payments.V1.GetPool(ctx, poolID) Expect(err).To(BeNil())