Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate user transactions to a table #2448

Merged
merged 12 commits into from
Dec 14, 2023
7 changes: 5 additions & 2 deletions cmd/microservice-ethereum-worker-spooler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/amm"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/spooler"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/winners"
winnersQueue "github.com/fluidity-money/fluidity-app/lib/queues/winners"
"github.com/fluidity-money/fluidity-app/lib/log"
"github.com/fluidity-money/fluidity-app/lib/queue"
"github.com/fluidity-money/fluidity-app/lib/types/applications"
Expand Down Expand Up @@ -98,8 +99,10 @@ func main() {
toSend := make(map[token_details.TokenDetails]bool)

for _, announcement := range announcements {
// write the winner into the database
spooler.InsertPendingWinners(announcement, tokenDetails)
// write the winner into the database and to the pending winners queue
pendingWinners := spooler.CreatePendingWinners(announcement, tokenDetails)
eli-d marked this conversation as resolved.
Show resolved Hide resolved
spooler.InsertPendingWinners(pendingWinners)
queue.SendMessage(winnersQueue.TopicPendingWinners)
eli-d marked this conversation as resolved.
Show resolved Hide resolved

// if the win was an AMM win, add the LP winnings
if announcement.Application == commonApps.ApplicationSeawaterAmm && announcement.Decorator != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/microservice-user-transactions-aggregate/.dockerignore
4 changes: 4 additions & 0 deletions cmd/microservice-user-transactions-aggregate/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*.out
lint
docker
test
16 changes: 16 additions & 0 deletions cmd/microservice-user-transactions-aggregate/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM fluidity/build-container:latest AS build

WORKDIR /usr/local/src/fluidity/cmd/microservice-user-transactions-aggregate

COPY . .
RUN make


FROM fluidity/runtime-container:latest

COPY --from=build /usr/local/src/fluidity/cmd/microservice-user-transactions-aggregate/microservice-user-transactions-aggregate.out .

ENTRYPOINT [ \
"wait-for-amqp", \
"./microservice-user-transactions-aggregate.out" \
]
4 changes: 4 additions & 0 deletions cmd/microservice-user-transactions-aggregate/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

REPO := microservice-user-transactions-aggregate

include ../../golang.mk
25 changes: 25 additions & 0 deletions cmd/microservice-user-transactions-aggregate/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

# microservice-user-transactions-aggregate

Aggregates user actions and wins to be displayed in the Fluidity webapp.
eli-d marked this conversation as resolved.
Show resolved Hide resolved

## Environment variables

| Name | Description
|------------------------------|------------------------------------------------------------------------------|
| `FLU_WORKER_ID` | Worker ID used to identify the application in logging and to the AMQP queue. |
| `FLU_DEBUG` | Toggle debug messages produced by any application using the debug logger. |
| `FLU_AMQP_QUEUE_ADDR` | AMQP queue address connected to to receive and send messages down. |
| `FLU_TIMESCALE_URI` | Database URI to use when connecting to the Timescale database. |

## Building

make build

## Testing

make test

## Docker

make docker
139 changes: 139 additions & 0 deletions cmd/microservice-user-transactions-aggregate/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2023 Fluidity Money. All rights reserved. Use of this
// source code is governed by a GPL-style license that can be found in the
// LICENSE.md file.

package main

import (
"math"
"math/big"

"github.com/fluidity-money/fluidity-app/lib/databases/timescale/spooler"
user_actions "github.com/fluidity-money/fluidity-app/lib/databases/timescale/user-actions"
"github.com/fluidity-money/fluidity-app/lib/log"
queue "github.com/fluidity-money/fluidity-app/lib/queues/user-actions"
"github.com/fluidity-money/fluidity-app/lib/queues/winners"
userActionsType "github.com/fluidity-money/fluidity-app/lib/types/user-actions"
)

func main() {
go queue.UserActionsAll(func(userAction user_actions.UserAction) {
var (
network = userAction.Network
transactionHash = userAction.TransactionHash
application = userAction.Application
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// insert if this transaction is unseen
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)
user_actions.InsertAggregatedUserTransaction(userTransaction)
// prefer to show an application if any logs in this transaction contain one
} else if existingUserTransaction.Application == "none" && application != "none" {
existingUserTransaction.Application = application
user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})

go winners.WinnersAll(func(winner winners.Winner) {
var (
network = winner.Network
transactionHash = winner.TransactionHash
application = winner.Application
sendTransactionHash = winner.SendTransactionHash
tokenDecimals = winner.TokenDetails.TokenDecimals
winningAmountInt = winner.WinningAmount.Int
winnerAddress = winner.WinnerAddress
utility = winner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, sendTransactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
sendTransactionHash,
)
})
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

decimalsAdjusted := math.Pow10(tokenDecimals)
decimalsRat := new(big.Rat).SetFloat64(decimalsAdjusted)
winningAmount := new(big.Rat).SetInt(&winningAmountInt)
winningAmountFloat, _ := winningAmount.Quo(winningAmount, decimalsRat).Float64()


// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = winnerAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.RewardHash = transactionHash
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, sendTransactionHash)
})

// pending winners have the same behaviour as winners
winners.PendingWinners(func (pendingWinner spooler.PendingWinner) {
var (
network = pendingWinner.Network
transactionHash = pendingWinner.TransactionHash.String()
application = pendingWinner.Application.String()
usdWinAmount = pendingWinner.UsdWinAmount
senderAddress = pendingWinner.SenderAddress.String()
utility = pendingWinner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
transactionHash,
)
})
}
// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

winningAmountFloat := usdWinAmount

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = senderAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.RewardHash = transactionHash
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- migrate:up

-- adapted from user_transactions_aggregate_return
CREATE TABLE aggregated_user_transactions (
token_short_name TEXT,
network network_blockchain,
time BIGINT,
transaction_hash VARCHAR,
sender_address VARCHAR,
recipient_address VARCHAR,
amount DOUBLE PRECISION,
application VARCHAR,
winning_amount DOUBLE PRECISION,
winning_address VARCHAR,
reward_hash VARCHAR,
type user_action,
swap_in,
utility_amount DOUBLE PRECISION,
utility_name VARCHAR
);

-- migrate:down

DROP TABLE aggregated_user_transactions;
Loading
Loading