Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow user transaction aggregation to begin with a pending winner #2457

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/connector-solana-amqp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func main() {
// choose the lowest block of any token
shouldBeLatestBlock := startingBlock == 0 || startingBlock > lastBlock

if shouldBeLatestBlock {
if shouldBeLatestBlock {
startingBlock = lastBlock
}
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func main() {

// processSlot to write a block to the queue if it hasn't been already
func processSlot(redisKey string, slot uint64) {
// if redisKey is set, caller is a websocket listener
// if redisKey is set, caller is a websocket listener
if redisKey != "" {
redis.WriteLastBlock(redisKey, slot)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/microservice-ethereum-worker-server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/fluidity-money/fluidity-app/lib/state"
"github.com/fluidity-money/fluidity-app/lib/types/ethereum"
"github.com/fluidity-money/fluidity-app/lib/types/misc"
"github.com/fluidity-money/fluidity-app/lib/util"
typesWorker "github.com/fluidity-money/fluidity-app/lib/types/worker"
"github.com/fluidity-money/fluidity-app/lib/util"

ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
Expand Down
6 changes: 3 additions & 3 deletions cmd/microservice-ethereum-worker-spooler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/amm"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/spooler"
"github.com/fluidity-money/fluidity-app/lib/databases/timescale/winners"
winnersQueue "github.com/fluidity-money/fluidity-app/lib/queues/winners"
"github.com/fluidity-money/fluidity-app/lib/log"
"github.com/fluidity-money/fluidity-app/lib/queue"
winnersQueue "github.com/fluidity-money/fluidity-app/lib/queues/winners"
"github.com/fluidity-money/fluidity-app/lib/types/applications"
"github.com/fluidity-money/fluidity-app/lib/types/network"
token_details "github.com/fluidity-money/fluidity-app/lib/types/token-details"
Expand Down Expand Up @@ -86,7 +86,7 @@ func main() {

queue.GetMessages(rewardsQueue, func(message queue.Message) {
var (
announcements []worker.EthereumWinnerAnnouncement
announcements []worker.EthereumWinnerAnnouncement
pendingWinners []spooler.PendingWinner
)

Expand All @@ -105,7 +105,7 @@ func main() {
// write the winner into the database
pendingWinners_ := spooler.CreatePendingWinners(announcement, tokenDetails)
spooler.InsertPendingWinners(pendingWinners_)

// store pending winners from all announcements to send to the queue later
pendingWinners = append(pendingWinners, pendingWinners_...)

Expand Down
262 changes: 137 additions & 125 deletions cmd/microservice-user-transactions-aggregate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,129 +17,141 @@ import (
)

func main() {
go queue.UserActionsAll(func(userAction user_actions.UserAction) {
var (
network = userAction.Network
transactionHash = userAction.TransactionHash
application = userAction.Application
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// insert if this transaction is unseen
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)
user_actions.InsertAggregatedUserTransaction(userTransaction)
// prefer to show an application if any logs in this transaction contain one
} else if existingUserTransaction.Application == "none" && application != "none" {
existingUserTransaction.Application = application
user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})

go winners.WinnersAll(func(winner winners.Winner) {
var (
network = winner.Network
transactionHash = winner.TransactionHash
application = winner.Application
sendTransactionHash = winner.SendTransactionHash
tokenDecimals = winner.TokenDetails.TokenDecimals
winningAmountInt = winner.WinningAmount.Int
winnerAddress = winner.WinnerAddress
utility = winner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, sendTransactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
sendTransactionHash,
)
})
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

decimalsAdjusted := math.Pow10(tokenDecimals)
decimalsRat := new(big.Rat).SetFloat64(decimalsAdjusted)
winningAmount := new(big.Rat).SetInt(&winningAmountInt)
winningAmountFloat, _ := winningAmount.Quo(winningAmount, decimalsRat).Float64()


// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = winnerAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

// a pending winner might have set other win info
// but it cannot set the reward hash
if existingUserTransaction.RewardHash == "" {
existingUserTransaction.RewardHash = transactionHash
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, sendTransactionHash)
})

// pending winners have the same behaviour as winners
winners.PendingWinners(func (pendingWinners []winnerTypes.PendingWinner) {
for _, pendingWinner := range pendingWinners {
var (
network = pendingWinner.Network
transactionHash = pendingWinner.TransactionHash.String()
application = pendingWinner.Application.String()
usdWinAmount = pendingWinner.UsdWinAmount
senderAddress = pendingWinner.SenderAddress.String()
utility = pendingWinner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
transactionHash,
)
})
}
// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" {
existingUserTransaction.Application = application
}

winningAmountFloat := usdWinAmount

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = senderAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})
go queue.UserActionsAll(func(userAction user_actions.UserAction) {
var (
network = userAction.Network
transactionHash = userAction.TransactionHash
application = userAction.Application
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// insert if this transaction is unseen
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)
user_actions.InsertAggregatedUserTransaction(userTransaction)
// corresponding pending win has already been seen, so set user action specific fields
} else if existingUserTransaction.RecipientAddress == "" {
// use aggregate function to scale amount to dollars
userTransaction := userActionsType.AggregatedTransactionFromUserAction(userAction)

existingUserTransaction.Time = userAction.Time
existingUserTransaction.RecipientAddress = userAction.RecipientAddress
existingUserTransaction.Amount = userTransaction.Amount
existingUserTransaction.Type = userAction.Type
existingUserTransaction.SwapIn = userAction.SwapIn

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)

// prefer to show an application if any logs in this transaction contain one
} else if existingUserTransaction.Application == "none" && application != "none" {
existingUserTransaction.Application = application
user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})

go winners.WinnersAll(func(winner winners.Winner) {
var (
network = winner.Network
transactionHash = winner.TransactionHash
application = winner.Application
sendTransactionHash = winner.SendTransactionHash
tokenDecimals = winner.TokenDetails.TokenDecimals
winningAmountInt = winner.WinningAmount.Int
winnerAddress = winner.WinnerAddress
utility = winner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, sendTransactionHash)
if existingUserTransaction == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Found a winner in transaction %v with no corresponding send!",
sendTransactionHash,
)
})
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" && utility == "FLUID" {
existingUserTransaction.Application = application
}

decimalsAdjusted := math.Pow10(tokenDecimals)
decimalsRat := new(big.Rat).SetFloat64(decimalsAdjusted)
winningAmount := new(big.Rat).SetInt(&winningAmountInt)
winningAmountFloat, _ := winningAmount.Quo(winningAmount, decimalsRat).Float64()

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = winnerAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

// a pending winner might have set other win info
// but it cannot set the reward hash
if existingUserTransaction.RewardHash == "" {
existingUserTransaction.RewardHash = transactionHash
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, sendTransactionHash)
})

