Skip to content

Commit fe0f776

Browse files
Merge branch 'feature/sanitizer' into release/2.1.0
2 parents 040bf81 + 94758a7 commit fe0f776

File tree

7 files changed

+254
-177
lines changed

7 files changed

+254
-177
lines changed

src/main/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheck.java

Lines changed: 0 additions & 117 deletions
This file was deleted.

src/main/java/org/cryptomator/cryptofs/health/api/HealthCheck.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,25 @@
33
import org.cryptomator.cryptofs.VaultConfig;
44
import org.cryptomator.cryptolib.api.Cryptor;
55
import org.cryptomator.cryptolib.api.Masterkey;
6+
import org.slf4j.LoggerFactory;
67

78
import java.nio.file.Path;
89
import java.util.Collection;
910
import java.util.ServiceLoader;
1011
import java.util.concurrent.ExecutorService;
11-
import java.util.concurrent.Executors;
12-
import java.util.concurrent.ForkJoinPool;
12+
import java.util.function.Consumer;
1313
import java.util.stream.Stream;
14+
import java.util.stream.StreamSupport;
1415

1516
public interface HealthCheck {
1617

18+
/**
19+
* @return All known health checks
20+
*/
21+
static Collection<HealthCheck> allChecks() {
22+
return ServiceLoader.load(HealthCheck.class).stream().map(ServiceLoader.Provider::get).toList();
23+
}
24+
1725
/**
1826
* @return A unique name for this check (that might be used as a translation key)
1927
*/
@@ -24,24 +32,42 @@ default String identifier() {
2432
/**
2533
* Checks the vault at the given path.
2634
*
27-
* @param pathToVault Path to the vault's root directory
28-
* @param config The parsed and verified vault config
29-
* @param masterkey The masterkey
30-
* @param cryptor A cryptor initialized for this vault
31-
* @param executor An executor service to run the health check
32-
* @return Diagnostic results
35+
* @param pathToVault Path to the vault's root directory
36+
* @param config The parsed and verified vault config
37+
* @param masterkey The masterkey
38+
* @param cryptor A cryptor initialized for this vault
39+
* @param resultCollector Callback called for each result.
3340
*/
34-
Stream<DiagnosticResult> check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, ExecutorService executor);
41+
void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer<DiagnosticResult> resultCollector);
3542

