Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3130,6 +3130,7 @@ Accept: application/json
```json
{
"name": "string",
"query": {},
"accountIDs": [
"string"
]
Expand Down Expand Up @@ -3214,6 +3215,8 @@ Accept: application/json
"id": "string",
"name": "string",
"createdAt": "2019-08-24T14:15:22Z",
"type": "string",
"query": {},
"poolAccounts": [
"string"
]
Expand Down Expand Up @@ -3266,6 +3269,8 @@ Accept: application/json
"id": "string",
"name": "string",
"createdAt": "2019-08-24T14:15:22Z",
"type": "string",
"query": {},
"poolAccounts": [
"string"
]
Expand Down Expand Up @@ -6335,6 +6340,7 @@ None ( Scopes: payments:read )
```json
{
"name": "string",
"query": {},
"accountIDs": [
"string"
]
Expand All @@ -6347,7 +6353,8 @@ None ( Scopes: payments:read )
|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|name|string|true|none|none|
|accountIDs|[string]|true|none|none|
|query|object|false|none|none|
|accountIDs|[string]|false|none|none|

<h2 id="tocS_V3CreatePoolResponse">V3CreatePoolResponse</h2>
<!-- backwards compatibility -->
Expand Down Expand Up @@ -6388,6 +6395,8 @@ None ( Scopes: payments:read )
"id": "string",
"name": "string",
"createdAt": "2019-08-24T14:15:22Z",
"type": "string",
"query": {},
"poolAccounts": [
"string"
]
Expand Down Expand Up @@ -6422,6 +6431,8 @@ None ( Scopes: payments:read )
"id": "string",
"name": "string",
"createdAt": "2019-08-24T14:15:22Z",
"type": "string",
"query": {},
"poolAccounts": [
"string"
]
Expand Down Expand Up @@ -6476,6 +6487,8 @@ None ( Scopes: payments:read )
"id": "string",
"name": "string",
"createdAt": "2019-08-24T14:15:22Z",
"type": "string",
"query": {},
"poolAccounts": [
"string"
]
Expand All @@ -6490,6 +6503,8 @@ None ( Scopes: payments:read )
|id|string|true|none|none|
|name|string|true|none|none|
|createdAt|string(date-time)|true|none|none|
|type|string|true|none|none|
|query|object|false|none|none|
|poolAccounts|[[V3AccountID](#schemav3accountid)]|true|none|none|

<h2 id="tocS_V3PoolBalances">V3PoolBalances</h2>
Expand Down
9 changes: 9 additions & 0 deletions internal/api/services/pools_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ func (s *Service) PoolsBalances(
if err != nil {
return nil, newStorageError(err, "cannot get pool")
}

if pool.Type == models.POOL_TYPE_DYNAMIC {
// populate the pool accounts from the query
pool.PoolAccounts, err = s.populatePoolAccounts(ctx, pool)
if err != nil {
return nil, newStorageError(err, "cannot populate pool accounts")
}
}

res := make(map[string]*aggregatedBalance)
for i := range pool.PoolAccounts {
balances, err := s.storage.BalancesGetLatest(ctx, pool.PoolAccounts[i])
Expand Down
54 changes: 54 additions & 0 deletions internal/api/services/pools_balances_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package services

import (
"context"
"encoding/json"
"math/big"
"time"

"github.com/formancehq/go-libs/v3/bun/bunpaginate"
"github.com/formancehq/go-libs/v3/query"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
"github.com/google/uuid"
)

Expand All @@ -18,6 +22,15 @@ func (s *Service) PoolsBalancesAt(
if err != nil {
return nil, newStorageError(err, "cannot get pool")
}

if pool.Type == models.POOL_TYPE_DYNAMIC {
// populate the pool accounts from the query
pool.PoolAccounts, err = s.populatePoolAccounts(ctx, pool)
if err != nil {
return nil, newStorageError(err, "cannot populate pool accounts")
}
}

res := make(map[string]*aggregatedBalance)
for i := range pool.PoolAccounts {
balances, err := s.storage.BalancesGetAt(ctx, pool.PoolAccounts[i], at)
Expand Down Expand Up @@ -51,3 +64,44 @@ func (s *Service) PoolsBalancesAt(

return balances, nil
}

func (s *Service) populatePoolAccounts(ctx context.Context, pool *models.Pool) ([]models.AccountID, error) {
queryJSON, err := json.Marshal(pool.Query)
if err != nil {
return nil, newStorageError(err, "cannot marshal pool query")
}

qb, err := query.ParseJSON(string(queryJSON))
if err != nil {
return nil, newStorageError(err, "cannot parse pool query")
}

q := storage.NewListAccountsQuery(
bunpaginate.NewPaginatedQueryOptions(storage.AccountQuery{}).
WithPageSize(100).
WithQueryBuilder(qb),
)

res := make([]models.AccountID, 0)
for {
cursor, err := s.storage.AccountsList(ctx, q)
if err != nil {
return nil, newStorageError(err, "cannot list accounts")
}

for _, account := range cursor.Data {
res = append(res, account.ID)
}

if !cursor.HasMore {
break
}

err = bunpaginate.UnmarshalCursor(cursor.Next, &q)
if err != nil {
return nil, newStorageError(err, "cannot unmarshal cursor")
}
}

return res, nil
}
1 change: 1 addition & 0 deletions internal/api/services/pools_balances_at_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestPoolsBalancesAt(t *testing.T) {
store.EXPECT().PoolsGet(gomock.Any(), id).Return(&models.Pool{
ID: id,
Name: "test",
Type: models.POOL_TYPE_STATIC,
CreatedAt: at,
PoolAccounts: poolsAccount,
}, test.poolsGetStorageErr)
Expand Down
1 change: 1 addition & 0 deletions internal/api/services/pools_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestPoolsBalancesLatest(t *testing.T) {
ID: id,
Name: "test",
CreatedAt: time.Now().Add(-time.Hour),
Type: models.POOL_TYPE_STATIC,
PoolAccounts: poolsAccount,
}, test.poolsGetStorageErr)
if test.poolsGetStorageErr == nil {
Expand Down
8 changes: 8 additions & 0 deletions internal/api/services/pools_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,13 @@ func (s *Service) PoolsGet(ctx context.Context, id uuid.UUID) (*models.Pool, err
return nil, newStorageError(err, "cannot get pool")
}

if p.Type == models.POOL_TYPE_DYNAMIC {
// populate the pool accounts from the query
p.PoolAccounts, err = s.populatePoolAccounts(ctx, p)
if err != nil {
return nil, newStorageError(err, "cannot populate pool accounts")
}
}

return p, nil
}
19 changes: 19 additions & 0 deletions internal/api/services/pools_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/formancehq/go-libs/v3/bun/bunpaginate"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
"golang.org/x/sync/errgroup"
)

func (s *Service) PoolsList(ctx context.Context, query storage.ListPoolsQuery) (*bunpaginate.Cursor[models.Pool], error) {
Expand All @@ -14,5 +15,23 @@ func (s *Service) PoolsList(ctx context.Context, query storage.ListPoolsQuery) (
return nil, newStorageError(err, "cannot list pools")
}

errGroup, egCtx := errgroup.WithContext(ctx)
for i := range ps.Data {
index := i
errGroup.Go(func() error {
if ps.Data[index].Type == models.POOL_TYPE_DYNAMIC {
ps.Data[index].PoolAccounts, err = s.populatePoolAccounts(egCtx, &ps.Data[index])
if err != nil {
return newStorageError(err, "cannot populate pool accounts")
}
}
return nil
})
}

if err := errGroup.Wait(); err != nil {
return nil, newStorageError(err, "cannot populate pool accounts")
}

return ps, nil
}
4 changes: 3 additions & 1 deletion internal/api/services/pools_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"testing"

"github.com/formancehq/go-libs/v3/bun/bunpaginate"
"github.com/formancehq/payments/internal/connectors/engine"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
Expand Down Expand Up @@ -46,7 +48,7 @@ func TestPoolsList(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
query := storage.ListPoolsQuery{}
store.EXPECT().PoolsList(gomock.Any(), query).Return(nil, test.err)
store.EXPECT().PoolsList(gomock.Any(), query).Return(&bunpaginate.Cursor[models.Pool]{}, test.err)
_, err := s.PoolsList(context.Background(), query)
if test.expectedError == nil {
require.NoError(t, err)
Expand Down
41 changes: 26 additions & 15 deletions internal/api/v2/handler_pools_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
)

type CreatePoolRequest struct {
Name string `json:"name" validate:"required"`
AccountIDs []string `json:"accountIDs" validate:"min=1,dive,accountID"`
Name string `json:"name" validate:"required"`
Query map[string]any `json:"query" validate:"required_without=AccountIDs"`
AccountIDs []string `json:"accountIDs" validate:"required_without=Query,dive,accountID"`
}

type PoolResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Accounts []string `json:"accounts"`
ID string `json:"id"`
Name string `json:"name"`
Type models.PoolType `json:"type"`
Query map[string]any `json:"query"`
Accounts []string `json:"accounts"`
}

func poolsCreate(backend backend.Backend, validator *validation.Validator) http.HandlerFunc {
Expand Down Expand Up @@ -55,18 +58,24 @@ func poolsCreate(backend backend.Backend, validator *validation.Validator) http.
CreatedAt: time.Now().UTC(),
}

accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs))
for i, accountID := range CreatePoolRequest.AccountIDs {
aID, err := models.AccountIDFromString(accountID)
if err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrValidation, err)
return
}
if len(CreatePoolRequest.Query) > 0 {
pool.Type = models.POOL_TYPE_DYNAMIC
pool.Query = CreatePoolRequest.Query
} else {
pool.Type = models.POOL_TYPE_STATIC
accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs))
for i, accountID := range CreatePoolRequest.AccountIDs {
aID, err := models.AccountIDFromString(accountID)
if err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrValidation, err)
return
}

accounts[i] = aID
accounts[i] = aID
}
pool.PoolAccounts = accounts
}
pool.PoolAccounts = accounts

