Skip to content

Commit

Permalink
PMM-12712 Shards collector. (#762)
Browse files Browse the repository at this point in the history
* PMM-12712 Shard ID and count.

* PMM-12712 Name and count.

* PMM-12712 Sharded collector.

* PMM-12712 Remove missed print.

* PMM-12712 Revert test changes.

* PMM-12712 Revert old not related changes.

* PMM-12712 Another changes.

* PMM-12712 Missed print.

* PMM-12712 Align naming with task description.

* PMM-12712 Add mongos test client.

* PMM-12712 Lint.

* PMM-12712 Lint.

* PMM-12712 Typo.

* PMM-12712 Lint.

* PMM-12712 Test higher sleep.

* Revert "PMM-12712 Test higher sleep."

This reverts commit 88451ba.

* PMM-12712 Test of pipeline.

* PMM-12712 Change.

* PMM-12712 Test.

* PMM-12712 Test.

* PMM-12712 Test.

* PMM-12712 Correct aggregation to get chunks info.

* PMM-12712 Another progress.

* PMM-12702 Small refactor.

* PMM-12712 Test.

* PMM-12712 Fix, tests.

* PMM-12712 Temp.

* PMM-12712 Improve sharded test.

* PMM-12712 Change in workflow.

* PMM-12712 Remove print.

* PMM-12712 Init script changes.

* PMM-12712 Remove print.

* PMM-12712 Bigger sleep.

* PMM-12712 Refactor.

* PMM-12712 Small refactor.

* PMM-12712 Another refactor.

* PMM-12712 Fix after refactor.

* PMM-12712 Alias.

* PMM-12712 Alias for shell in older versions.

* PMM-12712 Static test shard.

* PMM-12712 Fix another tests, mongo 4 shell script.

* PMM-12712 Big sleep test.

* PMM-12712 Michael's fix.

* PMM-12712 Sleep.

* PMM-12712 Env.

* PMM-12712 Test.

* PMM-12712 Fix test.

* PMM-12712 Bigger sleep after changes.

* PMM-12712 Remove duplicate lines.

* PMM-12712 Renaming from sharded to shards.

* PMM-12712 Better script to detect proper mongo client.

* PMM-12712 Init script refactor.

* PMM-12712 Fix.

* PMM-12712 Small refactor.

* Update main.go

Co-authored-by: Michael Okoko <[email protected]>

* Update exporter/shards_collector.go

Co-authored-by: Michael Okoko <[email protected]>

* PMM-12712 Remove mongo 6 for now, skip test for shards.

* PMM-12712 Revert sleep length.

* PMM-12712 Mongo client based on vendor.

---------

Co-authored-by: Michael Okoko <[email protected]>
  • Loading branch information
JiriCtvrtka and idoqo authored Jan 15, 2024
1 parent 121a182 commit 0764ec6
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 5 deletions.
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ services:
- MONGO3=mongo-1-3
- ARBITER=mongo-1-arbiter
- RS=rs1
- VERSION=${TEST_MONGODB_IMAGE}
entrypoint: [ "/scripts/setup.sh" ]
networks:
- rs1
Expand Down Expand Up @@ -112,6 +113,7 @@ services:
- MONGO3=mongo-2-3
- ARBITER=mongo-2-arbiter
- RS=rs2
- VERSION=${TEST_MONGODB_IMAGE}
entrypoint: [ "/scripts/setup.sh" ]
networks:
- rs2
Expand Down Expand Up @@ -162,6 +164,7 @@ services:
- MONGO3=mongo-cnf-3
- RS=cnf-serv
- PORT=27017
- VERSION=${TEST_MONGODB_IMAGE}
entrypoint: [ "/scripts/setup.sh","cnf_servers" ]
networks:
- cnf-serv
Expand Down Expand Up @@ -205,6 +208,7 @@ services:
- PORT1=27017
- PORT2=27017
- PORT3=27017
- VERSION=${TEST_MONGODB_IMAGE}
entrypoint: [ "/scripts/init-shard.sh" ]
restart: on-failure:20

Expand Down
20 changes: 18 additions & 2 deletions docker/scripts/init-shard.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
#!/bin/bash
# `mongosh` is used starting from MongoDB 5.x
MONGODB_CLIENT="mongosh --quiet"
PARSED=(${VERSION//:/ })
MONGODB_VERSION=${PARSED[1]}
MONGODB_VENDOR=${PARSED[0]}
if [ "`echo ${MONGODB_VERSION} | cut -c 1`" = "4" ]; then
MONGODB_CLIENT="mongo"
fi
if [ "`echo ${MONGODB_VERSION} | cut -c 1`" = "5" ] && [ ${MONGODB_VENDOR} == "percona/percona-server-mongodb" ]; then
MONGODB_CLIENT="mongo"
fi
echo "MongoDB vendor, client and version: ${MONGODB_VENDOR} ${MONGODB_CLIENT} ${MONGODB_VERSION}"

mongodb1=`getent hosts ${MONGOS} | awk '{ print $1 }'`

Expand All @@ -17,16 +29,20 @@ mongodb33=`getent hosts ${MONGO33} | awk '{ print $1 }'`
port=${PORT:-27017}

echo "Waiting for startup.."
until mongo --host ${mongodb1}:${port} --eval 'quit(db.runCommand({ ping: 1 }).ok ? 0 : 2)' &>/dev/null; do
until ${MONGODB_CLIENT} --host ${mongodb1}:${port} --eval 'quit(db.runCommand({ ping: 1 }).ok ? 0 : 2)' &>/dev/null; do
printf '.'
sleep 1
done

echo "Started.."

echo init-shard.sh time now: `date +"%T" `
mongo --host ${mongodb1}:${port} <<EOF
${MONGODB_CLIENT} --host ${mongodb1}:${port} <<EOF
sh.addShard( "${RS1}/${mongodb11}:${PORT1},${mongodb12}:${PORT2},${mongodb13}:${PORT3}" );
sh.addShard( "${RS2}/${mongodb21}:${PORT1},${mongodb22}:${PORT2},${mongodb23}:${PORT3}" );
use test;
db.createCollection("shard");
sh.enableSharding("test");
sh.shardCollection( "test.shard", { id: "hashed" }, false, { numInitialChunks: 500, collation: { locale: "simple" }} );
sh.status();
EOF
18 changes: 15 additions & 3 deletions docker/scripts/setup.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
#!/bin/bash
# `mongosh` is used starting from MongoDB 5.x
MONGODB_CLIENT="mongosh --quiet"
PARSED=(${VERSION//:/ })
MONGODB_VERSION=${PARSED[1]}
MONGODB_VENDOR=${PARSED[0]}
if [ "`echo ${MONGODB_VERSION} | cut -c 1`" = "4" ]; then
MONGODB_CLIENT="mongo"
fi
if [ "`echo ${MONGODB_VERSION} | cut -c 1`" = "5" ] && [ ${MONGODB_VENDOR} == "percona/percona-server-mongodb" ]; then
MONGODB_CLIENT="mongo"
fi
echo "MongoDB vendor, client and version: ${MONGODB_VENDOR} ${MONGODB_CLIENT} ${MONGODB_VERSION}"

mongodb1=`getent hosts ${MONGO1} | awk '{ print $1 }'`
mongodb2=`getent hosts ${MONGO2} | awk '{ print $1 }'`
Expand All @@ -8,7 +20,7 @@ arbiter=`getent hosts ${ARBITER} | awk '{ print $1 }'`
port=${PORT:-27017}

echo "Waiting for startup.."
until mongo --host ${mongodb1}:${port} --eval 'quit(db.runCommand({ ping: 1 }).ok ? 0 : 2)' &>/dev/null; do
until ${MONGODB_CLIENT} --host ${mongodb1}:${port} --eval 'quit(db.runCommand({ ping: 1 }).ok ? 0 : 2)' &>/dev/null; do
printf '.'
sleep 1
done
Expand All @@ -20,7 +32,7 @@ echo setup.sh time now: `date +"%T" `

function cnf_servers() {
echo "setup cnf servers on ${MONGO1}(${mongodb1}:${port})"
mongo --host ${mongodb1}:${port} <<EOF
${MONGODB_CLIENT} --host ${mongodb1}:${port} <<EOF
var cfg = {
"_id": "${RS}",
"version": 1,
Expand Down Expand Up @@ -48,7 +60,7 @@ EOF

function general_servers() {
echo "setup servers on ${MONGO1}(${mongodb1}:${port})"
mongo --host ${mongodb1}:${port} <<EOF
${MONGODB_CLIENT} --host ${mongodb1}:${port} <<EOF
var cfg = {
"_id": "${RS}",
"protocolVersion": 1,
Expand Down
10 changes: 10 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Opts struct {
EnableIndexStats bool
EnableCollStats bool
EnableProfile bool
EnableShards bool

EnableOverrideDescendingIndex bool

Expand Down Expand Up @@ -163,6 +164,7 @@ func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topol
e.opts.EnableIndexStats = true
e.opts.EnableCurrentopMetrics = true
e.opts.EnableProfile = true
e.opts.EnableShards = true
}

// arbiter only have isMaster privileges
Expand All @@ -175,6 +177,7 @@ func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topol
e.opts.EnableIndexStats = false
e.opts.EnableCurrentopMetrics = false
e.opts.EnableProfile = false
e.opts.EnableShards = false
}

// If we manually set the collection names we want or auto discovery is set.
Expand Down Expand Up @@ -230,6 +233,11 @@ func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topol
registry.MustRegister(rsgsc)
}

if e.opts.EnableShards && requestOpts.EnableShards {
sc := newShardsCollector(ctx, client, e.opts.Logger, e.opts.CompatibleMode)
registry.MustRegister(sc)
}

return registry
}

Expand Down Expand Up @@ -304,6 +312,8 @@ func (e *Exporter) Handler() http.Handler {
requestOpts.EnableCollStats = true
case "profile":
requestOpts.EnableProfile = true
case "shards":
requestOpts.EnableShards = true
}
}

Expand Down
189 changes: 189 additions & 0 deletions exporter/shards_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// mongodb_exporter
// Copyright (C) 2017 Percona LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter

import (
"context"
"fmt"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)

type shardsCollector struct {
ctx context.Context
base *baseCollector
compatible bool
}

// newShardsCollector creates collector collecting metrics about chunks for shards Mongo.
func newShardsCollector(ctx context.Context, client *mongo.Client, logger *logrus.Logger, compatibleMode bool) *shardsCollector {
return &shardsCollector{
ctx: ctx,
base: newBaseCollector(client, logger),
compatible: compatibleMode,
}
}

func (d *shardsCollector) Describe(ch chan<- *prometheus.Desc) {
d.base.Describe(d.ctx, ch, d.collect)
}

func (d *shardsCollector) Collect(ch chan<- prometheus.Metric) {
d.base.Collect(ch)
}

func (d *shardsCollector) collect(ch chan<- prometheus.Metric) {
defer measureCollectTime(ch, "mongodb", "shards")()

client := d.base.client
logger := d.base.logger
prefix := "shards collection chunks"

databaseNames, err := client.ListDatabaseNames(d.ctx, bson.D{})
if err != nil {
logger.Errorf("cannot get database names: %s", err)
}
for _, database := range databaseNames {
collections := d.getCollectionsForDBName(database)
for _, row := range collections {
if len(row) == 0 {
continue
}

var ok bool
if _, ok = row["_id"]; !ok {
continue
}
var rowID string
if rowID, ok = row["_id"].(string); !ok {
continue
}

chunks := d.getChunksForCollection(row)
for _, c := range chunks {
labels, chunks, success := d.getInfoForChunk(c, database, rowID)
if !success {
continue
}
for _, metric := range makeMetrics(prefix, primitive.M{"count": chunks}, labels, d.compatible) {
ch <- metric
}
}
}
}
}

func (d *shardsCollector) getInfoForChunk(c primitive.M, database, rowID string) (map[string]string, int32, bool) {
var ok bool
if _, ok = c["dropped"]; ok {
if dropped, ok := c["dropped"].(bool); ok && dropped {
return nil, 0, false
}
}

if _, ok = c["shard"]; !ok {
return nil, 0, ok
}
var shard string
if shard, ok = c["shard"].(string); !ok {
return nil, 0, ok
}

if _, ok = c["nChunks"]; !ok {
return nil, 0, ok
}
var chunks int32
if chunks, ok = c["nChunks"].(int32); !ok {
return nil, 0, ok
}

labels := make(map[string]string)
labels["database"] = database
labels["collection"] = strings.Replace(rowID, fmt.Sprintf("%s.", database), "", 1)
labels["shard"] = shard

logger := d.base.logger
logger.Debug("$shards metrics for config.chunks")
debugResult(logger, primitive.M{database: c})

return labels, chunks, true
}

func (d *shardsCollector) getCollectionsForDBName(database string) []primitive.M {
client := d.base.client
logger := d.base.logger

cursor := client.Database("config").Collection("collections")
rs, err := cursor.Find(d.ctx, bson.M{"_id": bson.M{"$regex": fmt.Sprintf("^%s.", database), "$options": "i"}})
if err != nil {
logger.Errorf("cannot find _id starting with \"%s.\":%s", database, err)
return nil
}

var decoded []bson.M
err = rs.All(d.ctx, &decoded)
if err != nil {
logger.Errorf("cannot decode collections: %s", err)
return nil
}

return decoded
}

func (d *shardsCollector) getChunksForCollection(row primitive.M) []bson.M {
var chunksMatchPredicate bson.M
if _, ok := row["timestamp"]; ok {
if uuid, ok := row["uuid"]; ok {
chunksMatchPredicate = bson.M{"uuid": uuid}
}
} else {
if id, ok := row["_id"]; ok {
chunksMatchPredicate = bson.M{"_id": id}
}
}

aggregation := bson.A{
bson.M{"$match": chunksMatchPredicate},
bson.M{"$group": bson.M{"_id": "$shard", "cnt": bson.M{"$sum": 1}}},
bson.M{"$project": bson.M{"_id": 0, "shard": "$_id", "nChunks": "$cnt"}},
bson.M{"$sort": bson.M{"shard": 1}},
}

client := d.base.client
logger := d.base.logger

cur, err := client.Database("config").Collection("chunks").Aggregate(context.Background(), aggregation)
if err != nil {
logger.Errorf("cannot get $shards cursor for collection config.chunks: %s", err)
return nil
}

var chunks []bson.M
err = cur.All(context.Background(), &chunks)
if err != nil {
logger.Errorf("cannot decode $shards for collection config.chunks: %s", err)
return nil
}

return chunks
}

var _ prometheus.Collector = (*shardsCollector)(nil)
Loading

0 comments on commit 0764ec6

Please sign in to comment.