diff --git a/core/model/src/main/java/org/eclipse/rdf4j/model/impl/LinkedHashModel.java b/core/model/src/main/java/org/eclipse/rdf4j/model/impl/LinkedHashModel.java index 8cc4f086ab..3b2f05071f 100644 --- a/core/model/src/main/java/org/eclipse/rdf4j/model/impl/LinkedHashModel.java +++ b/core/model/src/main/java/org/eclipse/rdf4j/model/impl/LinkedHashModel.java @@ -341,13 +341,13 @@ private static class ModelNode implements Serializable { private static final long serialVersionUID = -1205676084606998540L; - Set subjects = new LinkedHashSet<>(); + Set subjects = new LinkedHashSet<>(1); - Set predicates = new LinkedHashSet<>(); + Set predicates = new LinkedHashSet<>(1); - Set objects = new LinkedHashSet<>(); + Set objects = new LinkedHashSet<>(1); - Set contexts = new LinkedHashSet<>(); + Set contexts = new LinkedHashSet<>(1); private final V value; diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java index fd8b3a54b6..4178353173 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java @@ -11,10 +11,12 @@ package org.eclipse.rdf4j.sail.base; import java.lang.invoke.VarHandle; +import java.lang.ref.Reference; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; @@ -23,6 +25,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; @@ -51,9 +54,31 @@ @InternalUseOnly public abstract class Changeset implements SailSink, ModelFactory { - AdderBasedReadWriteLock 
readWriteLock = new AdderBasedReadWriteLock(); - AdderBasedReadWriteLock refBacksReadWriteLock = new AdderBasedReadWriteLock(); - Semaphore prependLock = new Semaphore(1); + static class CountedReference { + final T referent; + final AtomicInteger count = new AtomicInteger(1); + + CountedReference(T referent) { + this.referent = referent; + } + + CountedReference retain() { + count.incrementAndGet(); + return this; + } + + boolean release() { + return count.decrementAndGet() == 0; + } + + T get() { + return referent; + } + } + + final AdderBasedReadWriteLock readWriteLock = new AdderBasedReadWriteLock(); + final AdderBasedReadWriteLock refBacksReadWriteLock = new AdderBasedReadWriteLock(); + final Semaphore prependLock = new Semaphore(1); /** * Set of {@link SailDataset}s that are currently using this {@link Changeset} to derive the state of the @@ -78,7 +103,7 @@ public abstract class Changeset implements SailSink, ModelFactory { *

* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE */ - private volatile Model approved; + private volatile CountedReference approved; private volatile boolean approvedEmpty = true; /** @@ -86,7 +111,7 @@ public abstract class Changeset implements SailSink, ModelFactory { *

* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE */ - private volatile Model deprecated; + private volatile CountedReference deprecated; private volatile boolean deprecatedEmpty = true; /** @@ -127,12 +152,28 @@ public void close() throws SailException { refbacks = null; prepend = null; observed = null; - approved = null; - deprecated = null; approvedContexts = null; deprecatedContexts = null; addedNamespaces = null; removedPrefixes = null; + try { + if (approved != null && approved.release() && approved.get() instanceof AutoCloseable) { + ((AutoCloseable) approved.get()).close(); + } + } catch (Exception e) { + throw new SailException(e); + } finally { + approved = null; + try { + if (deprecated != null && deprecated.release() && deprecated.get() instanceof AutoCloseable) { + ((AutoCloseable) deprecated.get()).close(); + } + } catch (Exception e) { + throw new SailException(e); + } finally { + deprecated = null; + } + } } @Override @@ -168,7 +209,7 @@ boolean hasApproved(Resource subj, IRI pred, Value obj, Resource[] contexts) { boolean readLock = readWriteLock.readLock(); try { - return approved.contains(subj, pred, obj, contexts); + return approved.get().contains(subj, pred, obj, contexts); } finally { readWriteLock.unlockReader(readLock); } @@ -190,7 +231,7 @@ boolean hasDeprecated(Resource subj, IRI pred, Value obj, Resource[] contexts) { } } - return deprecated.contains(subj, pred, obj, contexts); + return deprecated.get().contains(subj, pred, obj, contexts); } finally { readWriteLock.unlockReader(readLock); } @@ -211,16 +252,17 @@ public void addRefback(SailDatasetImpl dataset) { } public void removeRefback(SailDatasetImpl dataset) { - assert !closed; - long writeLock = refBacksReadWriteLock.writeLock(); - try { - if (refbacks != null) { - refbacks.removeIf(d -> d == dataset); + if (refbacks != null) { + // assert !closed; + long writeLock = refBacksReadWriteLock.writeLock(); + try { + if (refbacks != null) { + 
refbacks.removeIf(d -> d == dataset); + } + } finally { + refBacksReadWriteLock.unlockWriter(writeLock); } - } finally { - refBacksReadWriteLock.unlockWriter(writeLock); } - } public boolean isRefback() { @@ -373,7 +415,7 @@ public void clear(Resource... contexts) { statementCleared = true; if (approved != null) { - approved.clear(); + approved.get().clear(); } if (approvedContexts != null) { approvedContexts.clear(); @@ -383,7 +425,7 @@ public void clear(Resource... contexts) { deprecatedContexts = new HashSet<>(); } if (approved != null) { - approved.remove(null, null, null, contexts); + approved.get().remove(null, null, null, contexts); } if (approvedContexts != null && contexts != null) { for (Resource resource : contexts) { @@ -394,7 +436,7 @@ public void clear(Resource... contexts) { deprecatedContexts.addAll(Arrays.asList(contexts)); } } - approvedEmpty = approved == null || approved.isEmpty(); + approvedEmpty = approved == null || approved.get().isEmpty(); } finally { readWriteLock.unlockWriter(writeLock); } @@ -409,14 +451,14 @@ public void approve(Statement statement) { try { if (deprecated != null) { - deprecated.remove(statement); - deprecatedEmpty = deprecated == null || deprecated.isEmpty(); + deprecated.get().remove(statement); + deprecatedEmpty = deprecated == null || deprecated.get().isEmpty(); } if (approved == null) { - approved = createEmptyModel(); + approved = new CountedReference<>(createEmptyModel()); } - approved.add(statement); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().add(statement); + approvedEmpty = false; if (statement.getContext() != null) { if (approvedContexts == null) { approvedContexts = new HashSet<>(); @@ -440,17 +482,17 @@ public void deprecate(Statement statement) { long writeLock = readWriteLock.writeLock(); try { if (approved != null) { - approved.remove(statement); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().remove(statement); + approvedEmpty = approved == null 
|| approved.get().isEmpty(); } if (deprecated == null) { - deprecated = createEmptyModel(); + deprecated = new CountedReference<>(createEmptyModel()); } - deprecated.add(statement); - deprecatedEmpty = deprecated == null || deprecated.isEmpty(); + deprecated.get().add(statement); + deprecatedEmpty = false; Resource ctx = statement.getContext(); if (approvedContexts != null && approvedContexts.contains(ctx) - && !approved.contains(null, null, null, ctx)) { + && !approved.get().contains(null, null, null, ctx)) { approvedContexts.remove(ctx); } } finally { @@ -485,11 +527,11 @@ public String toString() { sb.append(" deprecatedContexts, "); } if (deprecated != null) { - sb.append(deprecated.size()); + sb.append(deprecated.get().size()); sb.append(" deprecated, "); } if (approved != null) { - sb.append(approved.size()); + sb.append(approved.get().size()); sb.append(" approved, "); } if (sb.length() > 0) { @@ -504,9 +546,9 @@ protected void setChangeset(Changeset from) { assert !from.closed; this.observed = from.observed; - this.approved = from.approved; + this.approved = from.approved != null ? from.approved.retain() : null; this.approvedEmpty = from.approvedEmpty; - this.deprecated = from.deprecated; + this.deprecated = from.deprecated != null ? 
from.deprecated.retain() : null; this.deprecatedEmpty = from.deprecatedEmpty; this.approvedContexts = from.approvedContexts; this.deprecatedContexts = from.deprecatedContexts; @@ -673,7 +715,7 @@ List getDeprecatedStatements() { boolean readLock = readWriteLock.readLock(); try { - return new ArrayList<>(deprecated); + return new ArrayList<>(deprecated.get()); } finally { readWriteLock.unlockReader(readLock); } @@ -688,7 +730,7 @@ List getApprovedStatements() { boolean readLock = readWriteLock.readLock(); try { - return new ArrayList<>(approved); + return new ArrayList<>(approved.get()); } finally { readWriteLock.unlockReader(readLock); } @@ -709,7 +751,7 @@ boolean hasDeprecated(Statement statement) { } } if (deprecated != null) { - return deprecated.contains(statement); + return deprecated.get().contains(statement); } else { return false; } @@ -735,7 +777,7 @@ Iterable getApprovedStatements(Resource subj, IRI pred, Value obj, boolean readLock = readWriteLock.readLock(); try { - Iterable statements = approved.getStatements(subj, pred, obj, contexts); + Iterable statements = approved.get().getStatements(subj, pred, obj, contexts); // This is a synchronized context, users of this method will be allowed to use the results at their leisure. // We @@ -772,7 +814,8 @@ Iterable getApprovedTriples(Resource subj, IRI pred, Value obj) { try { // TODO none of this is particularly well thought-out in terms of performance, but we are aiming // for functionally complete first. 
- Stream approvedSubjectTriples = approved.parallelStream() + Stream approvedSubjectTriples = approved.get() + .parallelStream() .filter(st -> st.getSubject().isTriple()) .map(st -> (Triple) st.getSubject()) .filter(t -> { @@ -785,7 +828,8 @@ Iterable getApprovedTriples(Resource subj, IRI pred, Value obj) { return obj == null || obj.equals(t.getObject()); }); - Stream approvedObjectTriples = approved.parallelStream() + Stream approvedObjectTriples = approved.get() + .parallelStream() .filter(st -> st.getObject().isTriple()) .map(st -> (Triple) st.getObject()) .filter(t -> { @@ -806,11 +850,31 @@ Iterable getApprovedTriples(Resource subj, IRI pred, Value obj) { void removeApproved(Statement next) { assert !closed; + + try { + CountedReference localApproved = approved; + if (localApproved != null && !readWriteLock.writeLock.isWriteLocked() + && !localApproved.get().contains(next)) { + boolean readLock = readWriteLock.readLock(); + try { + if (approved != null && !approvedEmpty) { + if (!approved.get().contains(next)) { + return; + } + } + } finally { + readWriteLock.unlockReader(readLock); + } + } + + } catch (ConcurrentModificationException ignored) { + } + long writeLock = readWriteLock.writeLock(); try { if (approved != null) { - approved.remove(next); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().remove(next); + approvedEmpty = approved == null || approved.get().isEmpty(); } } finally { readWriteLock.unlockWriter(writeLock); @@ -834,7 +898,7 @@ void sinkApproved(SailSink sink) { boolean readLock = readWriteLock.readLock(); try { if (approved != null) { - sink.approveAll(approved, approvedContexts); + sink.approveAll(approved.get(), approvedContexts); } } finally { readWriteLock.unlockReader(readLock); @@ -849,7 +913,7 @@ void sinkDeprecated(SailSink sink) { boolean readLock = readWriteLock.readLock(); try { if (deprecated != null) { - sink.deprecateAll(deprecated); + sink.deprecateAll(deprecated.get()); } } finally { 
readWriteLock.unlockReader(readLock); @@ -879,13 +943,13 @@ public void approveAll(Set approve, Set approveContexts) { try { if (deprecated != null) { - deprecated.removeAll(approve); + deprecated.get().removeAll(approve); } if (approved == null) { - approved = createEmptyModel(); + approved = new CountedReference<>(createEmptyModel()); } - approved.addAll(approve); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().addAll(approve); + approvedEmpty = approvedEmpty && approve.isEmpty(); if (approveContexts != null) { if (approvedContexts == null) { @@ -905,19 +969,19 @@ public void deprecateAll(Set deprecate) { try { if (approved != null) { - approved.removeAll(deprecate); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().removeAll(deprecate); + approvedEmpty = approved == null || approved.get().isEmpty(); } if (deprecated == null) { - deprecated = createEmptyModel(); + deprecated = new CountedReference<>(createEmptyModel()); } - deprecated.addAll(deprecate); - deprecatedEmpty = deprecated == null || deprecated.isEmpty(); + deprecated.get().addAll(deprecate); + deprecatedEmpty = deprecatedEmpty && deprecate.isEmpty(); for (Statement statement : deprecate) { Resource ctx = statement.getContext(); if (approvedContexts != null && approvedContexts.contains(ctx) - && !approved.contains(null, null, null, ctx)) { + && !approved.get().contains(null, null, null, ctx)) { approvedContexts.remove(ctx); } } diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java index 15a38e5427..e2ac9ee0b3 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java @@ -349,7 +349,7 @@ void merge(Changeset change) { // the ´semaphore´. 
Synchronizing on the ´pending´ collection could potentially lead to a deadlock when // closing a Changeset during rollback. for (Changeset c : pending) { - c.prepend(merged); + c.prepend(merged.shallowClone()); } } } finally { @@ -477,7 +477,10 @@ private void flush(SailSink sink) throws SailException { && !isChanged((Changeset) sink)) { // one change to apply that is not in use to an empty Changeset Changeset dst = (Changeset) sink; - dst.setChangeset(changes.pop()); + Changeset src = changes.pop(); + dst.setChangeset(src); + // correctly close changeset + src.close(); } else { Iterator iter = changes.iterator(); while (iter.hasNext()) { @@ -517,6 +520,9 @@ private void flush(Changeset change, SailSink sink) throws SailException { change.sinkDeprecated(sink); change.sinkApproved(sink); + + // correctly close changeset + change.close(); } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index e0ea0ab7c0..b632bf792b 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -16,10 +16,7 @@ import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -75,7 +72,8 @@ class LmdbSailStore implements SailStore { private volatile boolean asyncTransactionFinished; private volatile boolean nextTransactionAsync; - private final boolean enableMultiThreading = true; + boolean enableMultiThreading = true; + boolean enableGc = true; private PersistentSetFactory setFactory; private PersistentSet unusedIds, nextUnusedIds; @@ -406,6 +404,10 @@ CloseableIteration createStatementIterator( } } + public void setTransactionIsolation(boolean 
transactionIsolation) { + this.tripleStore.setTransactionIsolation(transactionIsolation); + } + private final class LmdbSailSource extends BackingSailSource { private final boolean explicit; @@ -573,6 +575,51 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai addStatement(subj, pred, obj, explicit, ctx); } + @Override + public void approveAll(Set approved, Set approvedContexts) { + + sinkStoreAccessLock.lock(); + try { + startTransaction(true); + + for (Statement statement : approved) { + Resource subj = statement.getSubject(); + IRI pred = statement.getPredicate(); + Value obj = statement.getObject(); + Resource context = statement.getContext(); + + AddQuadOperation q = new AddQuadOperation(); + q.s = valueStore.storeValue(subj); + q.p = valueStore.storeValue(pred); + q.o = valueStore.storeValue(obj); + q.c = context == null ? 0 : valueStore.storeValue(context); + q.context = context; + q.explicit = explicit; + + if (multiThreadingActive) { + while (!opQueue.add(q)) { + if (tripleStoreException != null) { + throw wrapTripleStoreException(); + } + } + + } else { + q.execute(); + } + + } + } catch (IOException e) { + rollback(); + throw new SailException(e); + } catch (RuntimeException e) { + rollback(); + logger.error("Encountered an unexpected problem while trying to add a statement", e); + throw e; + } finally { + sinkStoreAccessLock.unlock(); + } + } + @Override public void deprecate(Statement statement) throws SailException { removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit, @@ -714,9 +761,11 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit, for (long contextId : contexts) { tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> { removeCount[0]++; - for (long id : quad) { - if (id != 0L) { - unusedIds.add(id); + if (enableGc) { + for (long id : quad) { + if (id != 0L) { + unusedIds.add(id); + } } } }); diff --git 
a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java index 13b88a9294..d64c5ffbe0 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java @@ -251,11 +251,17 @@ protected void initializeInternal() throws SailException { FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8); } backingStore = new LmdbSailStore(dataDir, config); - this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() { + this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) { @Override - protected SailStore createSailStore(File dataDir) throws IOException, SailException { + protected LmdbSailStore createSailStore(File dataDir) throws IOException, SailException { // Model can't fit into memory, use another LmdbSailStore to store delta - return new LmdbSailStore(dataDir, config); + LmdbStoreConfig overflowConfig = new LmdbStoreConfig(); + LmdbSailStore store = new LmdbSailStore(dataDir, overflowConfig); + store.enableMultiThreading = false; + store.enableGc = false; + // does not need to isolate transactions and therefore can optimize autogrow and others + store.setTransactionIsolation(false); + return store; } }) { @@ -406,4 +412,5 @@ private boolean upgradeStore(File dataDir, String version) throws SailException public Supplier getCollectionFactory() { return () -> new MapDb3CollectionFactory(getIterationCacheSyncThreshold()); } + } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java index f13cca4493..58b42c849f 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java @@ -8,17 
+8,27 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ + package org.eclipse.rdf4j.sail.lmdb; import java.io.File; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; import java.nio.file.Files; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.management.NotificationEmitter; +import javax.management.openmbean.CompositeData; import org.eclipse.rdf4j.common.io.FileUtil; import org.eclipse.rdf4j.model.IRI; @@ -32,58 +42,105 @@ import org.eclipse.rdf4j.model.impl.LinkedHashModel; import org.eclipse.rdf4j.model.impl.SimpleValueFactory; import org.eclipse.rdf4j.sail.SailException; -import org.eclipse.rdf4j.sail.base.SailStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.management.GarbageCollectionNotificationInfo; +import com.sun.management.GcInfo; + /** * Model implementation that stores in a {@link LinkedHashModel} until more than 10KB statements are added and the * estimated memory usage is more than the amount of free memory available. Once the threshold is cross this * implementation seamlessly changes to a disk based {@link SailSourceModel}. 
*/ -abstract class MemoryOverflowModel extends AbstractModel { +abstract class MemoryOverflowModel extends AbstractModel implements AutoCloseable { private static final long serialVersionUID = 4119844228099208169L; private static final Runtime RUNTIME = Runtime.getRuntime(); - private static final int LARGE_BLOCK = 10000; + private static final int LARGE_BLOCK = 5 * 1024; // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think - // we have space for one more block. The limit is currently set at 32 MB - private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024; + // we have space for one more block. The limit is 32 MB or 15% of the maximum heap size, whichever is larger. + private static final long MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = Math.max(32L * 1024 * 1024, (long) (RUNTIME.maxMemory() * 0.15)); final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class); - private LinkedHashModel memory; + private volatile LinkedHashModel memory; + + private transient File dataDir; - transient File dataDir; + private transient LmdbSailStore store; - transient SailStore store; + private transient volatile SailSourceModel disk; - transient SailSourceModel disk; + private final boolean verifyAdditions; - private long baseline = 0; + private final SimpleValueFactory vf = SimpleValueFactory.getInstance(); - private long maxBlockSize = 0; + private static volatile boolean overflow = false; + private static volatile long lastGcUpdate; + private static volatile long gcSum; + private static volatile ConcurrentLinkedQueue gcInfos = new ConcurrentLinkedQueue<>(); + static { + List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + NotificationEmitter emitter = (NotificationEmitter) gcBean; + emitter.addNotificationListener((notification, o) -> { + long 
uptimeInMillis = runtimeMXBean.getUptime(); + while (! gcInfos.isEmpty()) { + if (uptimeInMillis - gcInfos.peek().getEndTime() > 5000) { + gcSum -= gcInfos.poll().getDuration(); + } else { + break; + } + } + + // extract garbage collection information from notification. + GarbageCollectionNotificationInfo gcNotificationInfo = GarbageCollectionNotificationInfo.from((CompositeData) notification.getUserData()); + GcInfo gcInfo = gcNotificationInfo.getGcInfo(); + gcInfos.add(gcInfo); + gcSum += gcInfo.getDuration(); + if (gcSum > 2500) { + if (! overflow) { + // maximum heap size the JVM can allocate + long maxMemory = RUNTIME.maxMemory(); - public MemoryOverflowModel() { - memory = new LinkedHashModel(LARGE_BLOCK); - } + // total currently allocated JVM memory + long totalMemory = RUNTIME.totalMemory(); + // amount of memory free in the currently allocated JVM memory + long freeMemory = RUNTIME.freeMemory(); - public MemoryOverflowModel(Model model) { - this(model.getNamespaces()); - addAll(model); + // estimated memory used + long used = totalMemory - freeMemory; + + // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) + long freeToAllocateMemory = maxMemory - used; + + // try to prevent OOM + overflow = freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING; + + if (overflow) { + LoggerFactory.getLogger(MemoryOverflowModel.class).debug("overflow due to high gc load and mem consumption: sum={} count={}", gcSum, gcInfos.size()); + } + } + lastGcUpdate = System.currentTimeMillis(); + } else if (System.currentTimeMillis() - lastGcUpdate > 10000) { + overflow = false; + } + }, null, null); + } } - public MemoryOverflowModel(Set namespaces, Collection c) { - this(namespaces); - addAll(c); + public MemoryOverflowModel(boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; + memory = new LinkedHashModel(LARGE_BLOCK); } - public MemoryOverflowModel(Set namespaces) { + public 
MemoryOverflowModel(Set namespaces, boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; memory = new LinkedHashModel(namespaces, LARGE_BLOCK); } @@ -137,6 +194,33 @@ public boolean add(Statement st) { return getDelegate().add(st); } + @Override + public boolean addAll(Collection c) { + checkMemoryOverflow(); + if (disk != null || c.size() <= 1024) { + return getDelegate().addAll(c); + } else { + boolean ret = false; + HashSet buffer = new HashSet<>(); + for (Statement st : c) { + buffer.add(st); + if (buffer.size() >= 1024) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + checkMemoryOverflow(); + } + } + if (!buffer.isEmpty()) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + } + + return ret; + + } + + } + @Override public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { return getDelegate().remove(subj, pred, obj, contexts); @@ -191,13 +275,27 @@ public synchronized void removeTermIteration(Iterator iter, Resource } } - protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException; + protected abstract LmdbSailStore createSailStore(File dataDir) throws IOException, SailException; - synchronized Model getDelegate() { - if (disk == null) { + private Model getDelegate() { + var memory = this.memory; + if (memory != null) { return memory; + } else { + var disk = this.disk; + if (disk != null) { + return disk; + } + synchronized (this) { + if (this.memory != null) { + return this.memory; + } + if (this.disk != null) { + return this.disk; + } + throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state"); + } } - return disk; } private void writeObject(ObjectOutputStream s) throws IOException { @@ -228,75 +326,60 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx } private synchronized void checkMemoryOverflow() { - if (disk == null) { - int size = size(); - if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { - // 
maximum heap size the JVM can allocate - long maxMemory = RUNTIME.maxMemory(); - - // total currently allocated JVM memory - long totalMemory = RUNTIME.totalMemory(); - - // amount of memory free in the currently allocated JVM memory - long freeMemory = RUNTIME.freeMemory(); - - // estimated memory used - long used = totalMemory - freeMemory; - - // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) - long freeToAllocateMemory = maxMemory - used; - - if (baseline > 0) { - long blockSize = used - baseline; - if (blockSize > maxBlockSize) { - maxBlockSize = blockSize; - } + if (disk == getDelegate()) { + return; + } - // Sync if either the estimated size of the next block is larger than remaining memory, or - // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) - if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || - freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { - logger.debug("syncing at {} triples. 
max block size: {}", size, maxBlockSize); - overflowToDisk(); - } - } - baseline = used; - } + if (overflow) { + logger.debug("syncing at {} triples due to gc load", size()); + overflowToDisk(); + System.gc(); } } private synchronized void overflowToDisk() { + overflow = true; + if (memory == null) { + assert disk != null; + return; + } + + LinkedHashModel memory = this.memory; try { + this.memory = null; + assert disk == null; dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); store = createSailStore(dataDir); - disk = new SailSourceModel(store) { - - @Override - protected void finalize() throws Throwable { - logger.debug("finalizing {}", dataDir); - if (disk == this) { - try { - store.close(); - } catch (SailException e) { - logger.error(e.toString(), e); - } finally { - FileUtil.deleteDir(dataDir); - dataDir = null; - store = null; - disk = null; - } - } - super.finalize(); - } - }; - disk.addAll(memory); - memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK); + disk = new SailSourceModel(store, memory, verifyAdditions); logger.debug("overflow synced to disk"); } catch (IOException | SailException e) { String path = dataDir != null ? 
dataDir.getAbsolutePath() : "(unknown)"; logger.error("Error while writing to overflow directory " + path, e); + this.memory = memory; } } + + @Override + public void close() throws IOException { + if (disk != null) { + logger.debug("closing {}", dataDir); + try { + disk.close(); + } catch (Exception e) { + logger.error(e.toString(), e); + } finally { + try { + store.close(); + } catch (SailException e) { + logger.error(e.toString(), e); + } finally { + FileUtil.deleteDir(dataDir); + dataDir = null; + store = null; + disk = null; + } + } + } + } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java index 913fefd6ae..6fa4388b57 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java @@ -11,6 +11,8 @@ package org.eclipse.rdf4j.sail.lmdb; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -45,6 +47,8 @@ class SailSourceModel extends AbstractModel { private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class); + private final boolean verifyAdditions; + private final class StatementIterator implements Iterator { final CloseableIteration stmts; @@ -97,16 +101,21 @@ public void remove() { SailSink sink; - private long size; - private final IsolationLevels level = IsolationLevels.NONE; - public SailSourceModel(SailStore store) { - this(store.getExplicitSailSource()); + public SailSourceModel(SailStore store, Model bulk, boolean verifyAdditions) { + this(store, verifyAdditions); + sink().approveAll(bulk, bulk.contexts()); + sink.flush(); + } + + public SailSourceModel(SailStore store, boolean verifyAdditions) { + this(store.getExplicitSailSource(), verifyAdditions); } - public SailSourceModel(SailSource source) { + public 
SailSourceModel(SailSource source, boolean verifyAdditions) { this.source = source; + this.verifyAdditions = verifyAdditions; } @Override @@ -147,21 +156,19 @@ public String toString() { @Override public synchronized int size() { - if (size < 0) { + long size = 0; + try { + CloseableIteration iter = dataset().getStatements(null, null, null); try { - CloseableIteration iter; - iter = dataset().getStatements(null, null, null); - try { - while (iter.hasNext()) { - iter.next(); - size++; - } - } finally { - iter.close(); + while (iter.hasNext()) { + iter.next(); + size++; } - } catch (SailException e) { - throw new ModelException(e); + } finally { + iter.close(); } + } catch (SailException e) { + throw new ModelException(e); } if (size > Integer.MAX_VALUE) { return Integer.MAX_VALUE; @@ -243,13 +250,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... throw new UnsupportedOperationException("Incomplete statement"); } try { - if (contains(subj, pred, obj, contexts)) { + if (verifyAdditions && contains(subj, pred, obj, contexts)) { logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts); return false; } - if (size >= 0) { - size++; - } if (contexts == null || contexts.length == 0) { sink().approve(subj, pred, obj, null); } else { @@ -263,12 +267,51 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... 
} } + @Override + public boolean addAll(Collection statements) { + if (statements.isEmpty()) { + return false; + } + + if (statements.size() == 1) { + Statement st = statements.iterator().next(); + return add(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext()); + } + + boolean added = false; + + HashSet tempSet = new HashSet<>(); + HashSet contexts = new HashSet<>(); + SailSink sink = sink(); + + for (Statement statement : statements) { + if (tempSet.size() >= 1024) { + sink.approveAll(tempSet, contexts); + tempSet.clear(); + contexts.clear(); + added = true; + } + if (!contains(statement.getSubject(), statement.getPredicate(), statement.getObject(), + statement.getContext())) { + contexts.add(statement.getContext()); + tempSet.add(statement); + } + } + + if (!tempSet.isEmpty()) { + sink.approveAll(tempSet, contexts); + added = true; + } + + return added; + + } + @Override public synchronized boolean clear(Resource... contexts) { try { if (contains(null, null, null, contexts)) { sink().clear(contexts); - size = -1; return true; } } catch (SailException e) { @@ -279,25 +322,23 @@ public synchronized boolean clear(Resource... contexts) { @Override public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... 
contexts) { + boolean removed = false; try { - if (contains(subj, pred, obj, contexts)) { - size = -1; - CloseableIteration stmts; - stmts = dataset().getStatements(subj, pred, obj, contexts); - try { - while (stmts.hasNext()) { - Statement st = stmts.next(); - sink().deprecate(st); - } - } finally { - stmts.close(); + CloseableIteration stmts = dataset().getStatements(subj, pred, obj, + contexts); + try { + while (stmts.hasNext()) { + Statement st = stmts.next(); + sink().deprecate(st); + removed = true; } - return true; + } finally { + stmts.close(); } } catch (SailException e) { throw new ModelException(e); } - return false; + return removed; } @Override @@ -372,7 +413,6 @@ public synchronized void removeTermIteration(Iterator iter, Resource } finally { stmts.close(); } - size = -1; } catch (SailException e) { throw new ModelException(e); } @@ -404,6 +444,21 @@ private synchronized SailDataset dataset() throws SailException { return dataset; } + public void close() { + if (sink != null) { + try { + sink.flush(); + } finally { + sink.close(); + sink = null; + } + } + if (dataset != null) { + dataset.close(); + dataset = null; + } + } + private boolean contains(SailDataset dataset, Resource subj, IRI pred, Value obj, Resource... 
contexts) throws SailException { if (dataset == null) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java index c352415fb3..d43efb58df 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java @@ -15,7 +15,6 @@ import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction; import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction; import static org.eclipse.rdf4j.sail.lmdb.Varint.readListUnsigned; -import static org.eclipse.rdf4j.sail.lmdb.Varint.writeListUnsigned; import static org.eclipse.rdf4j.sail.lmdb.Varint.writeUnsigned; import static org.lwjgl.system.MemoryStack.stackPush; import static org.lwjgl.system.MemoryUtil.NULL; @@ -49,7 +48,6 @@ import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxreaders; import static org.lwjgl.util.lmdb.LMDB.mdb_get; import static org.lwjgl.util.lmdb.LMDB.mdb_put; -import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare; import static org.lwjgl.util.lmdb.LMDB.mdb_stat; import static org.lwjgl.util.lmdb.LMDB.mdb_strerror; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort; @@ -89,7 +87,6 @@ import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig; import org.lwjgl.PointerBuffer; import org.lwjgl.system.MemoryStack; -import org.lwjgl.util.lmdb.MDBCmpFuncI; import org.lwjgl.util.lmdb.MDBEnvInfo; import org.lwjgl.util.lmdb.MDBStat; import org.lwjgl.util.lmdb.MDBVal; @@ -169,6 +166,8 @@ class TripleStore implements Closeable { private TxnRecordCache recordCache = null; + private boolean transactionIsolation = true; + static final Comparator COMPARATOR = new Comparator() { @Override public int compare(ByteBuffer b1, ByteBuffer b2) { @@ -838,9 +837,35 @@ protected TripleIndex getBestIndex(long subj, long pred, long obj, long context) return bestIndex; } - private boolean requiresResize() { - if (autoGrow) 
{ - return LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0); + private boolean requiresResize() throws IOException { + if (autoGrow && LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) { + if (transactionIsolation) { + // caller has to handle resizing + return true; + } else { + // directly resize if isolation is not required + E(mdb_txn_commit(writeTxn)); + StampedLock lock = txnManager.lock(); + long stamp = lock.writeLock(); + try { + txnManager.deactivate(); + mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0); + E(mdb_env_set_mapsize(env, mapSize)); + // restart write transaction + try (MemoryStack stack = stackPush()) { + PointerBuffer pp = stack.mallocPointer(1); + mdb_txn_begin(env, NULL, 0, pp); + writeTxn = pp.get(0); + } + } finally { + try { + txnManager.activate(); + } finally { + lock.unlockWrite(stamp); + } + } + return false; + } } else { return false; } @@ -1156,6 +1181,10 @@ private void storeProperties(File propFile) throws IOException { } } + public void setTransactionIsolation(boolean transactionIsolation) { + this.transactionIsolation = transactionIsolation; + } + class TripleIndex { private final char[] fieldSeq; diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java index a54338ebdf..c25c6f12b2 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java @@ -49,7 +49,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -900,37 +899,41 @@ public long getId(Value value, boolean create) throws IOException { public void gcIds(Collection ids, Collection nextIds) throws IOException { if (!ids.isEmpty()) { - // contains IDs for data types and namespaces which 
are freed by garbage collecting literals and URIs - resizeMap(writeTxn, 2 * ids.size() * (1 + Long.BYTES + 2 + Long.BYTES)); - - final Collection finalIds = ids; - final Collection finalNextIds = nextIds; - writeTransaction((stack, writeTxn) -> { - MDBVal revIdVal = MDBVal.calloc(stack); - MDBVal idVal = MDBVal.calloc(stack); - MDBVal dataVal = MDBVal.calloc(stack); - - ByteBuffer revIdBb = stack.malloc(1 + Long.BYTES + 2 + Long.BYTES); - Varint.writeUnsigned(revIdBb, revision.getRevisionId()); - int revLength = revIdBb.position(); - for (Long id : finalIds) { - revIdBb.position(revLength).limit(revIdBb.capacity()); - revIdVal.mv_data(id2data(revIdBb, id).flip()); - // check if id has internal references and therefore cannot be deleted - idVal.mv_data(revIdBb.slice().position(revLength)); - if (mdb_get(writeTxn, refCountsDbi, idVal, dataVal) == 0) { - continue; + // wrap into read txn as resizeMap expects an active surrounding read txn + readTransaction(env, (stack1, txn1) -> { + // contains IDs for data types and namespaces which are freed by garbage collecting literals and URIs + resizeMap(writeTxn, 2 * ids.size() * (1 + Long.BYTES + 2 + Long.BYTES)); + + final Collection finalIds = ids; + final Collection finalNextIds = nextIds; + writeTransaction((stack, writeTxn) -> { + MDBVal revIdVal = MDBVal.calloc(stack); + MDBVal idVal = MDBVal.calloc(stack); + MDBVal dataVal = MDBVal.calloc(stack); + + ByteBuffer revIdBb = stack.malloc(1 + Long.BYTES + 2 + Long.BYTES); + Varint.writeUnsigned(revIdBb, revision.getRevisionId()); + int revLength = revIdBb.position(); + for (Long id : finalIds) { + revIdBb.position(revLength).limit(revIdBb.capacity()); + revIdVal.mv_data(id2data(revIdBb, id).flip()); + // check if id has internal references and therefore cannot be deleted + idVal.mv_data(revIdBb.slice().position(revLength)); + if (mdb_get(writeTxn, refCountsDbi, idVal, dataVal) == 0) { + continue; + } + // mark id as unused + E(mdb_put(writeTxn, unusedDbi, revIdVal, dataVal, 
0)); } - // mark id as unused - E(mdb_put(writeTxn, unusedDbi, revIdVal, dataVal, 0)); - } - deleteValueToIdMappings(stack, writeTxn, finalIds, finalNextIds); + deleteValueToIdMappings(stack, writeTxn, finalIds, finalNextIds); - invalidateRevisionOnCommit = true; - if (nextValueEvictionTime < 0) { - nextValueEvictionTime = System.currentTimeMillis() + VALUE_EVICTION_INTERVAL; - } + invalidateRevisionOnCommit = true; + if (nextValueEvictionTime < 0) { + nextValueEvictionTime = System.currentTimeMillis() + VALUE_EVICTION_INTERVAL; + } + return null; + }); return null; }); } diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java index 01ddf213bf..e621259b03 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java @@ -61,7 +61,7 @@ protected SailSourceModel getNewModel() { LmdbSailStore store = new LmdbSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(), new LmdbStoreConfig("spoc")); stores.add(store); - return new SailSourceModel(store); + return new SailSourceModel(store, false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java new file mode 100644 index 0000000000..419ff93cc3 --- /dev/null +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java @@ -0,0 +1,240 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ + +package org.eclipse.rdf4j.sail.lmdb.benchmark; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.assertj.core.util.Files; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.repository.RepositoryResult; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.sail.NotifyingSail; +import org.eclipse.rdf4j.sail.NotifyingSailConnection; +import org.eclipse.rdf4j.sail.SailConnectionListener; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailWrapper; +import org.eclipse.rdf4j.sail.lmdb.LmdbStore; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import 
org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Logger; + +/** + * @author Håvard Ottestad + */ +@State(Scope.Benchmark) +@Warmup(iterations = 5) +@BenchmarkMode({ Mode.AverageTime }) +@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" }) +@Measurement(iterations = 5, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class OverflowBenchmarkConcurrent { + + @Setup(Level.Trial) + public void setup() { + ((Logger) (LoggerFactory + .getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel"))) + .setLevel(ch.qos.logback.classic.Level.DEBUG); + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include("OverflowBenchmarkConcurrent") // adapt to run other benchmark tests + .build(); + + new Runner(opt).run(); + } + + @Benchmark + public void manyConcurrentTransactions() throws IOException { + File temporaryFolder = Files.newTemporaryFolder(); + SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper( + new NotifySailWrapper( + new NotifySailWrapper(new NotifySailWrapper(new LmdbStore(temporaryFolder))))))); + ExecutorService executorService = Executors.newFixedThreadPool(10); + + try { + + Model parse; + try (InputStream resourceAsStream = getClass().getClassLoader() + .getResourceAsStream("benchmarkFiles/datagovbe-valid.ttl")) { + parse = 
Rio.parse(resourceAsStream, RDFFormat.TURTLE); + } + + List> futureList = new ArrayList<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + for (int i = 0; i < 38; i++) { + var seed = i + 485924; + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + + int addSize = new Random(seed).nextInt(parse.size()); + IRI context = Values.iri("http://example.org/" + new Random(seed + 1).nextInt(10)); + List collect = parse.stream() + .skip(addSize) + .limit(10_000) + .map(s -> SimpleValueFactory.getInstance() + .createStatement(s.getSubject(), s.getPredicate(), s.getObject(), context)) + .collect(Collectors.toList()); + StringWriter stringWriter = new StringWriter(); + Rio.write(collect, stringWriter, RDFFormat.TRIG); + String string = stringWriter.toString(); + + connection.prepareUpdate("INSERT DATA { GRAPH " + string + " }").execute(); + + System.out.println("Added"); + } + }); + futureList.add(submit); + } + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + System.out.println("Waiting"); + long l = System.currentTimeMillis(); + while (!connection.isEmpty()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + if (System.currentTimeMillis() - l > 1000) { + break; + } + } + System.out.println("Removing"); + connection.begin(); + try (RepositoryResult statements = connection.getStatements(null, null, null)) { + statements.stream().limit(10_000).forEach(connection::remove); + } + connection.commit(); + + System.out.println("Removed"); + } + }); + futureList.add(submit); + } + } + + countDownLatch.countDown(); + + for (int i = 0; i < futureList.size(); i++) { + 
Future future = futureList.get(i); + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + System.out.println("Done: " + i); + } + + } finally { + try { + executorService.shutdownNow(); + } finally { + try { + sailRepository.shutDown(); + } finally { + FileUtils.deleteDirectory(temporaryFolder); + } + } + } + + } + + static class NotifySailWrapper extends NotifyingSailWrapper { + + public NotifySailWrapper(NotifyingSail baseSail) { + super(baseSail); + } + + @Override + public NotifyingSailConnection getConnection() throws SailException { + return new Connection(super.getConnection()); + } + } + + static class Connection extends NotifyingSailConnectionWrapper implements SailConnectionListener { + + Set addedStatements = new HashSet<>(); + Set removedStatements = new HashSet<>(); + + public Connection(NotifyingSailConnection wrappedCon) { + super(wrappedCon); + addConnectionListener(this); + } + + @Override + public void statementAdded(Statement st) { + removedStatements.remove(st); + addedStatements.add(st); + } + + @Override + public void statementRemoved(Statement st) { + addedStatements.remove(st); + removedStatements.add(st); + } + + } + +} diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkReal.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkReal.java index 2060dfca67..05bee01986 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkReal.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkReal.java @@ -44,10 +44,10 @@ * Benchmarks transaction isolation and overflow performance with real data. 
*/ @State(Scope.Benchmark) -@Warmup(iterations = 0) +@Warmup(iterations = 5) @BenchmarkMode({ Mode.AverageTime }) @Fork(value = 1, jvmArgs = { "-Xms500M", "-Xmx500M", "-XX:+UseParallelGC" }) -@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class OverflowBenchmarkReal { diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java index 0ccf1dcafc..adc39e6704 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java @@ -50,10 +50,10 @@ * Benchmarks transaction isolation and overflow performance with synthetic data. 
*/ @State(Scope.Benchmark) -@Warmup(iterations = 0) +@Warmup(iterations = 5) @BenchmarkMode({ Mode.AverageTime }) @Fork(value = 1, jvmArgs = { "-Xms64M", "-Xmx64M", "-XX:+UseG1GC" }) -@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class OverflowBenchmarkSynthetic { @@ -63,7 +63,7 @@ public class OverflowBenchmarkSynthetic { @Setup(Level.Trial) public void setup() { ((Logger) (LoggerFactory - .getLogger("org.eclipse.rdf4j.sail.lmdbrdf.MemoryOverflowModel"))) + .getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel"))) .setLevel(ch.qos.logback.classic.Level.DEBUG); } diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java index 001e25de58..92a4fbfb38 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others. + * Copyright (c) 2021 Eclipse RDF4J contributors. * * All rights reserved. 
This program and the accompanying materials * are made available under the terms of the Eclipse Distribution License v1.0 @@ -8,18 +8,26 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ + package org.eclipse.rdf4j.sail.nativerdf; import java.io.File; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; import java.nio.file.Files; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.eclipse.rdf4j.common.io.FileUtil; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.Namespace; @@ -31,7 +39,6 @@ import org.eclipse.rdf4j.model.impl.LinkedHashModel; import org.eclipse.rdf4j.model.impl.SimpleValueFactory; import org.eclipse.rdf4j.sail.SailException; -import org.eclipse.rdf4j.sail.base.SailStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,20 +46,21 @@ * Model implementation that stores in a {@link LinkedHashModel} until more than 10KB statements are added and the * estimated memory usage is more than the amount of free memory available. Once the threshold is cross this * implementation seamlessly changes to a disk based {@link SailSourceModel}. 
- * - * @author James Leigh */ -abstract class MemoryOverflowModel extends AbstractModel { +abstract class MemoryOverflowModel extends AbstractModel implements AutoCloseable { private static final long serialVersionUID = 4119844228099208169L; private static final Runtime RUNTIME = Runtime.getRuntime(); - private static final int LARGE_BLOCK = 10000; + private static final int LARGE_BLOCK = 5 * 1024; + + private static volatile boolean overflow; // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think - // we have space for one more block. The limit is currently set at 32 MB - private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024; + // we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps. + private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024 + : 32 * 1024 * 1024; final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class); @@ -60,31 +68,21 @@ abstract class MemoryOverflowModel extends AbstractModel { private transient File dataDir; - private transient SailStore store; + private transient NativeSailStore store; private transient volatile SailSourceModel disk; - private long baseline = 0; - - private long maxBlockSize = 0; + private final boolean verifyAdditions; - SimpleValueFactory vf = SimpleValueFactory.getInstance(); + private final SimpleValueFactory vf = SimpleValueFactory.getInstance(); - public MemoryOverflowModel() { + public MemoryOverflowModel(boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; memory = new LinkedHashModel(LARGE_BLOCK); } - public MemoryOverflowModel(Model model) { - this(model.getNamespaces()); - addAll(model); - } - - public MemoryOverflowModel(Set namespaces, Collection c) { - this(namespaces); - addAll(c); - } - - public MemoryOverflowModel(Set namespaces) { + public MemoryOverflowModel(Set namespaces, 
boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; memory = new LinkedHashModel(namespaces, LARGE_BLOCK); } @@ -138,6 +136,33 @@ public boolean add(Statement st) { return getDelegate().add(st); } + @Override + public boolean addAll(Collection c) { + checkMemoryOverflow(); + if (disk != null || c.size() <= 1024) { + return getDelegate().addAll(c); + } else { + boolean ret = false; + HashSet buffer = new HashSet<>(); + for (Statement st : c) { + buffer.add(st); + if (buffer.size() >= 1024) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + innerCheckMemoryOverflow(); + } + } + if (!buffer.isEmpty()) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + } + + return ret; + + } + + } + @Override public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { return getDelegate().remove(subj, pred, obj, contexts); @@ -192,16 +217,26 @@ public synchronized void removeTermIteration(Iterator iter, Resource } } - protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException; + protected abstract NativeSailStore createSailStore(File dataDir) throws IOException, SailException; private Model getDelegate() { - LinkedHashModel memory = this.memory; + var memory = this.memory; if (memory != null) { return memory; } else { - synchronized (this) { + var disk = this.disk; + if (disk != null) { return disk; } + synchronized (this) { + if (this.memory != null) { + return this.memory; + } + if (this.disk != null) { + return this.disk; + } + throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state"); + } } } @@ -232,45 +267,101 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx } } - private synchronized void checkMemoryOverflow() { + static class GcInfo { + long count; + long time; + } + + private final Map prevGcInfo = new ConcurrentHashMap<>(); + + private synchronized boolean highGcLoad() { + boolean highLoad = false; + + // get all garbage 
collector MXBeans. + List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + long count = gcBean.getCollectionCount(); + long time = gcBean.getCollectionTime(); + + GcInfo prevInfo = prevGcInfo.get(gcBean.getName()); + if (prevInfo != null) { + long countDiff = count - prevInfo.count; + long timeDiff = time - prevInfo.time; + if (countDiff != 0) { + double gcLoad = (double) timeDiff / countDiff; + // TODO find good threshold + if (gcLoad > 100) { + highLoad = true; + } + } + } else { + prevInfo = new GcInfo(); + prevGcInfo.put(gcBean.getName(), prevInfo); + } + prevInfo.count = count; + prevInfo.time = time; + } + return highLoad; + } + + private void checkMemoryOverflow() { + if (disk == getDelegate()) { + return; + } + + if (overflow) { + innerCheckMemoryOverflow(); + } + int size = size() + 1; + if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { + if (highGcLoad()) { + logger.debug("syncing at {} triples.", size); + overflowToDisk(); + } else { + innerCheckMemoryOverflow(); + } + } + } + + private synchronized void innerCheckMemoryOverflow() { if (disk == null) { int size = size(); if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { - // maximum heap size the JVM can allocate - long maxMemory = RUNTIME.maxMemory(); + boolean overflow = highGcLoad(); + if (!overflow) { + // maximum heap size the JVM can allocate + long maxMemory = RUNTIME.maxMemory(); - // total currently allocated JVM memory - long totalMemory = RUNTIME.totalMemory(); + // total currently allocated JVM memory + long totalMemory = RUNTIME.totalMemory(); - // amount of memory free in the currently allocated JVM memory - long freeMemory = RUNTIME.freeMemory(); + // amount of memory free in the currently allocated JVM memory + long freeMemory = RUNTIME.freeMemory(); - // estimated memory used - long used = totalMemory - freeMemory; + // estimated memory used + long used = totalMemory - freeMemory; - // amount of memory the JVM can still 
allocate from the OS (upper boundary is the max heap) - long freeToAllocateMemory = maxMemory - used; + // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) + long freeToAllocateMemory = maxMemory - used; - if (baseline > 0) { - long blockSize = used - baseline; - if (blockSize > maxBlockSize) { - maxBlockSize = blockSize; - } - - // Sync if either the estimated size of the next block is larger than remaining memory, or - // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) - if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || - freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { - logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize); - overflowToDisk(); - } + // try to prevent OOM + overflow = freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING; + } + if (overflow) { + logger.debug("syncing at {} triples.", size); + overflowToDisk(); } - baseline = used; } } } private synchronized void overflowToDisk() { + overflow = true; + if (memory == null) { + assert disk != null; + return; + } + try { LinkedHashModel memory = this.memory; this.memory = null; @@ -279,12 +370,34 @@ private synchronized void overflowToDisk() { dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); store = createSailStore(dataDir); - disk = new SailSourceModel(store); - disk.addAll(memory); + disk = new SailSourceModel(store, memory, verifyAdditions); logger.debug("overflow synced to disk"); } catch (IOException | SailException e) { String path = dataDir != null ? 
dataDir.getAbsolutePath() : "(unknown)"; logger.error("Error while writing to overflow directory " + path, e); } } + + @Override + public void close() throws IOException { + if (disk != null) { + logger.debug("closing {}", dataDir); + try { + disk.close(); + } catch (Exception e) { + logger.error(e.toString(), e); + } finally { + try { + store.close(); + } catch (SailException e) { + logger.error(e.toString(), e); + } finally { + FileUtil.deleteDir(dataDir); + dataDir = null; + store = null; + disk = null; + } + } + } + } } diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java index da790e890a..c074d07412 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java @@ -316,6 +316,10 @@ CloseableIteration createStatementIterator(Resource subj, I return tripleStore.cardinality(subjID, predID, objID, contextID); } + public void disableTxnStatus() { + this.tripleStore.disableTxnStatus(); + } + private final class NativeSailSource extends BackingSailSource { private final boolean explicit; @@ -444,6 +448,44 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai addStatement(subj, pred, obj, explicit, ctx); } + @Override + public void approveAll(Set approved, Set approvedContexts) { + sinkStoreAccessLock.lock(); + startTriplestoreTransaction(); + + try { + for (Statement statement : approved) { + Resource subj = statement.getSubject(); + IRI pred = statement.getPredicate(); + Value obj = statement.getObject(); + Resource context = statement.getContext(); + + int subjID = valueStore.storeValue(subj); + int predID = valueStore.storeValue(pred); + int objID = valueStore.storeValue(obj); + + int contextID = 0; + if (context != null) { + contextID = valueStore.storeValue(context); 
+ } + + boolean wasNew = tripleStore.storeTriple(subjID, predID, objID, contextID, explicit); + if (wasNew && context != null) { + contextStore.increment(context); + } + + } + } catch (IOException e) { + throw new SailException(e); + } catch (RuntimeException e) { + logger.error("Encountered an unexpected problem while trying to add a statement", e); + throw e; + } finally { + sinkStoreAccessLock.unlock(); + } + + } + @Override public void deprecate(Statement statement) throws SailException { removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit, diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java index 8ebcd31991..9f81d16d90 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java @@ -72,6 +72,10 @@ public class NativeStore extends AbstractNotifyingSail implements FederatedServi final static class MemoryOverflowIntoNativeStore extends MemoryOverflowModel { private static final long serialVersionUID = 1L; + public MemoryOverflowIntoNativeStore() { + super(false); + } + /** * The class is static to avoid taking a pointer which might make it hard to get a phantom reference. 
*/ @@ -81,6 +85,7 @@ private static final class OverFlowStoreCleaner implements Runnable { private OverFlowStoreCleaner(NativeSailStore nativeSailStore, File dataDir) { this.nativeSailStore = nativeSailStore; + nativeSailStore.disableTxnStatus(); this.dataDir = dataDir; } @@ -99,7 +104,7 @@ public void run() { } @Override - protected SailStore createSailStore(File dataDir) throws IOException, SailException { + protected NativeSailStore createSailStore(File dataDir) throws IOException, SailException { // Model can't fit into memory, use another NativeSailStore to store delta NativeSailStore nativeSailStore = new NativeSailStore(dataDir, "spoc"); // Once the model is no longer reachable (i.e. phantom reference we can close the diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java index 1e73b9cdff..969b99461d 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java @@ -11,6 +11,8 @@ package org.eclipse.rdf4j.sail.nativerdf; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -40,13 +42,13 @@ /** * A {@link Model} that keeps the {@link Statement}s in an {@link SailSource}. 
- * - * @author James Leigh */ class SailSourceModel extends AbstractModel { private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class); + private final boolean verifyAdditions; + private final class StatementIterator implements Iterator { final CloseableIteration stmts; @@ -99,16 +101,21 @@ public void remove() { SailSink sink; - private long size; - private final IsolationLevels level = IsolationLevels.NONE; - public SailSourceModel(SailStore store) { - this(store.getExplicitSailSource()); + public SailSourceModel(SailStore store, Model bulk, boolean verifyAdditions) { + this(store, verifyAdditions); + sink().approveAll(bulk, bulk.contexts()); + sink.flush(); } - public SailSourceModel(SailSource source) { + public SailSourceModel(SailStore store, boolean verifyAdditions) { + this(store.getExplicitSailSource(), verifyAdditions); + } + + public SailSourceModel(SailSource source, boolean verifyAdditions) { this.source = source; + this.verifyAdditions = verifyAdditions; } @Override @@ -149,21 +156,19 @@ public String toString() { @Override public synchronized int size() { - if (size < 0) { + long size = 0; + try { + CloseableIteration iter = dataset().getStatements(null, null, null); try { - CloseableIteration iter; - iter = dataset().getStatements(null, null, null); - try { - while (iter.hasNext()) { - iter.next(); - size++; - } - } finally { - iter.close(); + while (iter.hasNext()) { + iter.next(); + size++; } - } catch (SailException e) { - throw new ModelException(e); + } finally { + iter.close(); } + } catch (SailException e) { + throw new ModelException(e); } if (size > Integer.MAX_VALUE) { return Integer.MAX_VALUE; @@ -245,13 +250,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... 
throw new UnsupportedOperationException("Incomplete statement"); } try { - if (contains(subj, pred, obj, contexts)) { + if (verifyAdditions && contains(subj, pred, obj, contexts)) { logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts); return false; } - if (size >= 0) { - size++; - } if (contexts == null || contexts.length == 0) { sink().approve(subj, pred, obj, null); } else { @@ -265,12 +267,51 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... } } + @Override + public synchronized boolean addAll(Collection statements) { // synchronized like add/clear/remove: the contains check and the sink writes must not interleave with other mutators + if (statements.isEmpty()) { + return false; + } + + if (statements.size() == 1) { + Statement st = statements.iterator().next(); + return add(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext()); + } + + boolean added = false; + + HashSet tempSet = new HashSet<>(); + HashSet contexts = new HashSet<>(); + SailSink sink = sink(); + + for (Statement statement : statements) { + if (tempSet.size() >= 1024) { // hand the pending batch to the sink once it reaches 1024 statements + sink.approveAll(tempSet, contexts); + tempSet.clear(); + contexts.clear(); + added = true; + } + if (!contains(statement.getSubject(), statement.getPredicate(), statement.getObject(), + statement.getContext())) { // queue only statements the model does not report as present + contexts.add(statement.getContext()); + tempSet.add(statement); + } + } + + if (!tempSet.isEmpty()) { // approve the final, partially filled batch + sink.approveAll(tempSet, contexts); + added = true; + } + + return added; + + } + @Override public synchronized boolean clear(Resource... contexts) { try { if (contains(null, null, null, contexts)) { sink().clear(contexts); - size = -1; return true; } } catch (SailException e) { @@ -281,25 +322,23 @@ public synchronized boolean clear(Resource... contexts) { @Override public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... 
contexts) { + boolean removed = false; try { - if (contains(subj, pred, obj, contexts)) { - size = -1; - CloseableIteration stmts; - stmts = dataset().getStatements(subj, pred, obj, contexts); - try { - while (stmts.hasNext()) { - Statement st = stmts.next(); - sink().deprecate(st); - } - } finally { - stmts.close(); + CloseableIteration stmts = dataset().getStatements(subj, pred, obj, + contexts); + try { + while (stmts.hasNext()) { + Statement st = stmts.next(); + sink().deprecate(st); + removed = true; } - return true; + } finally { + stmts.close(); } } catch (SailException e) { throw new ModelException(e); } - return false; + return removed; } @Override @@ -374,7 +413,6 @@ public synchronized void removeTermIteration(Iterator iter, Resource } finally { stmts.close(); } - size = -1; } catch (SailException e) { throw new ModelException(e); } @@ -406,6 +444,28 @@ private synchronized SailDataset dataset() throws SailException { return dataset; } + /** + * Flushes and closes the sink, then closes the dataset, and clears both fields. + */ + public synchronized void close() { + try { + if (sink != null) { + try { + sink.flush(); + } finally { + sink.close(); + sink = null; + } + } + } finally { + // release the dataset even when flushing or closing the sink throws + if (dataset != null) { + dataset.close(); + dataset = null; + } + } + } + private boolean contains(SailDataset dataset, Resource subj, IRI pred, Value obj, Resource... 
contexts) throws SailException { if (dataset == null) { @@ -466,4 +526,4 @@ Resource[] cast(Value[] contexts) { return result; } -} +} \ No newline at end of file diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java index 96ec387ff2..c4bc52cd31 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java @@ -506,6 +506,10 @@ public RecordIterator getTriples(int subj, int pred, int obj, int context, boole return btreeIter; } + public void disableTxnStatus() { + txnStatusFile.disable(); + } + /*-------------------------------------* * Inner class ExplicitStatementFilter * *-------------------------------------*/ diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java index 469cb4892e..fe8013124a 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java @@ -22,6 +22,12 @@ */ class TxnStatusFile { + boolean disabled = false; + + public void disable() { + this.disabled = true; + } + public enum TxnStatus { /** @@ -100,6 +106,9 @@ public void close() throws IOException { * @throws IOException If the transaction status could not be written to file. */ public void setTxnStatus(TxnStatus txnStatus) throws IOException { + if (disabled) { + return; + } if (txnStatus == TxnStatus.NONE) { nioFile.truncate(0); } else { @@ -115,6 +124,10 @@ public void setTxnStatus(TxnStatus txnStatus) throws IOException { * @throws IOException If the transaction status file could not be read. 
*/ public TxnStatus getTxnStatus() throws IOException { + if (disabled) { + return TxnStatus.NONE; + } + byte[] bytes = nioFile.readBytes(0, 1); TxnStatus status; @@ -147,6 +160,10 @@ public TxnStatus getTxnStatus() throws IOException { } private TxnStatus getTxnStatusDeprecated() throws IOException { + if (disabled) { + return TxnStatus.NONE; + } + byte[] bytes = nioFile.readBytes(0, (int) nioFile.size()); String s = new String(bytes, US_ASCII); diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java index e758bbb888..bb0f6693a5 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java @@ -19,6 +19,8 @@ class ConcurrentNodeCache extends ConcurrentCache { + private final static int CONCURRENCY = Runtime.getRuntime().availableProcessors(); + private final Function reader; private static final Consumer writeNode = node -> { @@ -38,7 +40,7 @@ public ConcurrentNodeCache(Function reader) { } public void flush() { - cache.forEachValue(Long.MAX_VALUE, writeNode); + cache.forEachValue(CONCURRENCY, writeNode); } public void put(Node node) { diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java index b91c09dc24..7cf1f420b3 100644 --- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java @@ -51,7 +51,7 @@ protected SailSourceModel getNewModel() { try { NativeSailStore store = new NativeSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(), "spoc"); - return new 
SailSourceModel(store); + return new SailSourceModel(store, false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java new file mode 100644 index 0000000000..826f138674 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java @@ -0,0 +1,240 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ + +package org.eclipse.rdf4j.sail.nativerdf.benchmark; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.assertj.core.util.Files; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.repository.RepositoryResult; +import 
org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.sail.NotifyingSail; +import org.eclipse.rdf4j.sail.NotifyingSailConnection; +import org.eclipse.rdf4j.sail.SailConnectionListener; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailWrapper; +import org.eclipse.rdf4j.sail.nativerdf.NativeStore; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Logger; + +/** + * @author Håvard Ottestad + */ +@State(Scope.Benchmark) +@Warmup(iterations = 0) +@BenchmarkMode({ Mode.AverageTime }) +@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" }) +@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class OverflowBenchmarkConcurrent { + + @Setup(Level.Trial) + public void setup() { + ((Logger) (LoggerFactory + .getLogger("org.eclipse.rdf4j.sail.nativerdf.MemoryOverflowModel"))) + .setLevel(ch.qos.logback.classic.Level.DEBUG); + } + + public static void main(String[] args) throws RunnerException { + Options 
opt = new OptionsBuilder() + .include("OverflowBenchmarkConcurrent") // adapt to run other benchmark tests + .build(); + + new Runner(opt).run(); + } + + @Benchmark + public void manyConcurrentTransactions() throws IOException { + File temporaryFolder = Files.newTemporaryFolder(); + SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper( + new NotifySailWrapper( + new NotifySailWrapper(new NotifySailWrapper(new NativeStore(temporaryFolder))))))); + ExecutorService executorService = Executors.newFixedThreadPool(10); + + try { + + Model parse; + try (InputStream resourceAsStream = getClass().getClassLoader() + .getResourceAsStream("benchmarkFiles/datagovbe-valid.ttl")) { + parse = Rio.parse(resourceAsStream, RDFFormat.TURTLE); + } + + List> futureList = new ArrayList<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + for (int i = 0; i < 38; i++) { + var seed = i + 485924; + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + + int addSize = new Random(seed).nextInt(parse.size()); + IRI context = Values.iri("http://example.org/" + new Random(seed + 1).nextInt(10)); + List collect = parse.stream() + .skip(addSize) + .limit(10_000) + .map(s -> SimpleValueFactory.getInstance() + .createStatement(s.getSubject(), s.getPredicate(), s.getObject(), context)) + .collect(Collectors.toList()); + StringWriter stringWriter = new StringWriter(); + Rio.write(collect, stringWriter, RDFFormat.TRIG); + String string = stringWriter.toString(); + + connection.prepareUpdate("INSERT DATA { GRAPH " + string + " }").execute(); + + System.out.println("Added"); + } + }); + futureList.add(submit); + } + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new 
RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + System.out.println("Waiting"); + long l = System.currentTimeMillis(); + while (!connection.isEmpty()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + if (System.currentTimeMillis() - l > 1000) { + break; + } + } + System.out.println("Removing"); + connection.begin(); + try (RepositoryResult statements = connection.getStatements(null, null, null)) { + statements.stream().limit(10_000).forEach(connection::remove); + } + connection.commit(); + + System.out.println("Removed"); + } + }); + futureList.add(submit); + } + } + + countDownLatch.countDown(); + + for (int i = 0; i < futureList.size(); i++) { + Future future = futureList.get(i); + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + System.out.println("Done: " + i); + } + + } finally { + try { + executorService.shutdownNow(); + } finally { + try { + sailRepository.shutDown(); + } finally { + FileUtils.deleteDirectory(temporaryFolder); + } + } + } + + } + + static class NotifySailWrapper extends NotifyingSailWrapper { + + public NotifySailWrapper(NotifyingSail baseSail) { + super(baseSail); + } + + @Override + public NotifyingSailConnection getConnection() throws SailException { + return new Connection(super.getConnection()); + } + } + + static class Connection extends NotifyingSailConnectionWrapper implements SailConnectionListener { + + Set addedStatements = new HashSet<>(); + Set removedStatements = new HashSet<>(); + + public Connection(NotifyingSailConnection wrappedCon) { + super(wrappedCon); + addConnectionListener(this); + } + + @Override + public void statementAdded(Statement st) { + removedStatements.remove(st); + addedStatements.add(st); + } + + @Override + public void statementRemoved(Statement st) { + addedStatements.remove(st); + removedStatements.add(st); + } + + } + +}