Skip to content

Commit eda84e4

Browse files
committed
GH-4554 Correctly close changesets
- some shallow copies of Changeset were not closed at all - use reference counting for approved and deprecated models
1 parent e9dac06 commit eda84e4

File tree

2 files changed

+75
-43
lines changed

2 files changed

+75
-43
lines changed

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java

Lines changed: 68 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package org.eclipse.rdf4j.sail.base;
1212

1313
import java.lang.invoke.VarHandle;
14+
import java.lang.ref.Reference;
1415
import java.util.ArrayList;
1516
import java.util.Arrays;
1617
import java.util.Collection;
@@ -23,6 +24,7 @@
2324
import java.util.Objects;
2425
import java.util.Set;
2526
import java.util.concurrent.Semaphore;
27+
import java.util.concurrent.atomic.AtomicInteger;
2628
import java.util.concurrent.atomic.LongAdder;
2729
import java.util.concurrent.locks.StampedLock;
2830
import java.util.stream.Collectors;
@@ -51,6 +53,28 @@
5153
@InternalUseOnly
5254
public abstract class Changeset implements SailSink, ModelFactory {
5355

56+
static class CountedReference<T> {
57+
final T referent;
58+
final AtomicInteger count = new AtomicInteger(1);
59+
60+
CountedReference(T referent) {
61+
this.referent = referent;
62+
}
63+
64+
CountedReference<T> retain() {
65+
count.incrementAndGet();
66+
return this;
67+
}
68+
69+
boolean release() {
70+
return count.decrementAndGet() == 0;
71+
}
72+
73+
T get() {
74+
return referent;
75+
}
76+
}
77+
5478
AdderBasedReadWriteLock readWriteLock = new AdderBasedReadWriteLock();
5579
AdderBasedReadWriteLock refBacksReadWriteLock = new AdderBasedReadWriteLock();
5680
Semaphore prependLock = new Semaphore(1);
@@ -78,15 +102,15 @@ public abstract class Changeset implements SailSink, ModelFactory {
78102
* <p>
79103
* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE
80104
*/
81-
private volatile Model approved;
105+
private volatile CountedReference<Model> approved;
82106
private volatile boolean approvedEmpty = true;
83107

84108
/**
85109
* Explicit statements that have been removed as part of a transaction, but have not yet been committed.
86110
* <p>
87111
* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE
88112
*/
89-
private volatile Model deprecated;
113+
private volatile CountedReference<Model> deprecated;
90114
private volatile boolean deprecatedEmpty = true;
91115

92116
/**
@@ -132,16 +156,16 @@ public void close() throws SailException {
132156
addedNamespaces = null;
133157
removedPrefixes = null;
134158
try {
135-
if (approved instanceof AutoCloseable) {
136-
((AutoCloseable) approved).close();
159+
if (approved != null && approved.release() && approved.get() instanceof AutoCloseable) {
160+
((AutoCloseable) approved.get()).close();
137161
}
138162
} catch (Exception e) {
139163
throw new SailException(e);
140164
} finally {
141165
approved = null;
142-
if (deprecated instanceof AutoCloseable) {
166+
if (deprecated != null && deprecated.release() && deprecated.get() instanceof AutoCloseable) {
143167
try {
144-
((AutoCloseable) deprecated).close();
168+
((AutoCloseable) deprecated.get()).close();
145169
} catch (Exception e) {
146170
throw new SailException(e);
147171
} finally {
@@ -184,7 +208,7 @@ boolean hasApproved(Resource subj, IRI pred, Value obj, Resource[] contexts) {
184208

185209
boolean readLock = readWriteLock.readLock();
186210
try {
187-
return approved.contains(subj, pred, obj, contexts);
211+
return approved.get().contains(subj, pred, obj, contexts);
188212
} finally {
189213
readWriteLock.unlockReader(readLock);
190214
}
@@ -206,7 +230,7 @@ boolean hasDeprecated(Resource subj, IRI pred, Value obj, Resource[] contexts) {
206230
}
207231
}
208232

209-
return deprecated.contains(subj, pred, obj, contexts);
233+
return deprecated.get().contains(subj, pred, obj, contexts);
210234
} finally {
211235
readWriteLock.unlockReader(readLock);
212236
}
@@ -389,7 +413,7 @@ public void clear(Resource... contexts) {
389413
statementCleared = true;
390414

391415
if (approved != null) {
392-
approved.clear();
416+
approved.get().clear();
393417
}
394418
if (approvedContexts != null) {
395419
approvedContexts.clear();
@@ -399,7 +423,7 @@ public void clear(Resource... contexts) {
399423
deprecatedContexts = new HashSet<>();
400424
}
401425
if (approved != null) {
402-
approved.remove(null, null, null, contexts);
426+
approved.get().remove(null, null, null, contexts);
403427
}
404428
if (approvedContexts != null && contexts != null) {
405429
for (Resource resource : contexts) {
@@ -410,7 +434,7 @@ public void clear(Resource... contexts) {
410434
deprecatedContexts.addAll(Arrays.asList(contexts));
411435
}
412436
}
413-
approvedEmpty = approved == null || approved.isEmpty();
437+
approvedEmpty = approved == null || approved.get().isEmpty();
414438
} finally {
415439
readWriteLock.unlockWriter(writeLock);
416440
}
@@ -425,13 +449,13 @@ public void approve(Statement statement) {
425449
try {
426450

427451
if (deprecated != null) {
428-
deprecated.remove(statement);
429-
deprecatedEmpty = deprecated == null || deprecated.isEmpty();
452+
deprecated.get().remove(statement);
453+
deprecatedEmpty = deprecated == null || deprecated.get().isEmpty();
430454
}
431455
if (approved == null) {
432-
approved = createEmptyModel();
456+
approved = new CountedReference<>(createEmptyModel());
433457
}
434-
approved.add(statement);
458+
approved.get().add(statement);
435459
approvedEmpty = false;
436460
if (statement.getContext() != null) {
437461
if (approvedContexts == null) {
@@ -456,17 +480,17 @@ public void deprecate(Statement statement) {
456480
long writeLock = readWriteLock.writeLock();
457481
try {
458482
if (approved != null) {
459-
approved.remove(statement);
460-
approvedEmpty = approved == null || approved.isEmpty();
483+
approved.get().remove(statement);
484+
approvedEmpty = approved == null || approved.get().isEmpty();
461485
}
462486
if (deprecated == null) {
463-
deprecated = createEmptyModel();
487+
deprecated = new CountedReference<>(createEmptyModel());
464488
}
465-
deprecated.add(statement);
489+
deprecated.get().add(statement);
466490
deprecatedEmpty = false;
467491
Resource ctx = statement.getContext();
468492
if (approvedContexts != null && approvedContexts.contains(ctx)
469-
&& !approved.contains(null, null, null, ctx)) {
493+
&& !approved.get().contains(null, null, null, ctx)) {
470494
approvedContexts.remove(ctx);
471495
}
472496
} finally {
@@ -501,11 +525,11 @@ public String toString() {
501525
sb.append(" deprecatedContexts, ");
502526
}
503527
if (deprecated != null) {
504-
sb.append(deprecated.size());
528+
sb.append(deprecated.get().size());
505529
sb.append(" deprecated, ");
506530
}
507531
if (approved != null) {
508-
sb.append(approved.size());
532+
sb.append(approved.get().size());
509533
sb.append(" approved, ");
510534
}
511535
if (sb.length() > 0) {
@@ -520,9 +544,9 @@ protected void setChangeset(Changeset from) {
520544
assert !from.closed;
521545

522546
this.observed = from.observed;
523-
this.approved = from.approved;
547+
this.approved = from.approved != null ? from.approved.retain() : null;
524548
this.approvedEmpty = from.approvedEmpty;
525-
this.deprecated = from.deprecated;
549+
this.deprecated = from.deprecated != null ? from.deprecated.retain() : null;
526550
this.deprecatedEmpty = from.deprecatedEmpty;
527551
this.approvedContexts = from.approvedContexts;
528552
this.deprecatedContexts = from.deprecatedContexts;
@@ -689,7 +713,7 @@ List<Statement> getDeprecatedStatements() {
689713

690714
boolean readLock = readWriteLock.readLock();
691715
try {
692-
return new ArrayList<>(deprecated);
716+
return new ArrayList<>(deprecated.get());
693717
} finally {
694718
readWriteLock.unlockReader(readLock);
695719
}
@@ -704,7 +728,7 @@ List<Statement> getApprovedStatements() {
704728

705729
boolean readLock = readWriteLock.readLock();
706730
try {
707-
return new ArrayList<>(approved);
731+
return new ArrayList<>(approved.get());
708732
} finally {
709733
readWriteLock.unlockReader(readLock);
710734
}
@@ -725,7 +749,7 @@ boolean hasDeprecated(Statement statement) {
725749
}
726750
}
727751
if (deprecated != null) {
728-
return deprecated.contains(statement);
752+
return deprecated.get().contains(statement);
729753
} else {
730754
return false;
731755
}
@@ -751,7 +775,7 @@ Iterable<Statement> getApprovedStatements(Resource subj, IRI pred, Value obj,
751775
boolean readLock = readWriteLock.readLock();
752776
try {
753777

754-
Iterable<Statement> statements = approved.getStatements(subj, pred, obj, contexts);
778+
Iterable<Statement> statements = approved.get().getStatements(subj, pred, obj, contexts);
755779

756780
// This is a synchronized context, users of this method will be allowed to use the results at their leisure.
757781
// We
@@ -788,7 +812,8 @@ Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
788812
try {
789813
// TODO none of this is particularly well thought-out in terms of performance, but we are aiming
790814
// for functionally complete first.
791-
Stream<Triple> approvedSubjectTriples = approved.parallelStream()
815+
Stream<Triple> approvedSubjectTriples = approved.get()
816+
.parallelStream()
792817
.filter(st -> st.getSubject().isTriple())
793818
.map(st -> (Triple) st.getSubject())
794819
.filter(t -> {
@@ -801,7 +826,8 @@ Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
801826
return obj == null || obj.equals(t.getObject());
802827
});
803828

804-
Stream<Triple> approvedObjectTriples = approved.parallelStream()
829+
Stream<Triple> approvedObjectTriples = approved.get()
830+
.parallelStream()
805831
.filter(st -> st.getObject().isTriple())
806832
.map(st -> (Triple) st.getObject())
807833
.filter(t -> {
@@ -825,8 +851,8 @@ void removeApproved(Statement next) {
825851
long writeLock = readWriteLock.writeLock();
826852
try {
827853
if (approved != null) {
828-
approved.remove(next);
829-
approvedEmpty = approved == null || approved.isEmpty();
854+
approved.get().remove(next);
855+
approvedEmpty = approved == null || approved.get().isEmpty();
830856
}
831857
} finally {
832858
readWriteLock.unlockWriter(writeLock);
@@ -850,7 +876,7 @@ void sinkApproved(SailSink sink) {
850876
boolean readLock = readWriteLock.readLock();
851877
try {
852878
if (approved != null) {
853-
sink.approveAll(approved, approvedContexts);
879+
sink.approveAll(approved.get(), approvedContexts);
854880
}
855881
} finally {
856882
readWriteLock.unlockReader(readLock);
@@ -865,7 +891,7 @@ void sinkDeprecated(SailSink sink) {
865891
boolean readLock = readWriteLock.readLock();
866892
try {
867893
if (deprecated != null) {
868-
sink.deprecateAll(deprecated);
894+
sink.deprecateAll(deprecated.get());
869895
}
870896
} finally {
871897
readWriteLock.unlockReader(readLock);
@@ -895,12 +921,12 @@ public void approveAll(Set<Statement> approve, Set<Resource> approveContexts) {
895921
try {
896922

897923
if (deprecated != null) {
898-
deprecated.removeAll(approve);
924+
deprecated.get().removeAll(approve);
899925
}
900926
if (approved == null) {
901-
approved = createEmptyModel();
927+
approved = new CountedReference<>(createEmptyModel());
902928
}
903-
approved.addAll(approve);
929+
approved.get().addAll(approve);
904930
approvedEmpty = approvedEmpty && approve.isEmpty();
905931

906932
if (approveContexts != null) {
@@ -921,19 +947,19 @@ public void deprecateAll(Set<Statement> deprecate) {
921947
try {
922948

923949
if (approved != null) {
924-
approved.removeAll(deprecate);
925-
approvedEmpty = approved == null || approved.isEmpty();
950+
approved.get().removeAll(deprecate);
951+
approvedEmpty = approved == null || approved.get().isEmpty();
926952
}
927953
if (deprecated == null) {
928-
deprecated = createEmptyModel();
954+
deprecated = new CountedReference<>(createEmptyModel());
929955
}
930-
deprecated.addAll(deprecate);
956+
deprecated.get().addAll(deprecate);
931957
deprecatedEmpty = deprecatedEmpty && deprecate.isEmpty();
932958

933959
for (Statement statement : deprecate) {
934960
Resource ctx = statement.getContext();
935961
if (approvedContexts != null && approvedContexts.contains(ctx)
936-
&& !approved.contains(null, null, null, ctx)) {
962+
&& !approved.get().contains(null, null, null, ctx)) {
937963
approvedContexts.remove(ctx);
938964
}
939965
}

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,10 @@ private void flush(SailSink sink) throws SailException {
477477
&& !isChanged((Changeset) sink)) {
478478
// one change to apply that is not in use to an empty Changeset
479479
Changeset dst = (Changeset) sink;
480-
dst.setChangeset(changes.pop());
480+
Changeset src = changes.pop();
481+
dst.setChangeset(src);
482+
// correctly close changeset
483+
src.close();
481484
} else {
482485
Iterator<Changeset> iter = changes.iterator();
483486
while (iter.hasNext()) {
@@ -517,6 +520,9 @@ private void flush(Changeset change, SailSink sink) throws SailException {
517520

518521
change.sinkDeprecated(sink);
519522
change.sinkApproved(sink);
523+
524+
// correctly close changeset
525+
change.close();
520526
}
521527

522528
}

0 commit comments

Comments
 (0)