Skip to content

Commit

Permalink
Merge pull request #2452 from fluidity-money/develop-add-last-check-w…
Browse files Browse the repository at this point in the history
…orker-server-failsafe

Introduce a final check in the event of any weirdness with message du…
  • Loading branch information
af-afk authored Dec 14, 2023
2 parents 94c88d6 + 9302681 commit efef2a2
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cmd/microservice-ethereum-worker-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/fluidity-money/fluidity-app/common/ethereum/chainlink"
"github.com/fluidity-money/fluidity-app/common/ethereum/fluidity"

"github.com/fluidity-money/fluidity-app/lib/databases/postgres/failsafe"
worker_config "github.com/fluidity-money/fluidity-app/lib/databases/postgres/worker"
"github.com/fluidity-money/fluidity-app/lib/log"
"github.com/fluidity-money/fluidity-app/lib/queue"
Expand Down Expand Up @@ -489,6 +490,15 @@ func main() {
fluidClients = baseFluidClients
)

if logIndex == nil {
log.Fatal(func(k *log.Log) {
k.Format(
"Log index for transaction hash %v is nil!",
transactionHash,
)
})
}

// if the amount transferred was exactly 0, then we skip to the next transfer

if isDecoratedTransferZeroVolume(transfer) {
Expand Down Expand Up @@ -678,11 +688,17 @@ func main() {
secondsSinceLastEpoch,
emission,
)

payouts = append(payouts, specialPayout)
}

tokenDetails := fluidTokenDetails

// check if we've processed this before as a final failsafe before we submit
// the balls via a message and store an emission

failsafe.CommitTransactionHashIndex(transactionHash, *logIndex)

for _, payoutDetails := range payouts {

log.Debug(func(k *log.Log) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

-- migrate:up

CREATE TABLE failsafe_transaction_hash (
transaction_hash VARCHAR,
log_index BIGINT,
worker_id VARCHAR,
PRIMARY KEY(transaction_hash, log_index, worker_id)
);

-- migrate:down

DROP TABLE failsafe_transaction_hash;
13 changes: 13 additions & 0 deletions lib/databases/postgres/failsafe/failsafe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2022 Fluidity Money. All rights reserved. Use of this
// source code is governed by a GPL-style license that can be found in the
// LICENSE.md file.

package failsafe

// Context to use for when failsafe acquisition goes wrong
const Context = "POSTGRES/FAILSAFE"

// TableFailsafeTransactionHashIndex to be used as the final check before
// a side effectful action where duplication could possibly happen at the
// infra level
const TableFailsafeTransactionHashIndex = "failsafe_transaction_hash_index"
57 changes: 57 additions & 0 deletions lib/databases/postgres/failsafe/transaction_hash_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 Fluidity Money. All rights reserved. Use of this
// source code is governed by a GPL-style license that can be found in the
// LICENSE.md file.

package failsafe

import (
"fmt"

"github.com/fluidity-money/fluidity-app/lib/log"
"github.com/fluidity-money/fluidity-app/lib/postgres"
"github.com/fluidity-money/fluidity-app/lib/types/ethereum"
"github.com/fluidity-money/fluidity-app/lib/types/misc"
"github.com/fluidity-money/fluidity-app/lib/util"
)

// CommitTransactionHashIndex using the transactionHash given, the
// logIndex, and the worker ID, forming a composite primary key
// that guarantees uniqueness. Will Fatal if the insertion fails, with a
// reason. Useful for identifying duplication-related issues.
func CommitTransactionHashIndex(transactionHash ethereum.Hash, logIndex misc.BigInt) {
postgresClient := postgres.Client()

statementText := fmt.Sprintf(`
INSERT INTO %v (
transaction_hash,
log_index,
worker_id
)
VALUES (
$1,
$2,
$3
)`,

TableFailsafeTransactionHashIndex,
)

workerId := util.GetWorkerId()

_, err := postgresClient.Exec(statementText, transactionHash, logIndex, workerId)

if err != nil {
log.Fatal(func(k *log.Log) {
k.Context = Context

k.Format(
"Failed to acquire a failsafe for transaction hash %v, log index %v, worker id %v",
transactionHash,
logIndex,
workerId,
)

k.Payload = err
})
}
}

0 comments on commit efef2a2

Please sign in to comment.