Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,7 @@ REGISTER_TUNABLE("match_on_ckp", "Allow rep_verify_match on ckp records. (Defau
&gbl_match_on_ckp, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);

/* physical replication */
REGISTER_TUNABLE("blocking_physrep", "Physical replicant blocks on select. (Default: off)", TUNABLE_BOOLEAN,
REGISTER_TUNABLE("blocking_physrep", "Physical replicant blocks on select. (Default: on)", TUNABLE_BOOLEAN,
&gbl_blocking_physrep, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("tranlog_default_timeout", "Default timeout for tranlog queries. (Default: 30)", TUNABLE_INTEGER,
&gbl_tranlog_default_timeout, 0, NULL, NULL, NULL, NULL);
Expand Down Expand Up @@ -1885,7 +1885,7 @@ REGISTER_TUNABLE("physrep_ignore_queues", "Don't replicate queues.", TUNABLE_BOO
READONLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_max_rollback", "Maximum logs physrep can rollback. (Default: 0)", TUNABLE_INTEGER,
&gbl_physrep_max_rollback, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_pollms", "Physical replicant poll interval in milliseconds. (Default: 200)", TUNABLE_INTEGER,
REGISTER_TUNABLE("physrep_pollms", "Physical replicant poll interval in milliseconds. (Default: 0)", TUNABLE_INTEGER,
&gbl_physrep_pollms, 0, NULL, NULL, NULL, NULL);

/* reversql-sql */
Expand Down
32 changes: 22 additions & 10 deletions db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ typedef struct DB_Connection {
int gbl_physrep_debug = 0;
int gbl_physrep_reconnect_interval = 3600; // force re-registration every hour
int gbl_physrep_reconnect_penalty = 0;
int gbl_blocking_physrep = 0;
int gbl_blocking_physrep = 1;
int gbl_physrep_fanout = 8;
int gbl_physrep_max_candidates = 6;
int gbl_physrep_max_pending_replicants = 10;
Expand Down Expand Up @@ -673,6 +673,8 @@ int is_valid_lsn(unsigned int file, unsigned int offset)
offset == get_next_offset(thedb->bdb_env->dbenv, info);
}

extern __thread int physrep_out_of_order;

static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
{
/* vars for 1 record */
Expand All @@ -693,6 +695,13 @@ static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
physrep_logmsg(LOGMSG_ERROR, "%s:%d: Could not parse lsn %s\n",
__func__, __LINE__, lsn);
}
if (file == -1 && offset == -1) {
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s:%d requested invalid record, force reconnect\n", __func__, __LINE__);
}
physrep_out_of_order = 1;
return prev_info;
}
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s:%d: Processing record (lsn %d:%d)\n",
__func__, __LINE__, file, offset);
Expand Down Expand Up @@ -1362,8 +1371,7 @@ static int do_wait_for_reverse_conn(cdb2_hndl_tp *repl_metadb) {
This is the database/node that the replicant connects to in order to
retrieve and apply physical logs.
*/
int gbl_physrep_pollms = 200;
extern __thread int physrep_out_of_order;
int gbl_physrep_pollms = 0;
static void *physrep_worker(void *args)
{
comdb2_name_thread(__func__);
Expand All @@ -1377,6 +1385,7 @@ static void *physrep_worker(void *args)
int is_revconn = -1;
int last_revconn_check = 0;
int last_update_registry = 0;
int pollms;
LOG_INFO info;
LOG_INFO prev_info;
DB_Connection *repl_db_cnct = NULL;
Expand Down Expand Up @@ -1404,8 +1413,6 @@ static void *physrep_worker(void *args)
if (repl_db_connected) {
close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__);
}
if (gbl_physrep_debug)
physrep_logmsg(LOGMSG_USER, "I am not the LEADER node, skipping async-replication\n");
goto sleep_and_retry;
}

Expand Down Expand Up @@ -1566,10 +1573,8 @@ static void *physrep_worker(void *args)

prev_info = info;

rc = snprintf(sql_cmd, sql_cmd_len,
"select * from comdb2_transaction_logs('{%u:%u}'%s)",
info.file, info.offset,
(gbl_blocking_physrep ? ", NULL, 1" : ""));
rc = snprintf(sql_cmd, sql_cmd_len, "select * from comdb2_transaction_logs('{%u:%u}', NULL%s)", info.file,
info.offset, (gbl_blocking_physrep ? ",9" : ",8"));
if (rc < 0 || rc >= sql_cmd_len)
physrep_logmsg(LOGMSG_ERROR, "%s:%d Command buffer is not long enough!\n", __func__, __LINE__);
if (gbl_physrep_debug)
Expand Down Expand Up @@ -1648,7 +1653,14 @@ static void *physrep_worker(void *args)
do_truncate = 1;
}
sleep_and_retry:
poll(0, 0, gbl_physrep_pollms);
if ((thedb->master != gbl_myhostname) || !repl_db_connected) {
sleep(1);
} else {
pollms = gbl_physrep_pollms;
if (pollms > 0) {
poll(0, 0, pollms);
}
}
}

