Merged
51 changes: 49 additions & 2 deletions db/comdb2.c
@@ -141,6 +141,7 @@ void berk_memp_sync_alarm_ms(int);
#include <net_appsock.h>
#include "sc_csc2.h"
#include "sc_util.h"
#include "sc_global.h"
#include "reverse_conn.h"
#include "alias.h"
#include "str_util.h" /* QUOTE */
@@ -4647,8 +4648,6 @@ int throttle_lim = 10000;
int cpu_throttle_threshold = 100000;

double gbl_cpupercent;
#include <sc_util.h>


static inline void log_tbl_item(int64_t curr, int64_t *prev, const char *(*type_to_str)(int), int type, char *string,
int *hdr_p, struct reqlogger *statlogger, dbtable *tbl, int first)
@@ -4750,6 +4749,8 @@ void *statthd(void *p)
int have_scon_stats = 0;
int64_t rw_evicts;

int last_num_sc = 0;
struct running_sc_info *last_schema_changes = NULL;

thrman_register(THRTYPE_GENERIC);
thread_started("statthd");
@@ -4923,7 +4924,9 @@ void *statthd(void *p)
}

int aa_include_updates = bdb_attr_get(thedb->bdb_attr, BDB_ATTR_AA_COUNT_UPD);

rdlock_schema_lk();

for (ii = 0; ii < dbenv->num_dbs; ++ii) {
dbtable *tbl = dbenv->dbs[ii];
int hdr = 0;
@@ -4970,6 +4973,50 @@ void *statthd(void *p)
log_tbl_item(tbl->deadlock_count, &tbl->saved_deadlock_count, NULL, 0, "deadlock count", &hdr,
statlogger, tbl, 0);
}

int num_sc = 0;
struct running_sc_info *schema_changes = NULL;

if (list_running_schema_changes(&schema_changes, &num_sc) == 0) {
if (num_sc > 0) {
/* log header */
reqlog_logf(statlogger, REQL_INFO, "SCHEMA CHANGE STATS\n");
}

for (ii = 0; ii < num_sc; ii++) {
struct running_sc_info sc = schema_changes[ii];
struct running_sc_info *last_sc = NULL;

for (jj = 0; jj < last_num_sc; jj++) {
if (strcmp(last_schema_changes[jj].table_name, sc.table_name) == 0) {
last_sc = &last_schema_changes[jj];
break;
}
}

uint64_t last_nrecs = last_sc ? last_sc->nrecs : 0;
uint32_t last_adds = last_sc ? last_sc->adds : 0;
uint32_t last_updates = last_sc ? last_sc->updates : 0;
uint32_t last_deletes = last_sc ? last_sc->deletes : 0;

uint64_t diff_nrecs = sc.nrecs - last_nrecs;

reqlog_logf(
statlogger, REQL_INFO,
" table '%s' records converted %ld diff %ld rate %ld r/s adds %d updates %d deletes %d\n",
sc.table_name, //
sc.nrecs, diff_nrecs, diff_nrecs / thresh, //
sc.adds - last_adds, //
sc.updates - last_updates, //
sc.deletes - last_deletes);
}

free(last_schema_changes);

last_num_sc = num_sc;
last_schema_changes = schema_changes;
}

unlock_schema_lk();

pstats = bdb_get_process_stats();
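The rate arithmetic above divides the per-interval delta by `thresh`; its use as the r/s divisor implies it is the number of seconds between stat passes. A purely illustrative example with made-up numbers: with `thresh` at 10 and a rebuild whose `nrecs` grew from 4,000,000 to 4,100,000 since the previous pass, `diff_nrecs` is 100,000 and the line reads `rate 10000 r/s`.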
49 changes: 49 additions & 0 deletions schemachange/sc_global.c
@@ -697,3 +697,52 @@ void sc_alter_latency(int counter)
}
}
}

int list_running_schema_changes(struct running_sc_info **info, int *num_running_sc)
{
unsigned int bucket;
void *entry;
sc_table_t *sc_table = NULL;
*num_running_sc = 0;

Pthread_mutex_lock(&schema_change_in_progress_mutex);

if (!sc_tables) {
Pthread_mutex_unlock(&schema_change_in_progress_mutex);
return 1;
}

/* preallocate outparam */
int unused;
int num_entries = 0;
hash_info(sc_tables, &unused, &unused, &unused, &unused, &num_entries, &unused, &unused);
*info = malloc(sizeof(struct running_sc_info) * num_entries);

for (sc_table = hash_first(sc_tables, &entry, &bucket); sc_table;
sc_table = hash_next(sc_tables, &entry, &bucket)) {

/* get_dbtable_by_name looks at thedb->db_hash, so caller must
* hold the schema lock */
struct dbtable *db = get_dbtable_by_name(sc_table->tablename);

if (!db)
continue;

Pthread_rwlock_rdlock(&db->sc_live_lk);

if (db->doing_conversion || db->doing_upgrade) {
struct running_sc_info *sc_info = &((*info)[*num_running_sc]);
sc_info->table_name = strdup(sc_table->tablename);
sc_info->nrecs = db->sc_nrecs;
sc_info->adds = db->sc_adds;
sc_info->updates = db->sc_updates;
sc_info->deletes = db->sc_deletes;
++(*num_running_sc);
}

Pthread_rwlock_unlock(&db->sc_live_lk);
}

Pthread_mutex_unlock(&schema_change_in_progress_mutex);
return 0;
}
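A hypothetical caller sketch (not part of this PR), mirroring how statthd in db/comdb2.c consumes this function: the schema lock must already be held in read mode, and because each `table_name` is strdup'd by the callee, a caller that does not keep the array around for a later pass should free the names along with the array.

    /* Illustrative only; reqlog_logf/REQL_INFO and the lock helpers are the
     * ones used elsewhere in this diff. */
    #include <inttypes.h>

    static void log_running_scs_once(struct reqlogger *statlogger)
    {
        struct running_sc_info *scs = NULL;
        int num_sc = 0;

        rdlock_schema_lk(); /* required: the callee resolves tables via thedb->db_hash */
        if (list_running_schema_changes(&scs, &num_sc) == 0) {
            for (int i = 0; i < num_sc; i++) {
                reqlog_logf(statlogger, REQL_INFO, " table '%s' records converted %" PRIu64 "\n",
                            scs[i].table_name, scs[i].nrecs);
                free((char *)scs[i].table_name); /* strdup'd by list_running_schema_changes */
            }
            free(scs);
        }
        unlock_schema_lk();
    }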
13 changes: 13 additions & 0 deletions schemachange/sc_global.h
@@ -17,6 +17,8 @@
#ifndef INCLUDE_SC_GLOBAL_H
#define INCLUDE_SC_GLOBAL_H

#include <stdint.h>

extern pthread_mutex_t schema_change_in_progress_mutex;
extern pthread_mutex_t fastinit_in_progress_mutex;
extern pthread_mutex_t schema_change_sbuf2_lock;
@@ -60,6 +62,14 @@ extern int rep_sync_save
extern int log_sync_save;
extern int log_sync_time_save;

struct running_sc_info {
const char *table_name;
uint64_t nrecs; /* num records converted (does not include `adds`) */
uint32_t adds; /* num added in front of sc cursor */
uint32_t updates; /* num updated behind sc cursor */
uint32_t deletes; /* num deleted behind sc cursor */
};

int is_dta_being_rebuilt(struct scplan *plan);
const char *get_sc_to_name(const char *);
void wait_for_sc_to_stop(const char *operation, const char *func, int line);
@@ -87,5 +97,8 @@ struct schema_change_type *preempt_ongoing_alter(char *table, int action);
void clear_ongoing_alter();
int get_stopsc(const char *func, int line);
void sc_alter_latency(int counter);
/* List all running schema changes and their progress. Caller must acquire schema
* lock in read mode. */
int list_running_schema_changes(struct running_sc_info **info, int *num_running_sc);

#endif
1 change: 0 additions & 1 deletion tests/quarantine.csv
@@ -32,7 +32,6 @@ queuedb_rollover_noroll1_generated,UNKNOWN,181062375
simple_ssl,UNKNOWN,181151988
sc_timepart,UNKNOWN,181186021
snapshot_during_truncate,UNKNOWN,181344241
reqlog_update_during_sc,UNKNOWN,181484732
scupdates_logicalsc_generated,UNKNOWN,181484749
sc_resume_logicalsc_generated,UNKNOWN,181484807
consumer_non_atomic_default_consumer_generated,DB_BUG,181573461
12 changes: 12 additions & 0 deletions tests/reqlog_update_during_sc.test/extract_sc_stats_timestamps.awk
@@ -0,0 +1,12 @@
# If the line starts with a timestamp in the format "MM/DD HH:MM:SS",
match($0, /^([0-9]{2}\/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2})/, a) {
# Parse it into Unix time (e.g. `date -d "YYYY/MM/DD HH:MM:SS" +%s`).
cmd = "date -d \"" strftime("%Y") "/" a[1] "\" +%s"
cmd | getline timestamp
close(cmd)

# If the line contains "SCHEMA CHANGE STATS", emit the timestamp.
if ($0 ~ /SCHEMA CHANGE STATS/) {
print timestamp
}
}
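Note that this script leans on gawk extensions (the three-argument `match()` and `strftime()`), so it needs gawk rather than plain awk or mawk. The runit below invokes it through its extract_sc_stats_timestamps helper:

    awk -f "${TESTDIR}/${TESTCASE}.test/extract_sc_stats_timestamps.awk" "${logfile}" | sort -nu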
67 changes: 55 additions & 12 deletions tests/reqlog_update_during_sc.test/runit
@@ -19,7 +19,7 @@ function populate_table()
# Adjust the number of rows inserted based on whether we are running against
# cluster vs. standalone. The latter is able to rebuild the table faster and
# thus may not run long enough to exercise the logic of this test.
local -r num_iter=$(if [[ -n "${CLUSTER}" ]]; then echo 200; else echo 500; fi)
local -r num_iter=$(if [[ -n "${CLUSTER}" ]]; then echo 300; else echo 1000; fi)

for i in `seq 1 ${num_iter}`; do
"${CDB2SQL_EXE}" ${CDB2_OPTIONS} "${DBNAME}" default "insert into t select * from generate_series(1, 1000)"
@@ -56,18 +56,47 @@ function extract_log_timestamps()
awk -f "${TESTDIR}/${TESTCASE}.test/extract_timestamps.awk" "${logfile}" | sort -nu
}

function extract_sc_stats_timestamps()
{
local -r logfile="${1}"

awk -f "${TESTDIR}/${TESTCASE}.test/extract_sc_stats_timestamps.awk" "${logfile}" | sort -nu
}

function verify_log_continuity()
{
local -r timestamps_file="${1}"
local -r start_time="${2}"
local -r end_time="${3}"
local -r node="${4}" # only for logging purposes

local missing_timestamps=()

for (( timestamp = start_time; timestamp <= end_time; timestamp++ )); do
if ! grep -q "^${timestamp}$" "${timestamps_file}"; then
failexit "Missing statreqs for timestamp on node '${node}': ${timestamp} ($( date -d @${timestamp} '+%m/%d %H:%M:%S' ))"
missing_timestamps+=("${timestamp}")
fi
done

# If we are missing statreqs for more than 30% of the time (chosen
# arbitrarily) during which schema change was running, something might
# be broken.
local -r num_missing=${#missing_timestamps[@]}
local -r total=$( wc -l < "${timestamps_file}" )

if (( num_missing > total * 3 / 10 )); then
echo "Available timestamps on node '${node}' in ${timestamps_file}:"
echo
cat "${timestamps_file}"
echo
echo "Missing statreqs for the following timestamps on node '${node}':"
echo
for timestamp in "${missing_timestamps[@]}"; do
echo "${timestamp}"
done
echo
failexit "Missing statreqs for more than 30% of the time during schema change on '${node}'"
fi
}
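For concreteness: if the timestamps file holds `total` = 100 entries, up to 30 missing seconds are tolerated; at 31, `num_missing > total * 3 / 10` holds, the available and missing timestamps are dumped, and the run fails via `failexit`.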

function verify_logs_for_node()
@@ -76,6 +105,7 @@
local -r start_time="${2}"
local -r end_time="${3}"
local -r node="${4}"
local -r is_master="${5}"

# Filter logs to only include entries between when schema change started and ended.
filter_logs "${logfile}" "${start_time}" "${end_time}" > "${logfile}.filtered"
@@ -92,19 +122,27 @@
echo
fi

# Get the timestamps for all the seconds during which we got statreqs.
extract_log_timestamps "${logfile}.filtered" > "${logfile}.filtered.timestamps"

# Check that for every second between when schema change started and ended,
# we got statreqs. Use `start_time + 1` and `end_time - 1` as the logs from
# the first and last seconds may not have overlapped with schema change.
verify_log_continuity "${logfile}.filtered.timestamps" $(( start_time + 1 )) $(( end_time - 1 )) "${node}"
# we got _something_ in statreqs. Ignore 3 seconds before and after to give
# some leeway.
extract_log_timestamps "${logfile}.filtered" > "${logfile}.filtered.timestamps"
verify_log_continuity "${logfile}.filtered.timestamps" $(( start_time + 3 )) $(( end_time - 3 )) "${node}"

if [[ "${is_master}" == "1" ]]; then
# Check that for every second between when schema change started and ended,
# we got 'SCHEMA CHANGE STATS' in statreqs. Ignore 3 seconds before and after to give
# some leeway.
echo "${node} is master, checking that schema change stats are present in statreqs"
extract_sc_stats_timestamps "${logfile}.filtered" > "${logfile}.filtered.sc_timestamps"
verify_log_continuity "${logfile}.filtered.sc_timestamps" $(( start_time + 3 )) $(( end_time - 3 )) "${node}"
fi
}

function verify_logs()
{
local -r start_time="${1}"
local -r end_time="${2}"
local -r master="${3}"

src="${TESTDIR}/var/log/cdb2/${DBNAME}.statreqs"

@@ -113,13 +151,17 @@
# Grab logs from node.
local dest="${TESTDIR}/logs/${DBNAME}_${node}.statreqs"
ssh -o StrictHostKeyChecking=no "${node}" "cat ${src}" > "${dest}"
verify_logs_for_node "${dest}" "${start_time}" "${end_time}" "${node}"

local is_master=$(if [[ "${node}" == "${master}" ]]; then echo 1; else echo 0; fi)
verify_logs_for_node "${dest}" "${start_time}" "${end_time}" "${node}" "${is_master}"
done
else
# Single node case.
local dest="${TESTDIR}/logs/${DBNAME}_standalone.statreqs"
cp "${src}" "${dest}"
verify_logs_for_node "${dest}" "${start_time}" "${end_time}" "standalone"

local -r is_master=1 # treat as master because we should have 'SCHEMA CHANGE STATS' in statreqs
verify_logs_for_node "${dest}" "${start_time}" "${end_time}" "standalone" "${is_master}"
fi
}

@@ -133,7 +175,8 @@ function main()
spam_queries &
spam_pid=$!

echo "Master is $(get_master)"
local -r master=$(get_master)
echo "Master is ${master}"
local -r rebuild_start_time=$(date +%s)
echo "Starting schema change at $(date -d @${rebuild_start_time} '+%m/%d %H:%M:%S')"
run_schema_change
@@ -148,7 +191,7 @@

# Check that we got statreqs on all nodes from the spam queries despite
# schema change running at the same time.
verify_logs "${rebuild_start_time}" "${rebuild_end_time}"
verify_logs "${rebuild_start_time}" "${rebuild_end_time}" "${master}"
}

main