Skip to content

Commit 4e2590a

Browse files
committed
Return sentinel for non-existant record
Signed-off-by: Mark Hannum <[email protected]>
1 parent b7488da commit 4e2590a

File tree

4 files changed

+49
-8
lines changed

4 files changed

+49
-8
lines changed

db/phys_rep.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,8 @@ int is_valid_lsn(unsigned int file, unsigned int offset)
672672
offset == get_next_offset(thedb->bdb_env->dbenv, info);
673673
}
674674

675+
extern __thread int physrep_out_of_order;
676+
675677
static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
676678
{
677679
/* vars for 1 record */
@@ -692,6 +694,14 @@ static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info)
692694
physrep_logmsg(LOGMSG_ERROR, "%s:%d: Could not parse lsn %s\n",
693695
__func__, __LINE__, lsn);
694696
}
697+
if (file == -1 && offset == -1) {
698+
if (gbl_physrep_debug) {
699+
physrep_logmsg(LOGMSG_USER, "%s:%d requested invalid record, force reconnect\n",
700+
__func__, __LINE__);
701+
}
702+
physrep_out_of_order = 1;
703+
return prev_info;
704+
}
695705
if (gbl_physrep_debug) {
696706
physrep_logmsg(LOGMSG_USER, "%s:%d: Processing record (lsn %d:%d)\n",
697707
__func__, __LINE__, file, offset);
@@ -1362,7 +1372,6 @@ static int do_wait_for_reverse_conn(cdb2_hndl_tp *repl_metadb) {
13621372
apply physical logs.
13631373
*/
13641374
int gbl_physrep_pollms = 0;
1365-
extern __thread int physrep_out_of_order;
13661375
static void *physrep_worker(void *args)
13671376
{
13681377
comdb2_name_thread(__func__);
@@ -1567,9 +1576,9 @@ static void *physrep_worker(void *args)
15671576
prev_info = info;
15681577

15691578
rc = snprintf(sql_cmd, sql_cmd_len,
1570-
"select * from comdb2_transaction_logs('{%u:%u}'%s)",
1579+
"select * from comdb2_transaction_logs('{%u:%u}', NULL%s)",
15711580
info.file, info.offset,
1572-
(gbl_blocking_physrep ? ", NULL, 1" : ""));
1581+
(gbl_blocking_physrep ? ",9" : ",8"));
15731582
if (rc < 0 || rc >= sql_cmd_len)
15741583
physrep_logmsg(LOGMSG_ERROR, "%s:%d Command buffer is not long enough!\n", __func__, __LINE__);
15751584
if (gbl_physrep_debug)

sqlite/ext/comdb2/tranlog.c

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct tranlog_cursor {
6969
int startAppRecGen;
7070
int starttime;
7171
int timeout;
72+
int invalidRecord;
7273
DB_LOGC *logc; /* Log Cursor */
7374
DBT data;
7475
};
@@ -154,6 +155,11 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
154155
uint32_t durable_gen=0, rc, getflags;
155156
bdb_state_type *bdb_state = thedb->bdb_env;
156157

158+
if (pCur->invalidRecord) {
159+
pCur->hitLast = 1;
160+
return SQLITE_OK;
161+
}
162+
157163
if (pCur->notDurable || pCur->hitLast)
158164
return SQLITE_OK;
159165

@@ -252,9 +258,13 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
252258
}
253259

254260
if (getflags != DB_NEXT && getflags != DB_PREV) {
255-
logmsg(LOGMSG_ERROR, "%s line %d did not expect logc->get to fail with flag: %d."
256-
"got rc %d\n",
257-
__func__, __LINE__, getflags, rc);
261+
if (pCur->flags & TRANLOG_FLAGS_SENTINEL) {
262+
/* Return sentinel record with LSN {-1:-1} */
263+
pCur->curLsn.file = pCur->curLsn.offset = -1;
264+
pCur->invalidRecord = 1;
265+
pCur->iRowid++;
266+
return SQLITE_OK;
267+
}
258268
return SQLITE_INTERNAL;
259269
}
260270

@@ -665,7 +675,9 @@ static int tranlogColumn(
665675
}
666676
break;
667677
case TRANLOG_COLUMN_TXNID:
668-
LOGCOPY_32(&txnid, &((char *) pCur->data.data)[4]);
678+
if (pCur->data.data) {
679+
LOGCOPY_32(&txnid, &((char *) pCur->data.data)[4]);
680+
}
669681
sqlite3_result_int64(ctx, txnid);
670682
break;
671683
case TRANLOG_COLUMN_UTXNID:
@@ -717,7 +729,11 @@ static int tranlogColumn(
717729
}
718730
break;
719731
case TRANLOG_COLUMN_LOG:
720-
sqlite3_result_blob(ctx, pCur->data.data, pCur->data.size, NULL);
732+
if (pCur->data.data) {
733+
sqlite3_result_blob(ctx, pCur->data.data, pCur->data.size, NULL);
734+
} else {
735+
sqlite3_result_null(ctx);
736+
}
721737
break;
722738
case TRANLOG_COLUMN_CHILDUTXNID:
723739
if (pCur->data.data)

sqlite/ext/comdb2/tranlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ enum {
2424
TRANLOG_FLAGS_BLOCK = 0x1,
2525
TRANLOG_FLAGS_DURABLE = 0x2,
2626
TRANLOG_FLAGS_DESCENDING = 0x4,
27+
TRANLOG_FLAGS_SENTINEL = 0x8,
2728
};
2829

2930
u_int64_t get_timestamp_from_matchable_record(char *data);

tests/phys_rep_tiered.test/runit

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1978,6 +1978,16 @@ function check_metadb_memstat
19781978
fi
19791979
}
19801980

1981+
# Ask for a non-existent record with and without sentinel flag
1982+
function non_existent_record
1983+
{
1984+
x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "select lsn from comdb2_transaction_logs('{1:1}', NULL, 8)" 2>/dev/null)
1985+
[[ "$x" != '{-1:-1}' ]] && cleanFailExit "Expected {-1:-1} for non-existent record with sentinel flag, got $x"
1986+
1987+
x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "select lsn from comdb2_transaction_logs('{1:1}', NULL, 0)" 2>/dev/null)
1988+
[[ -n "$x" ]] && cleanFailExit "Expected no output, got $x, for non-existent record without sentinel flag"
1989+
}
1990+
19811991
# Halt a clustered physical replicant .. swing the master on that cluster
19821992
# a number of times, then restart the physical replicant .. looking at the
19831993
# trace, verify that nothing ignores rectype 2, 11, or 10 ..
@@ -2197,6 +2207,11 @@ function run_tests
21972207
check_metadb_memstat
21982208
testcase_finish $testcase
21992209

2210+
testcase="non_existent_record"
2211+
testcase_preamble $testcase
2212+
non_existent_record
2213+
testcase_finish $testcase
2214+
22002215
testcase="verify_non_ignored_reptype"
22012216
testcase_preamble $testcase
22022217
verify_non_ignored_reptype

0 commit comments

Comments
 (0)