3643
/**
37-
* Attempts to cancel this health check (if it is running).
44+
* Invokes the health check on a background thread scheduled using the given executor service. The results will be
45+
* streamed. If the stream gets {@link Stream#close() closed} before it terminates, an attempt is made to cancel
46+
* the health check.
47+
* <p>
48+
* The check blocks if the stream is not consumed
3849
*
39-
* Calling this method does not guarantee that no further results are produced due to async behaviour.
50+
* @param pathToVault Path to the vault's root directory
51+
* @param config The parsed and verified vault config
52+
* @param masterkey The masterkey
53+
* @param cryptor A cryptor initialized for this vault
54+
* @param executor An executor service to run the health check
55+
* @return A lazily filled stream of diagnostic results.
4056
*/
41-
void cancel();
57+
default Stream<DiagnosticResult> check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, ExecutorService executor) {
58+
var resultSpliterator = new TransferSpliterator<DiagnosticResult>(new PoisonResult());
4259

43-
static Collection<HealthCheck> allChecks() {
44-
return ServiceLoader.load(HealthCheck.class).stream().map(ServiceLoader.Provider::get).toList();
60+
var task = executor.submit(() -> {
61+
try {
62+
check(pathToVault, config, masterkey, cryptor, resultSpliterator);
63+
} catch (TransferSpliterator.TransferClosedException e) {
64+
LoggerFactory.getLogger(HealthCheck.class).debug("{} cancelled.", identifier());
65+
} finally {
66+
resultSpliterator.close();
67+
}
68+
});
69+
70+
return StreamSupport.stream(resultSpliterator, false).onClose(() -> task.cancel(true));
4571
}
4672

47-
}
73+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.cryptomator.cryptofs.health.api;
2+
3+
record PoisonResult() implements DiagnosticResult {
4+
@Override
5+
public Severity getServerity() {
6+
return null;
7+
}
8+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.cryptomator.cryptofs.health.api;
2+
3+
import com.google.common.base.Preconditions;
4+
5+
import java.util.Objects;
6+
import java.util.Spliterators;
7+
import java.util.concurrent.LinkedTransferQueue;
8+
import java.util.concurrent.TransferQueue;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
import java.util.function.Consumer;
11+
12+
/**
13+
* A concurrent spliterator that only {@link java.util.Spliterator#tryAdvance(Consumer) advances} by transferring
14+
* elements it {@link Consumer#accept(Object) consumes}. Consumption blocks if the spliterator is not advanced.
15+
* <p>
16+
* Once no futher elements are expected, this spliterator <b>must</b> be {@link #close() closed}, otherwise it'll
17+
* wait indefinitely.
18+
*
19+
* @param <T> the type of elements consumed and returned
20+
*/
21+
class TransferSpliterator<T> extends Spliterators.AbstractSpliterator<T> implements Consumer<T>, AutoCloseable {
22+
23+
private final TransferQueue<T> queue = new LinkedTransferQueue<>();
24+
private final AtomicBoolean poisoned = new AtomicBoolean();
25+
private final T poison;
26+
27+
/**
28+
* @param poison A unique value that must be distinct to every single value expected to be transferred.
29+
*/
30+
public TransferSpliterator(T poison) {
31+
super(Long.MAX_VALUE, DISTINCT | NONNULL | IMMUTABLE);
32+
this.poison = Objects.requireNonNull(poison);
33+
}
34+
35+
@Override
36+
public boolean tryAdvance(Consumer<? super T> action) {
37+
try {
38+
var element = queue.take();
39+
if (element == poison) {
40+
return false;
41+
} else {
42+
action.accept(element);
43+
return true;
44+
}
45+
} catch (InterruptedException e) {
46+
Thread.currentThread().interrupt();
47+
return false;
48+
}
49+
}
50+
51+
/**
52+
* Transfers the value to consuming thread. Blocks until transfer is complete or thread is interrupted.
53+
* @param value The value to transfer
54+
* @throws TransferClosedException If the transfer has been closed or this thread is interrupted while waiting for the consuming side.
55+
*/
56+
@Override
57+
public void accept(T value) throws TransferClosedException {
58+
Preconditions.checkArgument(value != poison, "must not feed poison");
59+
if (poisoned.get()) {
60+
throw new TransferClosedException();
61+
}
62+
try {
63+
queue.transfer(value);
64+
} catch (InterruptedException e) {
65+
Thread.currentThread().interrupt();
66+
throw new TransferClosedException();
67+
}
68+
}
69+
70+
@Override
71+
public void close() {
72+
poisoned.set(true);
73+
queue.offer(poison);
74+
}
75+
76+
/**
77+
* Thrown if an attempt is made to {@link #accept(Object) transfer} further elements after the TransferSpliterator
78+
* has been closed.
79+
*/
80+
public static class TransferClosedException extends IllegalStateException {}
81+
82+
}

src/main/java/org/cryptomator/cryptofs/health/dirid/DirIdCheck.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import org.cryptomator.cryptofs.VaultConfig;
44
import org.cryptomator.cryptofs.common.Constants;
5-
import org.cryptomator.cryptofs.health.api.AbstractHealthCheck;
65
import org.cryptomator.cryptofs.health.api.CheckFailed;
76
import org.cryptomator.cryptofs.health.api.DiagnosticResult;
87
import org.cryptomator.cryptofs.health.api.HealthCheck;
@@ -18,30 +17,22 @@
1817
import java.nio.file.Path;
1918
import java.nio.file.SimpleFileVisitor;
2019
import java.nio.file.attribute.BasicFileAttributes;
21-
import java.util.ArrayList;
22-
import java.util.Collection;
2320
import java.util.HashMap;
2421
import java.util.HashSet;
25-
import java.util.List;
2622
import java.util.Map;
27-
import java.util.Queue;
2823
import java.util.Set;
29-
import java.util.Spliterator;
30-
import java.util.Spliterators;
3124
import java.util.function.Consumer;
32-
import java.util.stream.Stream;
33-
import java.util.stream.StreamSupport;
3425

3526
/**
3627
* Reads all dir.c9r files and checks if the corresponding dir exists.
3728
*/
38-
public class DirIdCheck extends AbstractHealthCheck {
29+
public class DirIdCheck implements HealthCheck {
3930

4031
private static final Logger LOG = LoggerFactory.getLogger(DirIdCheck.class);
4132
private static final int MAX_TRAVERSAL_DEPTH = 4; // d/2/30/Fo0==.c9r/dir.c9r
4233

4334
@Override
44-
protected void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer<DiagnosticResult> resultCollector) {
35+
public void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer<DiagnosticResult> resultCollector) {
4536
// scan vault structure:
4637
var dataDirPath = pathToVault.resolve(Constants.DATA_DIR_NAME);
4738
var dirVisitor = new DirVisitor(dataDirPath, resultCollector);

0 commit comments

Comments
 (0)