Skip to content

Commit

Permalink
Merge branch 'main' into CLOUD-869
Browse files Browse the repository at this point in the history
  • Loading branch information
ptankov authored Jan 14, 2025
2 parents f702ccb + 699f1f2 commit ce3a25a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
51 changes: 51 additions & 0 deletions cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,53 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/percona/percona-xtradb-cluster-operator/cmd/pitr/pxc"
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage"
)

var (
pxcBinlogCollectorBackupSuccess = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "pxc_binlog_collector_success_total",
Help: "Total number of successful binlog backups",
},
)
pxcBinlogCollectorBackupFailure = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "pxc_binlog_collector_failure_total",
Help: "Total number of failed binlog backups",
},
)
pxcBinlogCollectorLastProcessingTime = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "pxc_binlog_collector_last_processing_timestamp",
Help: "Timestamp of the last successful binlog processing",
},
)
pxcBinlogCollectorLastUploadTime = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "pxc_binlog_collector_last_upload_timestamp",
Help: "Timestamp of the last successful binlog upload",
},
)
pxcBinlogCollectorGapDetected = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "pxc_binlog_collector_gap_detected_total",
Help: "Total number of times the gap was detected in binlog",
},
)
)

func init() {
prometheus.MustRegister(pxcBinlogCollectorBackupSuccess)
prometheus.MustRegister(pxcBinlogCollectorBackupFailure)
prometheus.MustRegister(pxcBinlogCollectorLastProcessingTime)
prometheus.MustRegister(pxcBinlogCollectorLastUploadTime)
prometheus.MustRegister(pxcBinlogCollectorGapDetected)
}

type Collector struct {
db *pxc.PXC
storage storage.Storage
Expand Down Expand Up @@ -103,6 +145,7 @@ func New(ctx context.Context, c Config) (*Collector, error) {
func (c *Collector) Run(ctx context.Context) error {
err := c.newDB(ctx)
if err != nil {
pxcBinlogCollectorBackupFailure.Inc()
return errors.Wrap(err, "new db connection")
}
defer c.close()
Expand All @@ -113,9 +156,11 @@ func (c *Collector) Run(ctx context.Context) error {

err = c.CollectBinLogs(ctx)
if err != nil {
pxcBinlogCollectorBackupFailure.Inc()
return errors.Wrap(err, "collect binlog files")
}

pxcBinlogCollectorBackupSuccess.Inc()
return nil
}

Expand Down Expand Up @@ -369,6 +414,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {
if lastUploadedBinlogName == "" {
log.Println("ERROR: Couldn't find the binlog that contains GTID set:", c.lastUploadedSet.Raw())
log.Println("ERROR: Gap detected in the binary logs. Binary logs will be uploaded anyway, but full backup needed for consistent recovery.")
pxcBinlogCollectorGapDetected.Inc()
if err := createGapFile(c.lastUploadedSet); err != nil {
return errors.Wrap(err, "create gap file")
}
Expand All @@ -382,6 +428,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {

if len(list) == 0 {
log.Println("No binlogs to upload")
pxcBinlogCollectorLastProcessingTime.SetToCurrentTime()
return nil
}

Expand All @@ -402,6 +449,8 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {
return errors.Wrap(err, "manage binlog")
}

pxcBinlogCollectorLastUploadTime.SetToCurrentTime()

lastTs, err := c.db.GetBinLogLastTimestamp(ctx, binlog.Name)
if err != nil {
return errors.Wrap(err, "get last timestamp")
Expand All @@ -411,6 +460,8 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {
return errors.Wrap(err, "update timeline file")
}
}

pxcBinlogCollectorLastProcessingTime.SetToCurrentTime()
return nil
}

Expand Down
26 changes: 26 additions & 0 deletions cmd/pitr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/percona/percona-xtradb-cluster-operator/cmd/pitr/recoverer"

"github.com/caarlos0/env"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand All @@ -23,6 +25,23 @@ func main() {
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
defer stop()

srv := &http.Server{Addr: ":8080"}
go func() {
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/health", healthHandler)
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("ERROR: HTTP server error: %v", err)
}
}()

go func() {
<-ctx.Done()
if err := srv.Shutdown(context.Background()); err != nil {
log.Printf("ERROR: HTTP server shutdown: %v", err)
}
}()

switch command {
case "collect":
runCollector(ctx)
Expand All @@ -34,6 +53,13 @@ func main() {
}
}

func healthHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte("ok")); err != nil {
log.Println("ERROR: writing health response:", err)
}
}

func runCollector(ctx context.Context) {
config, err := getCollectorConfig()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/pxc/app/deployment/binlog-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster, initImage string
},
},
}

if cr.CompareVersionWith("1.17.0") >= 0 {
container.Ports = []corev1.ContainerPort{
{
ContainerPort: 8080,
Name: "metrics",
},
}
}

replicas := int32(1)

var initContainers []corev1.Container
Expand Down

0 comments on commit ce3a25a

Please sign in to comment.