if (repl_db_connected == 1) {
Expand Down
26 changes: 21 additions & 5 deletions sqlite/ext/comdb2/tranlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct tranlog_cursor {
int startAppRecGen;
int starttime;
int timeout;
int invalidRecord;
DB_LOGC *logc; /* Log Cursor */
DBT data;
};
Expand Down Expand Up @@ -154,6 +155,11 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
uint32_t durable_gen=0, rc, getflags;
bdb_state_type *bdb_state = thedb->bdb_env;

if (pCur->invalidRecord) {
pCur->hitLast = 1;
return SQLITE_OK;
}

if (pCur->notDurable || pCur->hitLast)
return SQLITE_OK;

Expand Down Expand Up @@ -252,9 +258,13 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
}

if (getflags != DB_NEXT && getflags != DB_PREV) {
logmsg(LOGMSG_ERROR, "%s line %d did not expect logc->get to fail with flag: %d."
"got rc %d\n",
__func__, __LINE__, getflags, rc);
if (pCur->flags & TRANLOG_FLAGS_SENTINEL) {
/* Return sentinel record with LSN {-1:-1} */
pCur->curLsn.file = pCur->curLsn.offset = -1;
pCur->invalidRecord = 1;
pCur->iRowid++;
return SQLITE_OK;
}
return SQLITE_INTERNAL;
}

Expand Down Expand Up @@ -665,7 +675,9 @@ static int tranlogColumn(
}
break;
case TRANLOG_COLUMN_TXNID:
LOGCOPY_32(&txnid, &((char *) pCur->data.data)[4]);
if (pCur->data.data) {
LOGCOPY_32(&txnid, &((char *) pCur->data.data)[4]);
}
sqlite3_result_int64(ctx, txnid);
break;
case TRANLOG_COLUMN_UTXNID:
Expand Down Expand Up @@ -717,7 +729,11 @@ static int tranlogColumn(
}
break;
case TRANLOG_COLUMN_LOG:
sqlite3_result_blob(ctx, pCur->data.data, pCur->data.size, NULL);
if (pCur->data.data) {
sqlite3_result_blob(ctx, pCur->data.data, pCur->data.size, NULL);
} else {
sqlite3_result_null(ctx);
}
break;
case TRANLOG_COLUMN_CHILDUTXNID:
if (pCur->data.data)
Expand Down
1 change: 1 addition & 0 deletions sqlite/ext/comdb2/tranlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum {
TRANLOG_FLAGS_BLOCK = 0x1,
TRANLOG_FLAGS_DURABLE = 0x2,
TRANLOG_FLAGS_DESCENDING = 0x4,
TRANLOG_FLAGS_SENTINEL = 0x8,
};

u_int64_t get_timestamp_from_matchable_record(char *data);
Expand Down
15 changes: 15 additions & 0 deletions tests/phys_rep_tiered.test/runit
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,16 @@ function verify_alt_metadb_system_table
fi
}