// pending winners have the same behaviour as winners
winners.PendingWinners(func(pendingWinners []winnerTypes.PendingWinner) {
for _, pendingWinner := range pendingWinners {
var (
network = pendingWinner.Network
transactionHash = pendingWinner.TransactionHash.String()
application = pendingWinner.Application.String()
usdWinAmount = pendingWinner.UsdWinAmount
senderAddress = pendingWinner.SenderAddress.String()
utility = pendingWinner.Utility
)

existingUserTransaction := user_actions.GetAggregatedUserTransactionByHash(network, transactionHash)

// corresponding user action has not yet been tracked, so create the row
if existingUserTransaction == nil {
userTransaction := userActionsType.AggregatedTransactionFromPendingWinner(pendingWinner)
user_actions.InsertAggregatedUserTransaction(userTransaction)
return
}

// regardless of whether there's existing win data, always prefer to show
// an application if any logs in this transaction contain one
if existingUserTransaction.Application == "none" && utility == "FLUID" {
existingUserTransaction.Application = application
}

winningAmountFloat := usdWinAmount

// no existing info, update all win-related fields
if existingUserTransaction.WinningAddress == "" && utility == "FLUID" {
existingUserTransaction.WinningAddress = senderAddress
existingUserTransaction.WinningAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

existingUtility := existingUserTransaction.UtilityName

// update utility amount and name if unset
if utility != "FLUID" && (existingUtility == "FLUID" || existingUtility == "") {
existingUserTransaction.UtilityAmount = winningAmountFloat
existingUserTransaction.UtilityName = utility
}

user_actions.UpdateAggregatedUserTransactionByHash(*existingUserTransaction, transactionHash)
}
})
}
2 changes: 1 addition & 1 deletion common/ethereum/applications/sushiswap/sushiswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func GetSushiswapStableOrConstantProductFees(transfer worker.EthereumApplication

var (
decimals_ []interface{}
decimals *big.Rat
decimals *big.Rat
)

// try Stable then ConstantProduct
Expand Down
10 changes: 5 additions & 5 deletions common/ethereum/applications/trader-joe/trader-joe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

const (
// Trader Joe fees are always e18
FeeDecimals = 18
FeeDecimals = 18
traderJoeSwapLogTopic = "0xad7d6f97abf51ce18e17a38f4d70e975be9c0708474987bb3e26ad21bd93ca70"
)

Expand Down Expand Up @@ -114,7 +114,7 @@ const traderJoeSwapAbiString = `[
`

var (
traderJoeSwapAbi ethAbi.ABI
traderJoeSwapAbi ethAbi.ABI
traderJoeLBPairAbi ethAbi.ABI
)

Expand Down Expand Up @@ -190,7 +190,7 @@ func GetTraderJoeFees(transfer worker.EthereumApplicationTransfer, client *ethcl
}

var (
zero = big.NewInt(0)
zero = big.NewInt(0)
swapContainsFluid = fluidTokenContract == tokenXaddr || fluidTokenContract == tokenYaddr
)

Expand Down Expand Up @@ -239,7 +239,7 @@ func GetTraderJoeFees(transfer worker.EthereumApplicationTransfer, client *ethcl
// X was the in token, so set it as the transfer amount
fluidTransferAmount.SetInt(amountInX)
break
}
}

// X was not the in token, so get the fluid volume from amountsOut
amountOutX := new(big.Int).SetBytes(amountsOut[16:])
Expand All @@ -254,7 +254,7 @@ func GetTraderJoeFees(transfer worker.EthereumApplicationTransfer, client *ethcl
// Y was the in token, so set it as the transfer amount
fluidTransferAmount.SetInt(amountInY)
break
}
}

// Y was not the in token, so get the fluid volume from amountsOut
amountOutY := new(big.Int).SetBytes(amountsOut[:16])
Expand Down
Loading
Loading