Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate user transactions to a table #2448

Merged
merged 12 commits into from
Dec 14, 2023
13 changes: 11 additions & 2 deletions cmd/microservice-ethereum-worker-spooler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/amm"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/spooler"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/winners"
winnersQueue "github.com/fluidity-money/fluidity-app/lib/queues/winners"
"github.com/fluidity-money/fluidity-app/lib/log"
"github.com/fluidity-money/fluidity-app/lib/queue"
"github.com/fluidity-money/fluidity-app/lib/types/applications"
Expand Down Expand Up @@ -84,7 +85,10 @@ func main() {
}

queue.GetMessages(rewardsQueue, func(message queue.Message) {
var announcements []worker.EthereumWinnerAnnouncement
var (
announcements []worker.EthereumWinnerAnnouncement
pendingWinners []spooler.PendingWinner
)

message.Decode(&announcements)

Expand All @@ -99,7 +103,11 @@ func main() {

for _, announcement := range announcements {
// write the winner into the database
spooler.InsertPendingWinners(announcement, tokenDetails)
pendingWinners_ := spooler.CreatePendingWinners(announcement, tokenDetails)
spooler.InsertPendingWinners(pendingWinners_)

// store pending winners from all announcements to send to the queue later
pendingWinners = append(pendingWinners, pendingWinners_...)

// if the win was an AMM win, add the LP winnings
if announcement.Application == commonApps.ApplicationSeawaterAmm && announcement.Decorator != nil {
Expand Down Expand Up @@ -217,6 +225,7 @@ func main() {
})
}

queue.SendMessage(winnersQueue.TopicPendingWinners, pendingWinners)
queue.SendMessage(batchedRewardsQueue, rewards)
}
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/microservice-user-transactions-aggregate/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*.out
lint
docker
test
4 changes: 4 additions & 0 deletions cmd/microservice-user-transactions-aggregate/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*.out
lint
docker
test
16 changes: 16 additions & 0 deletions cmd/microservice-user-transactions-aggregate/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM fluidity/build-container:latest AS build

WORKDIR /usr/local/src/fluidity/cmd/microservice-user-transactions-aggregate

COPY . .
RUN make


FROM fluidity/runtime-container:latest

COPY --from=build /usr/local/src/fluidity/cmd/microservice-user-transactions-aggregate/microservice-user-transactions-aggregate.out .

ENTRYPOINT [ \
"wait-for-amqp", \
"./microservice-user-transactions-aggregate.out" \
]
4 changes: 4 additions & 0 deletions cmd/microservice-user-transactions-aggregate/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

REPO := microservice-user-transactions-aggregate

include ../../golang.mk
25 changes: 25 additions & 0 deletions cmd/microservice-user-transactions-aggregate/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

# microservice-user-transactions-aggregate

Aggregates user actions and wins in Timescale to be displayed in the Fluidity webapp.

## Environment variables

| Name | Description
|------------------------------|------------------------------------------------------------------------------|
| `FLU_WORKER_ID` | Worker ID used to identify the application in logging and to the AMQP queue. |
| `FLU_DEBUG` | Toggle debug messages produced by any application using the debug logger. |
| `FLU_AMQP_QUEUE_ADDR` | AMQP queue address connected to to receive and send messages down. |
| `FLU_TIMESCALE_URI` | Database URI to use when connecting to the Timescale database. |

## Building

make build

## Testing

make test

## Docker

make docker
139 changes: 139 additions & 0 deletions cmd/microservice-user-transactions-aggregate/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2023 Fluidity Money. All rights reserved. Use of this
// source code is governed by a GPL-style license that can be found in the
// LICENSE.md file.

package main

import (
"math"
"math/big"

user_actions "github.com/fluidity-money/fluidity-app/lib/databases/timescale/user-actions"
"github.com/fluidity-money/fluidity-app/lib/log"
queue "github.com/fluidity-money/fluidity-app/lib/queues/user-actions"
"github.com/fluidity-money/fluidity-app/lib/queues/winners"
userActionsType "github.com/fluidity-money/fluidity-app/lib/types/user-actions"
winnerTypes "github.com/fluidity-money/fluidity-app/lib/types/winners"
)

func main() {
go queue.UserActionsAll(func(userAction user_actions.UserAction) {
var (
network = userAction.Network
transactionHash = userAction.TransactionHash
application = userAction.Application
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// insert if this transaction is unseen
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)
user_actions.InsertAggregatedUserTransaction(userTransaction)
// prefer to show an application if any logs in this transaction contain one
} else if existingUserTransaction.Application == "none" && application != "none" {
existingUserTransaction.Application = application
user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})

go winners.WinnersAll(func(winner winners.Winner) {
var (
network = winner.Network
transactionHash = winner.TransactionHash
application = winner.Application
sendTransactionHash = winner.SendTransactionHash
tokenDecimals = winner.TokenDetails.TokenDecimals
winningAmountInt = winner.WinningAmount.Int
winnerAddress = winner.WinnerAddress
utility = winner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, sendTransactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
sendTransactionHash,
)
})
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

decimalsAdjusted := math.Pow10(tokenDecimals)
decimalsRat := new(big.Rat).SetFloat64(decimalsAdjusted)
winningAmount := new(big.Rat).SetInt(&winningAmountInt)
winningAmountFloat, _ := winningAmount.Quo(winningAmount, decimalsRat).Float64()


// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = winnerAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.RewardHash = transactionHash
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, sendTransactionHash)
})

// pending winners have the same behaviour as winners
winners.PendingWinners(func (pendingWinner winnerTypes.PendingWinner) {
var (
network = pendingWinner.Network
transactionHash = pendingWinner.TransactionHash.String()
application = pendingWinner.Application.String()
usdWinAmount = pendingWinner.UsdWinAmount
senderAddress = pendingWinner.SenderAddress.String()
utility = pendingWinner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
transactionHash,
)
})
}
// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

winningAmountFloat := usdWinAmount

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = senderAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.RewardHash = transactionHash
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- migrate:up

-- adapted from user_transactions_aggregate_return
CREATE TABLE aggregated_user_transactions (
token_short_name TEXT,
network network_blockchain,
time TIMESTAMP,
transaction_hash VARCHAR UNIQUE,
sender_address VARCHAR,
recipient_address VARCHAR,
amount DOUBLE PRECISION,
application VARCHAR,
winning_amount DOUBLE PRECISION,
winning_address VARCHAR,
reward_hash VARCHAR,
type user_action,
swap_in BOOLEAN,
utility_amount DOUBLE PRECISION,
utility_name VARCHAR
);

-- migrate:down

DROP TABLE aggregated_user_transactions;
Loading
Loading