# Ask for a non-existent record with and without the sentinel flag.
#
# Queries comdb2_transaction_logs for LSN {1:1}, which this test expects not
# to exist, and checks stdout only (cdb2sql's stderr is discarded):
#   - flags=8 (sentinel): the query must return exactly '{-1:-1}'
#   - flags=0           : the query must produce no output
# Calls cleanFailExit with a descriptive message on any mismatch.
function non_existent_record
{
    x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "select lsn from comdb2_transaction_logs('{1:1}', NULL, 8)" 2>/dev/null)
    # Use 'if' rather than '[[ ... ]] && ...': a passing check must not leave
    # a non-zero status behind as the function's return value.
    if [[ "$x" != '{-1:-1}' ]]; then
        cleanFailExit "Expected {-1:-1} for non-existent record with sentinel flag, got $x"
    fi

    x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "select lsn from comdb2_transaction_logs('{1:1}', NULL, 0)" 2>/dev/null)
    if [[ -n "$x" ]]; then
        cleanFailExit "Expected no output, got $x, for non-existent record without sentinel flag"
    fi
}

# Halt a clustered physical replicant .. swing the master on that cluster
# a number of times, then restart the physical replicant .. looking at the
# trace, verify that nothing ignores rectype 2, 11, or 10 ..
Expand Down Expand Up @@ -2215,6 +2225,11 @@ function run_tests
check_metadb_memstat
testcase_finish $testcase

testcase="non_existent_record"
testcase_preamble $testcase
non_existent_record
testcase_finish $testcase

testcase="verify_non_ignored_reptype"
testcase_preamble $testcase
verify_non_ignored_reptype
Expand Down
4 changes: 2 additions & 2 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
(name='blobmem_sz_thresh_kb', description='Sets the threshold (in KB) above which blobs are allocated by the blob allocator. (Default: 0)', type='INTEGER', value='-1', read_only='Y')
(name='blobstripe', description='', type='BOOLEAN', value='ON', read_only='Y')
(name='blocking_latches', description='Block on latch rather than deadlock', type='BOOLEAN', value='OFF', read_only='N')
(name='blocking_physrep', description='Physical replicant blocks on select. (Default: off)', type='BOOLEAN', value='OFF', read_only='N')
(name='blocking_physrep', description='Physical replicant blocks on select. (Default: on)', type='BOOLEAN', value='ON', read_only='N')
(name='broadcast_check_rmtpol', description='Check rmtpol before sending triggers', type='BOOLEAN', value='ON', read_only='N')
(name='broken_max_rec_sz', description='', type='INTEGER', value='0', read_only='Y')
(name='broken_num_parser', description='', type='BOOLEAN', value='OFF', read_only='Y')
Expand Down Expand Up @@ -742,7 +742,7 @@
(name='physrep_max_rollback', description='Maximum logs physrep can rollback. (Default: 0)', type='INTEGER', value='0', read_only='N')
(name='physrep_metadb_host', description='List of physical replication metadb cluster hosts.', type='STRING', value=NULL, read_only='Y')
(name='physrep_metadb_name', description='Physical replication metadb cluster name.', type='STRING', value=NULL, read_only='Y')
(name='physrep_pollms', description='Physical replicant poll interval in milliseconds. (Default: 200)', type='INTEGER', value='200', read_only='N')
(name='physrep_pollms', description='Physical replicant poll interval in milliseconds. (Default: 0)', type='INTEGER', value='0', read_only='N')
(name='physrep_reconnect_interval', description='Reconnect interval for physical replicants (Default: 600)', type='INTEGER', value='3600', read_only='N')
(name='physrep_reconnect_penalty', description='Physrep wait seconds before retry to the same node. (Default: 5)', type='INTEGER', value='0', read_only='N')
(name='physrep_repl_host', description='Current physrep host.', type='STRING', value=NULL, read_only='Y')
Expand Down
Loading