Skip to content
50 changes: 39 additions & 11 deletions libraries/liblmdb/mdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ static NtCloseFunc *NtClose;
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <stdatomic.h>

#ifdef _MSC_VER
#include <io.h>
Expand Down Expand Up @@ -1302,6 +1303,8 @@ struct MDB_txn {
MDB_txn *mt_parent; /**< parent of a nested txn */
/** Nested txn under this txn, set together with flag #MDB_TXN_HAS_CHILD */
MDB_txn *mt_child;
/** Number of nested RDONLY txns (child txns) begun under this txn and not yet ended */
atomic_uint mt_rdonly_child_count;
pgno_t mt_next_pgno; /**< next unallocated page */
#ifdef MDB_VL32
pgno_t mt_last_pgno; /**< last written page */
Expand Down Expand Up @@ -3144,6 +3147,7 @@ mdb_txn_renew0(MDB_txn *txn)
mdb_debug = MDB_DBG_INFO;
#endif
txn->mt_child = NULL;
txn->mt_rdonly_child_count = 0;
txn->mt_loose_pgs = NULL;
txn->mt_loose_count = 0;
txn->mt_dirty_room = MDB_IDL_UM_MAX;
Expand Down Expand Up @@ -3220,9 +3224,15 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
return EACCES;

if (parent) {
/* Nested transactions: Max 1 child, write txns only, no writemap */
/* Nested transactions:
* If RDONLY: Any number of children, writemap allowed
* If write: Max 1 child, no writemap
*/
flags |= parent->mt_flags;
if (flags & (MDB_RDONLY|MDB_WRITEMAP|MDB_TXN_BLOCKED)) {
if (parent->mt_child && F_ISSET(parent->mt_child->mt_flags, MDB_RDONLY) && F_ISSET(flags, MDB_RDONLY)) {
flags &= ~MDB_TXN_HAS_CHILD;
}
if ((F_ISSET(flags, MDB_WRITEMAP) && !F_ISSET(flags, MDB_RDONLY)) || F_ISSET(flags, MDB_TXN_BLOCKED)) {
return (parent->mt_flags & MDB_TXN_RDONLY) ? EINVAL : MDB_BAD_TXN;
}
/* Child txns save MDB_pgstate and use own copy of cursors */
Expand Down Expand Up @@ -3263,6 +3273,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
unsigned int i;
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbiseqs = parent->mt_dbiseqs;
/* Not useful when nested RDONLY but correctly freed in mdb_txn_end */
txn->mt_u.dirty_list = malloc(sizeof(MDB_ID2)*MDB_IDL_UM_SIZE);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

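Consider skipping this dirty_list allocation when MDB_RDONLY is set: as the comment above notes, it is not useful for a nested RDONLY txn.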

if (!txn->mt_u.dirty_list ||
!(txn->mt_free_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX)))
Expand All @@ -3278,6 +3289,11 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
txn->mt_next_pgno = parent->mt_next_pgno;
parent->mt_flags |= MDB_TXN_HAS_CHILD;
parent->mt_child = txn;
if (flags & MDB_RDONLY) {
atomic_fetch_add(&parent->mt_rdonly_child_count, 1);
} else {
parent->mt_rdonly_child_count = 0;
}
txn->mt_parent = parent;
txn->mt_numdbs = parent->mt_numdbs;
#ifdef MDB_VL32
Expand All @@ -3290,7 +3306,8 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
rc = 0;
ntxn = (MDB_ntxn *)txn;
ntxn->mnt_pgstate = env->me_pgstate; /* save parent me_pghead & co */
if (env->me_pghead) {
/* Do not copy parent me_pghead when nested and RDONLY */
if (!(flags & MDB_RDONLY) && env->me_pghead) {
size = MDB_IDL_SIZEOF(env->me_pghead);
env->me_pghead = mdb_midl_alloc(env->me_pghead[0]);
if (env->me_pghead)
Expand Down Expand Up @@ -3377,6 +3394,7 @@ static void
mdb_txn_end(MDB_txn *txn, unsigned mode)
{
MDB_env *env = txn->mt_env;
unsigned int flags = txn->mt_flags;
#if MDB_DEBUG
static const char *const names[] = MDB_END_NAMES;
#endif
Expand All @@ -3389,7 +3407,7 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)
txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
(void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root));

if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
if (!txn->mt_parent && F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
if (txn->mt_u.reader) {
txn->mt_u.reader->mr_txnid = (txnid_t)-1;
if (!(env->me_flags & MDB_NOTLS)) {
Expand All @@ -3413,6 +3431,7 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)

txn->mt_numdbs = 0;
txn->mt_flags = MDB_TXN_FINISHED;
mdb_midl_free(txn->mt_spill_pgs);

if (!txn->mt_parent) {
mdb_midl_shrink(&txn->mt_free_pgs);
Expand All @@ -3428,15 +3447,19 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)
if (env->me_txns)
UNLOCK_MUTEX(env->me_wmutex);
} else {
txn->mt_parent->mt_child = NULL;
txn->mt_parent->mt_flags &= ~MDB_TXN_HAS_CHILD;
env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
if (!F_ISSET(flags, MDB_RDONLY) || atomic_fetch_sub(&txn->mt_parent->mt_rdonly_child_count, 1) == 1) {
txn->mt_parent->mt_child = NULL;
txn->mt_parent->mt_flags &= ~MDB_TXN_HAS_CHILD;
env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
}
mdb_midl_free(txn->mt_free_pgs);
free(txn->mt_u.dirty_list);
}
mdb_midl_free(txn->mt_spill_pgs);

mdb_midl_free(pghead);
/* Nested RDONLY txn (it has a parent and is RDONLY): do not free pghead in that case */
if (!(txn->mt_parent && flags & MDB_RDONLY)) {
mdb_midl_free(pghead);
}
}
#ifdef MDB_VL32
if (!txn->mt_parent) {
Expand Down Expand Up @@ -3486,6 +3509,11 @@ _mdb_txn_abort(MDB_txn *txn)
if (txn == NULL)
return;

if (txn->mt_parent && txn->mt_flags & MDB_RDONLY) {
// A nested RDONLY txn must have no RDONLY children left: abort the children before their parent
mdb_tassert(txn, txn->mt_parent && atomic_load(&txn->mt_rdonly_child_count) == 0);
}

if (txn->mt_child)
_mdb_txn_abort(txn->mt_child);

Expand Down Expand Up @@ -6480,7 +6508,7 @@ mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl)
MDB_page *p = NULL;
int level;

if (! (mc->mc_flags & (C_ORIG_RDONLY|C_WRITEMAP))) {
if (! (( mc->mc_flags & (C_ORIG_RDONLY|C_WRITEMAP) ) && mc->mc_txn->mt_parent == NULL)) {
MDB_txn *tx2 = txn;
level = 1;
do {
Expand Down Expand Up @@ -9628,7 +9656,7 @@ mdb_cursor_del0(MDB_cursor *mc)
goto fail;
}
if (m3->mc_xcursor && !(m3->mc_flags & C_EOF)) {
MDB_node *node = NODEPTR(m3->mc_pg[m3->mc_top], m3->mc_ki[m3->mc_top]);
MDB_node *node = NODEPTR(m3->mc_pg[mc->mc_top], m3->mc_ki[mc->mc_top]);
/* If this node has dupdata, it may need to be reinited
* because its data has moved.
* If the xcursor was not initd it must be reinited.
Expand Down