Skip to content

Commit

Permalink
Merge pull request #2457 from fluidity-money/develop-aggregate-pendin…
Browse files Browse the repository at this point in the history
…g-winners

allow user transaction aggregation to begin with a pending winner
  • Loading branch information
af-afk authored Dec 18, 2023
2 parents 776dc87 + 0cd8047 commit 4cdba57
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 200 deletions.
4 changes: 2 additions & 2 deletions cmd/connector-solana-amqp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func main() {
// choose the lowest block of any token
shouldBeLatestBlock := startingBlock == 0 || startingBlock > lastBlock

if shouldBeLatestBlock {
if shouldBeLatestBlock {
startingBlock = lastBlock
}
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func main() {

// processSlot to write a block to the queue if it hasn't been already
func processSlot(redisKey string, slot uint64) {
// if redisKey is set, caller is a websocket listener
// if redisKey is set, caller is a websocket listener
if redisKey != "" {
redis.WriteLastBlock(redisKey, slot)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/microservice-ethereum-worker-server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/fluidity-money/fluidity-app/lib/state"
"github.com/fluidity-money/fluidity-app/lib/types/ethereum"
"github.com/fluidity-money/fluidity-app/lib/types/misc"
"github.com/fluidity-money/fluidity-app/lib/util"
typesWorker "github.com/fluidity-money/fluidity-app/lib/types/worker"
"github.com/fluidity-money/fluidity-app/lib/util"

ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
Expand Down
6 changes: 3 additions & 3 deletions cmd/microservice-ethereum-worker-spooler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/amm"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/spooler"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/winners"
winnersQueue "github.com/fluidity-money/fluidity-app/lib/queues/winners"
"github.com/fluidity-money/fluidity-app/lib/log"
"github.com/fluidity-money/fluidity-app/lib/queue"
winnersQueue "github.com/fluidity-money/fluidity-app/lib/queues/winners"
"github.com/fluidity-money/fluidity-app/lib/types/applications"
"github.com/fluidity-money/fluidity-app/lib/types/network"
token_details "github.com/fluidity-money/fluidity-app/lib/types/token-details"
Expand Down Expand Up @@ -86,7 +86,7 @@ func main() {

queue.GetMessages(rewardsQueue, func(message queue.Message) {
var (
announcements []worker.EthereumWinnerAnnouncement
announcements []worker.EthereumWinnerAnnouncement
pendingWinners []spooler.PendingWinner
)

Expand All @@ -105,7 +105,7 @@ func main() {
// write the winner into the database
pendingWinners_ := spooler.CreatePendingWinners(announcement, tokenDetails)
spooler.InsertPendingWinners(pendingWinners_)

// store pending winners from all announcements to send to the queue later
pendingWinners = append(pendingWinners, pendingWinners_...)

Expand Down
262 changes: 137 additions & 125 deletions cmd/microservice-user-transactions-aggregate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,129 +17,141 @@ import (
)

func main() {
go queue.UserActionsAll(func(userAction user_actions.UserAction) {
var (
network = userAction.Network
transactionHash = userAction.TransactionHash
application = userAction.Application
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// insert if this transaction is unseen
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)
user_actions.InsertAggregatedUserTransaction(userTransaction)
// prefer to show an application if any logs in this transaction contain one
} else if existingUserTransaction.Application == "none" && application != "none" {
existingUserTransaction.Application = application
user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})

go winners.WinnersAll(func(winner winners.Winner) {
var (
network = winner.Network
transactionHash = winner.TransactionHash
application = winner.Application
sendTransactionHash = winner.SendTransactionHash
tokenDecimals = winner.TokenDetails.TokenDecimals
winningAmountInt = winner.WinningAmount.Int
winnerAddress = winner.WinnerAddress
utility = winner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, sendTransactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
sendTransactionHash,
)
})
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

decimalsAdjusted := math.Pow10(tokenDecimals)
decimalsRat := new(big.Rat).SetFloat64(decimalsAdjusted)
winningAmount := new(big.Rat).SetInt(&winningAmountInt)
winningAmountFloat, _ := winningAmount.Quo(winningAmount, decimalsRat).Float64()


// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = winnerAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

// a pending winner might have set other win info
// but it cannot set the reward hash
if existingUserTransaction.RewardHash == "" {
existingUserTransaction.RewardHash = transactionHash
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, sendTransactionHash)
})

// pending winners have the same behaviour as winners
winners.PendingWinners(func (pendingWinners []winnerTypes.PendingWinner) {
for _, pendingWinner := range pendingWinners {
var (
network = pendingWinner.Network
transactionHash = pendingWinner.TransactionHash.String()
application = pendingWinner.Application.String()
usdWinAmount = pendingWinner.UsdWinAmount
senderAddress = pendingWinner.SenderAddress.String()
utility = pendingWinner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
transactionHash,
)
})
}
// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

winningAmountFloat := usdWinAmount

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = senderAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})
go queue.UserActionsAll(func(userAction user_actions.UserAction) {
var (
network = userAction.Network
transactionHash = userAction.TransactionHash
application = userAction.Application
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// insert if this transaction is unseen
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)
user_actions.InsertAggregatedUserTransaction(userTransaction)
// corresponding pending win has already been seen, so set user action specific fields
} else if existingUserTransaction.RecipientAddress == "" {
// use aggregate function to scale amount to dollars
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)

existingUserTransaction.Time = userAction.Time
existingUserTransaction.RecipientAddress = userAction.RecipientAddress
existingUserTransaction.Amount = userTransaction.Amount
existingUserTransaction.Type = userAction.Type
existingUserTransaction.SwapIn = userAction.SwapIn

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)

// prefer to show an application if any logs in this transaction contain one
} else if existingUserTransaction.Application == "none" && application != "none" {
existingUserTransaction.Application = application
user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})

go winners.WinnersAll(func(winner winners.Winner) {
var (
network = winner.Network
transactionHash = winner.TransactionHash
application = winner.Application
sendTransactionHash = winner.SendTransactionHash
tokenDecimals = winner.TokenDetails.TokenDecimals
winningAmountInt = winner.WinningAmount.Int
winnerAddress = winner.WinnerAddress
utility = winner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, sendTransactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
sendTransactionHash,
)
})
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" && utility == "FLUID" {
existingUserTransaction.Application = application
}

decimalsAdjusted := math.Pow10(tokenDecimals)
decimalsRat := new(big.Rat).SetFloat64(decimalsAdjusted)
winningAmount := new(big.Rat).SetInt(&winningAmountInt)
winningAmountFloat, _ := winningAmount.Quo(winningAmount, decimalsRat).Float64()

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = winnerAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

// a pending winner might have set other win info
// but it cannot set the reward hash
if existingUserTransaction.RewardHash == "" {
existingUserTransaction.RewardHash = transactionHash
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, sendTransactionHash)
})

// pending winners have the same behaviour as winners
winners.PendingWinners(func(pendingWinners []winnerTypes.PendingWinner) {
for _, pendingWinner := range pendingWinners {
var (
network = pendingWinner.Network
transactionHash = pendingWinner.TransactionHash.String()
application = pendingWinner.Application.String()
usdWinAmount = pendingWinner.UsdWinAmount
senderAddress = pendingWinner.SenderAddress.String()
utility = pendingWinner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// corresponding user action has not yet been tracked, so create the row
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromPendingWinner(pendingWinner)
user_actions.InsertAggregatedUserTransaction(userTransaction)
return
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" && utility == "FLUID" {
existingUserTransaction.Application = application
}

winningAmountFloat := usdWinAmount

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = senderAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})
}
2 changes: 1 addition & 1 deletion common/ethereum/applications/sushiswap/sushiswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func GetSushiswapStableOrConstantProductFees(transfer worker.EthereumApplication

var (
decimals_ []interface{}
decimals *big.Rat
decimals *big.Rat
)

// try Stable then ConstantProduct
Expand Down
10 changes: 5 additions & 5 deletions common/ethereum/applications/trader-joe/trader-joe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

const (
// Trader Joe fees are always e18
FeeDecimals = 18
FeeDecimals = 18
traderJoeSwapLogTopic = "0xad7d6f97abf51ce18e17a38f4d70e975be9c0708474987bb3e26ad21bd93ca70"
)

Expand Down Expand Up @@ -114,7 +114,7 @@ const traderJoeSwapAbiString = `[
`

var (
traderJoeSwapAbi ethAbi.ABI
traderJoeSwapAbi ethAbi.ABI
traderJoeLBPairAbi ethAbi.ABI
)

Expand Down Expand Up @@ -190,7 +190,7 @@ func GetTraderJoeFees(transfer worker.EthereumApplicationTransfer, client *ethcl
}

var (
zero = big.NewInt(0)
zero = big.NewInt(0)
swapContainsFluid = fluidTokenContract == tokenXaddr || fluidTokenContract == tokenYaddr
)

Expand Down Expand Up @@ -239,7 +239,7 @@ func GetTraderJoeFees(transfer worker.EthereumApplicationTransfer, client *ethcl
// X was the in token, so set it as the transfer amount
fluidTransferAmount.SetInt(amountInX)
break
}
}

// X was not the in token, so get the fluid volume from amountsOut
amountOutX := new(big.Int).SetBytes(amountsOut[16:])
Expand All @@ -254,7 +254,7 @@ func GetTraderJoeFees(transfer worker.EthereumApplicationTransfer, client *ethcl
// Y was the in token, so set it as the transfer amount
fluidTransferAmount.SetInt(amountInY)
break
}
}

// Y was not the in token, so get the fluid volume from amountsOut
amountOutY := new(big.Int).SetBytes(amountsOut[:16])
Expand Down
Loading

0 comments on commit 4cdba57

Please sign in to comment.