err = backend.PoolsCreate(ctx, pool)
if err != nil {
Expand All @@ -78,6 +87,8 @@ func poolsCreate(backend backend.Backend, validator *validation.Validator) http.
data := &PoolResponse{
ID: pool.ID.String(),
Name: pool.Name,
Type: pool.Type,
Query: pool.Query,
Accounts: CreatePoolRequest.AccountIDs,
}

Expand Down
14 changes: 14 additions & 0 deletions internal/api/v2/handler_pools_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,19 @@ var _ = Describe("API v2 Pools Create", func() {
handlerFn(w, prepareJSONRequest(http.MethodPost, &cpr))
assertExpectedResponse(w.Result(), http.StatusOK, "data")
})

It("should return status ok on success with a query", func(ctx SpecContext) {
m.EXPECT().PoolsCreate(gomock.Any(), gomock.Any()).Return(nil)
cpr = CreatePoolRequest{
Name: "name",
Query: map[string]any{
"$match": map[string]any{
"account_id": accID.String(),
},
},
}
handlerFn(w, prepareJSONRequest(http.MethodPost, &cpr))
assertExpectedResponse(w.Result(), http.StatusOK, "data")
})
})
})
6 changes: 4 additions & 2 deletions internal/api/v2/handler_pools_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ func poolsGet(backend backend.Backend) http.HandlerFunc {
}

data := &PoolResponse{
ID: pool.ID.String(),
Name: pool.Name,
ID: pool.ID.String(),
Name: pool.Name,
Type: pool.Type,
Query: pool.Query,
}

accounts := make([]string, len(pool.PoolAccounts))
Expand Down
6 changes: 4 additions & 2 deletions internal/api/v2/handler_pools_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func poolsList(backend backend.Backend) http.HandlerFunc {
data := make([]*PoolResponse, len(cursor.Data))
for i := range cursor.Data {
data[i] = &PoolResponse{
ID: cursor.Data[i].ID.String(),
Name: cursor.Data[i].Name,
ID: cursor.Data[i].ID.String(),
Type: cursor.Data[i].Type,
Query: cursor.Data[i].Query,
Name: cursor.Data[i].Name,
}

accounts := make([]string, len(cursor.Data[i].PoolAccounts))
Expand Down
Loading
Loading