diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 0b69f43fa6..56042178e3 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -137,6 +137,7 @@ static NtCloseFunc *NtClose; #include #include #include +#include #ifdef _MSC_VER #include @@ -1302,6 +1303,8 @@ struct MDB_txn { MDB_txn *mt_parent; /**< parent of a nested txn */ /** Nested txn under this txn, set together with flag #MDB_TXN_HAS_CHILD */ MDB_txn *mt_child; + /** The count of nested RDONLY txns under this txn also named child txns */ + atomic_uint mt_rdonly_child_count; pgno_t mt_next_pgno; /**< next unallocated page */ #ifdef MDB_VL32 pgno_t mt_last_pgno; /**< last written page */ @@ -3144,6 +3147,7 @@ mdb_txn_renew0(MDB_txn *txn) mdb_debug = MDB_DBG_INFO; #endif txn->mt_child = NULL; + txn->mt_rdonly_child_count = 0; txn->mt_loose_pgs = NULL; txn->mt_loose_count = 0; txn->mt_dirty_room = MDB_IDL_UM_MAX; @@ -3220,9 +3224,15 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) return EACCES; if (parent) { - /* Nested transactions: Max 1 child, write txns only, no writemap */ + /* Nested transactions: + * If RDONLY: Any number of children, writemap allowed + * If write: Max 1 child, no writemap + */ flags |= parent->mt_flags; - if (flags & (MDB_RDONLY|MDB_WRITEMAP|MDB_TXN_BLOCKED)) { + if (parent->mt_child && F_ISSET(parent->mt_child->mt_flags, MDB_RDONLY) && F_ISSET(flags, MDB_RDONLY)) { + flags &= ~MDB_TXN_HAS_CHILD; + } + if ((F_ISSET(flags, MDB_WRITEMAP) && !F_ISSET(flags, MDB_RDONLY)) || F_ISSET(flags, MDB_TXN_BLOCKED)) { return (parent->mt_flags & MDB_TXN_RDONLY) ? EINVAL : MDB_BAD_TXN; } /* Child txns save MDB_pgstate and use own copy of cursors */ @@ -3263,6 +3273,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) unsigned int i; txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs); txn->mt_dbiseqs = parent->mt_dbiseqs; + /* Not useful when nested RDONLY but correctly freed in mdb_txn_end */ txn->mt_u.dirty_list = malloc(sizeof(MDB_ID2)*MDB_IDL_UM_SIZE); if (!txn->mt_u.dirty_list || !(txn->mt_free_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX))) @@ -3278,6 +3289,11 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) txn->mt_next_pgno = parent->mt_next_pgno; parent->mt_flags |= MDB_TXN_HAS_CHILD; parent->mt_child = txn; + if (flags & MDB_RDONLY) { + atomic_fetch_add(&parent->mt_rdonly_child_count, 1); + } else { + parent->mt_rdonly_child_count = 0; + } txn->mt_parent = parent; txn->mt_numdbs = parent->mt_numdbs; #ifdef MDB_VL32 @@ -3290,7 +3306,8 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) rc = 0; ntxn = (MDB_ntxn *)txn; ntxn->mnt_pgstate = env->me_pgstate; /* save parent me_pghead & co */ - if (env->me_pghead) { + /* Do not copy parent me_pghead when nested and RDONLY */ + if (!(flags & MDB_RDONLY) && env->me_pghead) { size = MDB_IDL_SIZEOF(env->me_pghead); env->me_pghead = mdb_midl_alloc(env->me_pghead[0]); if (env->me_pghead) @@ -3377,6 +3394,7 @@ static void mdb_txn_end(MDB_txn *txn, unsigned mode) { MDB_env *env = txn->mt_env; + unsigned int flags = txn->mt_flags; #if MDB_DEBUG static const char *const names[] = MDB_END_NAMES; #endif @@ -3389,7 +3407,7 @@ mdb_txn_end(MDB_txn *txn, unsigned mode) txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', (void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root)); - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { + if (!txn->mt_parent && F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { if (txn->mt_u.reader) { txn->mt_u.reader->mr_txnid = (txnid_t)-1; if (!(env->me_flags & MDB_NOTLS)) { @@ -3413,6 +3431,7 @@ mdb_txn_end(MDB_txn *txn, unsigned mode) txn->mt_numdbs = 0; txn->mt_flags = MDB_TXN_FINISHED; + mdb_midl_free(txn->mt_spill_pgs); if (!txn->mt_parent) { mdb_midl_shrink(&txn->mt_free_pgs); @@ -3428,15 +3447,19 @@ mdb_txn_end(MDB_txn *txn, unsigned mode) if (env->me_txns) UNLOCK_MUTEX(env->me_wmutex); } else { - txn->mt_parent->mt_child = NULL; - txn->mt_parent->mt_flags &= ~MDB_TXN_HAS_CHILD; - env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate; + if (!F_ISSET(flags, MDB_RDONLY) || atomic_fetch_sub(&txn->mt_parent->mt_rdonly_child_count, 1) == 1) { + txn->mt_parent->mt_child = NULL; + txn->mt_parent->mt_flags &= ~MDB_TXN_HAS_CHILD; + env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate; + } mdb_midl_free(txn->mt_free_pgs); free(txn->mt_u.dirty_list); } - mdb_midl_free(txn->mt_spill_pgs); - mdb_midl_free(pghead); + /* A parent and RDONLY, it's a multi-nested RDONLY transaction case */ + if (!(txn->mt_parent && flags & MDB_RDONLY)) { + mdb_midl_free(pghead); + } } #ifdef MDB_VL32 if (!txn->mt_parent) { @@ -3486,6 +3509,11 @@ _mdb_txn_abort(MDB_txn *txn) if (txn == NULL) return; + if (txn->mt_parent && txn->mt_flags & MDB_RDONLY) { + // You must first abort the child before the parent + mdb_tassert(txn, txn->mt_parent && atomic_load(&txn->mt_rdonly_child_count) == 0); + } + if (txn->mt_child) _mdb_txn_abort(txn->mt_child); @@ -6480,7 +6508,7 @@ mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl) MDB_page *p = NULL; int level; - if (! (mc->mc_flags & (C_ORIG_RDONLY|C_WRITEMAP))) { + if (! (( mc->mc_flags & (C_ORIG_RDONLY|C_WRITEMAP) ) && mc->mc_txn->mt_parent == NULL)) { MDB_txn *tx2 = txn; level = 1; do { @@ -9628,7 +9656,7 @@ mdb_cursor_del0(MDB_cursor *mc) goto fail; } if (m3->mc_xcursor && !(m3->mc_flags & C_EOF)) { - MDB_node *node = NODEPTR(m3->mc_pg[m3->mc_top], m3->mc_ki[m3->mc_top]); + MDB_node *node = NODEPTR(m3->mc_pg[mc->mc_top], m3->mc_ki[mc->mc_top]); /* If this node has dupdata, it may need to be reinited * because its data has moved. * If the xcursor was not initd it must be reinited.