Skip to content

Commit 54c5951

Browse files
committed
Return sentinel for non-existant record
Signed-off-by: Mark Hannum <[email protected]>
1 parent fedaab4 commit 54c5951

File tree

6 files changed

+52
-14
lines changed

6 files changed

+52
-14
lines changed

db/db_tunables.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,7 +1808,7 @@ REGISTER_TUNABLE("match_on_ckp", "Allow rep_verify_match on ckp records. (Defau
18081808
&gbl_match_on_ckp, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
18091809

18101810
/* physical replication */
1811-
REGISTER_TUNABLE("blocking_physrep", "Physical replicant blocks on select. (Default: off)", TUNABLE_BOOLEAN,
1811+
REGISTER_TUNABLE("blocking_physrep", "Physical replicant blocks on select. (Default: on)", TUNABLE_BOOLEAN,
18121812
&gbl_blocking_physrep, 0, NULL, NULL, NULL, NULL);
18131813
REGISTER_TUNABLE("tranlog_default_timeout", "Default timeout for tranlog queries. (Default: 30)", TUNABLE_INTEGER,
18141814
&gbl_tranlog_default_timeout, 0, NULL, NULL, NULL, NULL);
@@ -1885,7 +1885,7 @@ REGISTER_TUNABLE("physrep_ignore_queues", "Don't replicate queues.", TUNABLE_BOO
18851885
READONLY, NULL, NULL, NULL, NULL);
18861886
REGISTER_TUNABLE("physrep_max_rollback", "Maximum logs physrep can rollback. (Default: 0)", TUNABLE_INTEGER,
18871887
&gbl_physrep_max_rollback, 0, NULL, NULL, NULL, NULL);
1888-
REGISTER_TUNABLE("physrep_pollms", "Physical replicant poll interval in milliseconds. (Default: 200)", TUNABLE_INTEGER,
1888+
REGISTER_TUNABLE("physrep_pollms", "Physical replicant poll interval in milliseconds. (Default: 0)", TUNABLE_INTEGER,
18891889
&gbl_physrep_pollms, 0, NULL, NULL, NULL, NULL);
18901890

18911891
/* reversql-sql */

db/phys_rep.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,8 @@ int is_valid_lsn(unsigned int file, unsigned int offset)
673673
offset == get_next_offset(thedb->bdb_env->dbenv, info);
674674
}
675675

676+
extern __thread int physrep_out_of_order;
677+
676678
static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
677679
{
678680
/* vars for 1 record */
@@ -693,6 +695,13 @@ static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
693695
physrep_logmsg(LOGMSG_ERROR, "%s:%d: Could not parse lsn %s\n",
694696
__func__, __LINE__, lsn);
695697
}
698+
if (file == -1 && offset == -1) {
699+
if (gbl_physrep_debug) {
700+
physrep_logmsg(LOGMSG_USER, "%s:%d requested invalid record, force reconnect\n", __func__, __LINE__);
701+
}
702+
physrep_out_of_order = 1;
703+
return prev_info;
704+
}
696705
if (gbl_physrep_debug) {
697706
physrep_logmsg(LOGMSG_USER, "%s:%d: Processing record (lsn %d:%d)\n",
698707
__func__, __LINE__, file, offset);
@@ -1363,7 +1372,6 @@ static int do_wait_for_reverse_conn(cdb2_hndl_tp *repl_metadb) {
13631372
apply physical logs.
13641373
*/
13651374
int gbl_physrep_pollms = 0;
1366-
extern __thread int physrep_out_of_order;
13671375
static void *physrep_worker(void *args)
13681376
{
13691377
comdb2_name_thread(__func__);
@@ -1565,10 +1573,8 @@ static void *physrep_worker(void *args)
15651573

15661574
prev_info = info;
15671575

1568-
rc = snprintf(sql_cmd, sql_cmd_len,
1569-
"select * from comdb2_transaction_logs('{%u:%u}'%s)",
1570-
info.file, info.offset,
1571-
(gbl_blocking_physrep ? ", NULL, 1" : ""));
1576+
rc = snprintf(sql_cmd, sql_cmd_len, "select * from comdb2_transaction_logs('{%u:%u}', NULL%s)", info.file,
1577+
info.offset, (gbl_blocking_physrep ? ",9" : ",8"));
15721578
if (rc < 0 || rc >= sql_cmd_len)
15731579
physrep_logmsg(LOGMSG_ERROR, "%s:%d Command buffer is not long enough!\n", __func__, __LINE__);
15741580
if (gbl_physrep_debug)

sqlite/ext/comdb2/tranlog.c

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct tranlog_cursor {
6969
int startAppRecGen;
7070
int starttime;
7171
int timeout;
72+
int invalidRecord;
7273
DB_LOGC *logc; /* Log Cursor */
7374
DBT data;
7475
};
@@ -154,6 +155,11 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
154155
uint32_t durable_gen=0, rc, getflags;
155156
bdb_state_type *bdb_state = thedb->bdb_env;
156157

158+
if (pCur->invalidRecord) {
159+
pCur->hitLast = 1;
160+
return SQLITE_OK;
161+
}
162+
157163
if (pCur->notDurable || pCur->hitLast)
158164
return SQLITE_OK;
159165

@@ -252,9 +258,13 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
252258
}
253259

254260
if (getflags != DB_NEXT && getflags != DB_PREV) {
255-
logmsg(LOGMSG_ERROR, "%s line %d did not expect logc->get to fail with flag: %d."
256-
"got rc %d\n",
257-
__func__, __LINE__, getflags, rc);
261+
if (pCur->flags & TRANLOG_FLAGS_SENTINEL) {
262+
/* Return sentinel record with LSN {-1:-1} */
263+
pCur->curLsn.file = pCur->curLsn.offset = -1;
264+
pCur->invalidRecord = 1;
265+
pCur->iRowid++;
266+
return SQLITE_OK;
267+
}
258268
return SQLITE_INTERNAL;
259269
}
260270

@@ -665,7 +675,9 @@ static int tranlogColumn(
665675
}
666676
break;
667677
case TRANLOG_COLUMN_TXNID:
668-
LOGCOPY_32(&txnid, &((char *) pCur->data.data)[4]);
678+
if (pCur->data.data) {
679+
LOGCOPY_32(&txnid, &((char *) pCur->data.data)[4]);
680+
}
669681
sqlite3_result_int64(ctx, txnid);
670682
break;
671683
case TRANLOG_COLUMN_UTXNID:
@@ -717,7 +729,11 @@ static int tranlogColumn(
717729
}
718730
break;
719731
case TRANLOG_COLUMN_LOG:
720-
sqlite3_result_blob(ctx, pCur->data.data, pCur->data.size, NULL);
732+
if (pCur->data.data) {
733+
sqlite3_result_blob(ctx, pCur->data.data, pCur->data.size, NULL);
734+
} else {
735+
sqlite3_result_null(ctx);
736+
}
721737
break;
722738
case TRANLOG_COLUMN_CHILDUTXNID:
723739
if (pCur->data.data)

sqlite/ext/comdb2/tranlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ enum {
2424
TRANLOG_FLAGS_BLOCK = 0x1,
2525
TRANLOG_FLAGS_DURABLE = 0x2,
2626
TRANLOG_FLAGS_DESCENDING = 0x4,
27+
TRANLOG_FLAGS_SENTINEL = 0x8,
2728
};
2829

2930
u_int64_t get_timestamp_from_matchable_record(char *data);

tests/phys_rep_tiered.test/runit

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,6 +1996,16 @@ function verify_alt_metadb_system_table
19961996
fi
19971997
}
19981998

1999+
# Ask for a non-existent record with and without sentinel flag
2000+
function non_existent_record
2001+
{
2002+
x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "select lsn from comdb2_transaction_logs('{1:1}', NULL, 8)" 2>/dev/null)
2003+
[[ "$x" != '{-1:-1}' ]] && cleanFailExit "Expected {-1:-1} for non-existent record with sentinel flag, got $x"
2004+
2005+
x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "select lsn from comdb2_transaction_logs('{1:1}', NULL, 0)" 2>/dev/null)
2006+
[[ -n "$x" ]] && cleanFailExit "Expected no output, got $x, for non-existent record without sentinel flag"
2007+
}
2008+
19992009
# Halt a clustered physical replicant .. swing the master on that cluster
20002010
# a number of times, then restart the physical replicant .. looking at the
20012011
# trace, verify that nothing ignores rectype 2, 11, or 10 ..
@@ -2215,6 +2225,11 @@ function run_tests
22152225
check_metadb_memstat
22162226
testcase_finish $testcase
22172227

2228+
testcase="non_existent_record"
2229+
testcase_preamble $testcase
2230+
non_existent_record
2231+
testcase_finish $testcase
2232+
22182233
testcase="verify_non_ignored_reptype"
22192234
testcase_preamble $testcase
22202235
verify_non_ignored_reptype

tests/tunables.test/t00_all_tunables.expected

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
(name='blobmem_sz_thresh_kb', description='Sets the threshold (in KB) above which blobs are allocated by the blob allocator. (Default: 0)', type='INTEGER', value='-1', read_only='Y')
8383
(name='blobstripe', description='', type='BOOLEAN', value='ON', read_only='Y')
8484
(name='blocking_latches', description='Block on latch rather than deadlock', type='BOOLEAN', value='OFF', read_only='N')
85-
(name='blocking_physrep', description='Physical replicant blocks on select. (Default: off)', type='BOOLEAN', value='ON', read_only='N')
85+
(name='blocking_physrep', description='Physical replicant blocks on select. (Default: on)', type='BOOLEAN', value='ON', read_only='N')
8686
(name='broadcast_check_rmtpol', description='Check rmtpol before sending triggers', type='BOOLEAN', value='ON', read_only='N')
8787
(name='broken_max_rec_sz', description='', type='INTEGER', value='0', read_only='Y')
8888
(name='broken_num_parser', description='', type='BOOLEAN', value='OFF', read_only='Y')
@@ -742,7 +742,7 @@
742742
(name='physrep_max_rollback', description='Maximum logs physrep can rollback. (Default: 0)', type='INTEGER', value='0', read_only='N')
743743
(name='physrep_metadb_host', description='List of physical replication metadb cluster hosts.', type='STRING', value=NULL, read_only='Y')
744744
(name='physrep_metadb_name', description='Physical replication metadb cluster name.', type='STRING', value=NULL, read_only='Y')
745-
(name='physrep_pollms', description='Physical replicant poll interval in milliseconds. (Default: 200)', type='INTEGER', value='0', read_only='N')
745+
(name='physrep_pollms', description='Physical replicant poll interval in milliseconds. (Default: 0)', type='INTEGER', value='0', read_only='N')
746746
(name='physrep_reconnect_interval', description='Reconnect interval for physical replicants (Default: 600)', type='INTEGER', value='3600', read_only='N')
747747
(name='physrep_reconnect_penalty', description='Physrep wait seconds before retry to the same node. (Default: 5)', type='INTEGER', value='0', read_only='N')
748748
(name='physrep_repl_host', description='Current physrep host.', type='STRING', value=NULL, read_only='Y')

0 commit comments

Comments
 (0)