Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.pjacoco.agent.it;

import static org.junit.jupiter.api.Assertions.*;
import io.pjacoco.agent.AgentOptions;
import io.pjacoco.agent.control.ControlEndpoint;
import io.pjacoco.agent.mapping.TestIdMappingRegistry;
import io.pjacoco.agent.observability.AgentLog;
Expand Down Expand Up @@ -46,7 +47,8 @@ void registeredShown(@TempDir Path dir) throws Exception { // R
TestIdMappingRegistry mapping = new TestIdMappingRegistry(100);
TestStoreRegistry reg = new TestStoreRegistry(dir, new ExecWriter(), new Metrics(),
new AgentLog(), false, 1000, System::currentTimeMillis);
ControlEndpoint ep = new ControlEndpoint(reg, mapping, "127.0.0.1", 0);
ControlEndpoint ep = new ControlEndpoint(reg, mapping, new ExecWriter(), AgentOptions.empty(),
"127.0.0.1", 0);
int port = ep.start();
try {
// 1) register a mapping THROUGH the HTTP control plane (%23 -> '#')
Expand All @@ -69,7 +71,8 @@ void boundedEvictionThroughEndpoint(@TempDir Path dir) throws Exception { // R
TestIdMappingRegistry mapping = new TestIdMappingRegistry(2); // cap = 2
TestStoreRegistry reg = new TestStoreRegistry(dir, new ExecWriter(), new Metrics(),
new AgentLog(), false, 1000, System::currentTimeMillis);
ControlEndpoint ep = new ControlEndpoint(reg, mapping, "127.0.0.1", 0);
ControlEndpoint ep = new ControlEndpoint(reg, mapping, new ExecWriter(), AgentOptions.empty(),
"127.0.0.1", 0);
int port = ep.start();
try {
assertEquals(200, post(port, "/__coverage__/trace/map?traceId=t1&testId=com.x.T%23a"));
Expand Down
4 changes: 4 additions & 0 deletions agent/src/main/java/io/pjacoco/agent/AgentOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ public double incompleteAttributionThreshold() {
* Set {@code control=false} for pure aggregate/in-process use to avoid the port bind cost and
* conflicts entirely. (REQ-U01) */
public boolean control() { return Boolean.parseBoolean(get("control", "true")); }

/** Whether {@code POST /__coverage__/test/stop} also writes {@code <testId>.exec} to disk.
* Default true for backward compatibility. Query param {@code persist=} overrides per request. */
public boolean persistOnStop() { return Boolean.parseBoolean(get("persistOnStop", "true")); }
}
6 changes: 4 additions & 2 deletions agent/src/main/java/io/pjacoco/agent/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ private static void install(String args, Instrumentation inst, AgentLog log) thr
final java.util.function.LongSupplier clockSupplier = new java.util.function.LongSupplier() {
public long getAsLong() { return System.currentTimeMillis(); }
};
final ExecWriter writer = new ExecWriter(options.incompleteAttributionThreshold());
final TestStoreRegistry registry = new TestStoreRegistry(
outDir, new ExecWriter(options.incompleteAttributionThreshold()), metrics, log,
outDir, writer, metrics, log,
options.autoRegister(), options.maxStores(), clockSupplier,
options.traceKeyAutoCreate(), options.inFlightGuardMillis());

Expand Down Expand Up @@ -96,7 +97,8 @@ outDir, new ExecWriter(options.incompleteAttributionThreshold()), metrics, log,
final ControlEndpoint[] endpointRef = new ControlEndpoint[1];
if (options.control()) {
try {
ControlEndpoint endpoint = new ControlEndpoint(registry, mapping, options.controlHost(), options.controlPort());
ControlEndpoint endpoint = new ControlEndpoint(registry, mapping, writer, options,
options.controlHost(), options.controlPort());
int port = endpoint.start();
endpointRef[0] = endpoint;
System.setProperty("pjacoco.control-port", String.valueOf(port));
Expand Down
79 changes: 76 additions & 3 deletions agent/src/main/java/io/pjacoco/agent/control/ControlEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.pjacoco.agent.AgentOptions;
import io.pjacoco.agent.mapping.TestIdMappingRegistry;
import io.pjacoco.agent.output.ExecWriter;
import io.pjacoco.agent.store.TestStore;
import io.pjacoco.agent.store.TestStoreRegistry;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -16,13 +19,18 @@
public final class ControlEndpoint {
private final TestStoreRegistry registry;
private final TestIdMappingRegistry mapping;
private final ExecWriter writer;
private final AgentOptions options;
private final String host;
private final int port;
private HttpServer server;

public ControlEndpoint(TestStoreRegistry registry, TestIdMappingRegistry mapping, String host, int port) {
public ControlEndpoint(TestStoreRegistry registry, TestIdMappingRegistry mapping,
ExecWriter writer, AgentOptions options, String host, int port) {
this.registry = registry;
this.mapping = mapping;
this.writer = writer;
this.options = options;
this.host = host;
this.port = port;
}
Expand Down Expand Up @@ -58,10 +66,67 @@ private void handleStop(HttpExchange ex) throws IOException {
Map<String, String> q = query(ex);
String testId = q.get("testId");
if (testId == null) { respond(ex, 400, "missing testId"); return; }
registry.stop(testId, q.get("result"));

String format = q.getOrDefault("format", "text");
boolean persist = parseBoolean(q.get("persist"), options.persistOnStop());

TestStoreRegistry.StopResult closed = registry.closeForStop(testId, q.get("result"));
if (closed == null) {
if ("binary".equalsIgnoreCase(format)) {
respond(ex, 404, "unknown testId");
} else {
respond(ex, 200, "stopped " + testId);
}
return;
}

if ("binary".equalsIgnoreCase(format)) {
handleBinaryStop(ex, closed, persist);
return;
}

registry.persistClosed(closed, true, true);
registry.markStopCompleted();
respond(ex, 200, "stopped " + testId);
}

private void handleBinaryStop(HttpExchange ex, TestStoreRegistry.StopResult closed,
boolean persist) throws IOException {
TestStore store = closed.snapshot();
setBinaryHeaders(ex, closed, persist);
if (closed.wasEmpty()) {
ex.sendResponseHeaders(204, -1);
ex.close();
registry.markStopCompleted();
return;
}
try {
byte[] execBytes = writer.toExecBytes(store, System.currentTimeMillis());
if (persist) {
registry.persistClosed(closed, true, false);
}
ex.getResponseHeaders().set("Content-Type", "application/octet-stream");
ex.sendResponseHeaders(200, execBytes.length);
OutputStream os = ex.getResponseBody();
os.write(execBytes);
os.close();
registry.markStopCompleted();
} catch (Exception e) {
respond(ex, 500, "serialization error: " + e.getMessage());
registry.markStopCompleted();
}
}

private void setBinaryHeaders(HttpExchange ex, TestStoreRegistry.StopResult closed, boolean persist) {
TestStore store = closed.snapshot();
ex.getResponseHeaders().set("X-Pjacoco-TestId", closed.testId());
ex.getResponseHeaders().set("X-Pjacoco-ClassCount", String.valueOf(store.classCount()));
ex.getResponseHeaders().set("X-Pjacoco-RecordedProbes",
String.valueOf(writer.countRecordedProbes(store)));
ex.getResponseHeaders().set("X-Pjacoco-DroppedProbes", String.valueOf(store.droppedProbes()));
ex.getResponseHeaders().set("X-Pjacoco-Persisted", String.valueOf(persist));
}

private void handleTraceMap(HttpExchange ex) throws IOException {
Map<String, String> q = query(ex);
String traceId = q.get("traceId");
Expand All @@ -71,6 +136,13 @@ private void handleTraceMap(HttpExchange ex) throws IOException {
respond(ex, 200, "mapped " + traceId + " -> " + testId);
}

private static boolean parseBoolean(String value, boolean defaultValue) {
if (value == null) {
return defaultValue;
}
return Boolean.parseBoolean(value);
}

private static Map<String, String> query(HttpExchange ex) {
Map<String, String> m = new HashMap<String, String>();
String raw = ex.getRequestURI().getRawQuery();
Expand All @@ -89,9 +161,10 @@ private static Map<String, String> query(HttpExchange ex) {

private static void respond(HttpExchange ex, int code, String body) throws IOException {
byte[] b = body.getBytes("UTF-8");
ex.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
ex.sendResponseHeaders(code, b.length);
OutputStream os = ex.getResponseBody();
os.write(b);
ex.close();
os.close();
}
}
101 changes: 75 additions & 26 deletions agent/src/main/java/io/pjacoco/agent/output/ExecWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.pjacoco.agent.store.ClassProbes;
import io.pjacoco.agent.store.TestStore;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -23,58 +24,106 @@ public ExecWriter(double incompleteAttributionThreshold) {
this.incompleteAttributionThreshold = incompleteAttributionThreshold;
}

/** @param status "complete" for a normal stop, "partial" for a shutdown-forced dump. */
public void write(Path dir, TestStore store, String result, String commitSha,
long stoppedAtMillis, String status) throws Exception {
Files.createDirectories(dir);
Map<Long, ClassProbes> snap = store.snapshot();
/** Serialize store to JaCoCo exec bytes in memory. Does NOT touch disk. */
public byte[] toExecBytes(TestStore store, long stoppedAtMillis) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writeExecToStream(baos, store, store.snapshot(), stoppedAtMillis);
return baos.toByteArray();
}

/** Write .exec file only (existing logic, call {@link #toExecBytes} internally). */
public void writeExecFile(Path dir, TestStore store, long stoppedAtMillis) throws Exception {
Files.createDirectories(dir);
Path exec = dir.resolve(store.testId() + ".exec");
long recordedProbes = 0; // count of probe hits captured for this test (ratio denom)
OutputStream os = new BufferedOutputStream(Files.newOutputStream(exec));
try {
ExecutionDataWriter w = new ExecutionDataWriter(os);
w.visitSessionInfo(new SessionInfo(store.testId(),
store.startedAtMillis(), stoppedAtMillis));
for (Map.Entry<Long, ClassProbes> e : snap.entrySet()) {
boolean[] probes = e.getValue().probes();
for (boolean hit : probes) { if (hit) recordedProbes++; }
w.visitClassExecution(new ExecutionData(e.getKey(), e.getValue().className(), probes));
}
writeExecToStream(os, store, store.snapshot(), stoppedAtMillis);
} finally {
os.close();
}
}

/** Write .json sidecar only (Priority 3 may call this separately). */
public void writeSidecarFile(Path dir, TestStore store, String result, String commitSha,
long stoppedAtMillis, String status) throws Exception {
Files.createDirectories(dir);
Map<Long, ClassProbes> snap = store.snapshot();
long recordedProbes = countRecordedProbes(snap);
String json = buildSidecarJson(store, result, stoppedAtMillis, status, recordedProbes).toString();
Files.write(dir.resolve(store.testId() + ".json"), json.getBytes("UTF-8"));
}

/** @param status "complete" for a normal stop, "partial" for a shutdown-forced dump. */
public void write(Path dir, TestStore store, String result, String commitSha,
long stoppedAtMillis, String status) throws Exception {
writeExecFile(dir, store, stoppedAtMillis);
writeSidecarFile(dir, store, result, commitSha, stoppedAtMillis, status);
}

/** Convenience overload used by a normal stop. */
public void write(Path dir, TestStore store, String result, String commitSha,
long stoppedAtMillis) throws Exception {
write(dir, store, result, commitSha, stoppedAtMillis, "complete");
}

public long countRecordedProbes(TestStore store) {
return countRecordedProbes(store.snapshot());
}

private long writeExecToStream(OutputStream os, TestStore store, Map<Long, ClassProbes> snap,
long stoppedAtMillis) throws Exception {
ExecutionDataWriter w = new ExecutionDataWriter(os);
w.visitSessionInfo(new SessionInfo(store.testId(), store.startedAtMillis(), stoppedAtMillis));
long recordedProbes = 0;
for (Map.Entry<Long, ClassProbes> e : snap.entrySet()) {
boolean[] probes = e.getValue().probes();
for (boolean hit : probes) {
if (hit) {
recordedProbes++;
}
}
w.visitClassExecution(new ExecutionData(e.getKey(), e.getValue().className(), probes));
}
return recordedProbes;
}

private Json buildSidecarJson(TestStore store, String result, long stoppedAtMillis, String status,
long recordedProbes) {
Json j = new Json()
.put("testId", store.testId())
.put("exec", store.testId() + ".exec")
.put("precision", "line")
.put("startedAtMillis", store.startedAtMillis())
.put("stoppedAtMillis", stoppedAtMillis)
.put("durationMs", stoppedAtMillis - store.startedAtMillis())
.put("result", result) // null -> omitted
.put("result", result)
.put("classCount", store.classCount())
.put("retryCount", store.retryCount())
.put("shardId", store.shardId()) // null -> omitted
.put("shardId", store.shardId())
.put("status", status);
long dropped = store.droppedProbes();
if (dropped > 0) { // CLS-REQ-008/009: additive, only when loss attributed
if (dropped > 0) {
long denom = dropped + recordedProbes;
double ratio = denom == 0 ? 0.0 : (double) dropped / (double) denom;
j.put("droppedProbes", dropped) // always exposed for visibility (CLS-REQ-009)
j.put("droppedProbes", dropped)
.put("recordedProbes", recordedProbes);
if (ratio > incompleteAttributionThreshold) { // default 0.0 -> any drop flags (backward-compatible)
if (ratio > incompleteAttributionThreshold) {
j.put("incompleteAttribution", true)
.put("attribution", "exact"); // ambiguous (parallel) drops are no longer per-test (CLS-REQ-005)
.put("attribution", "exact");
}
}
String json = j.toString();
Files.write(dir.resolve(store.testId() + ".json"), json.getBytes("UTF-8"));
return j;
}

/** Convenience overload used by a normal stop. */
public void write(Path dir, TestStore store, String result, String commitSha,
long stoppedAtMillis) throws Exception {
write(dir, store, result, commitSha, stoppedAtMillis, "complete");
private static long countRecordedProbes(Map<Long, ClassProbes> snap) {
long recordedProbes = 0;
for (Map.Entry<Long, ClassProbes> e : snap.entrySet()) {
for (boolean hit : e.getValue().probes()) {
if (hit) {
recordedProbes++;
}
}
}
return recordedProbes;
}
}
61 changes: 58 additions & 3 deletions agent/src/main/java/io/pjacoco/agent/store/TestStoreRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,67 @@ public synchronized void discard(String testId) {
}

public synchronized void stop(String testId, String result) {
TestStore s = stores.remove(testId);
if (s == null) {
StopResult closed = closeForStop(testId, result);
if (closed == null) {
log.warn("stop-missing", "stop for unknown testId=" + testId);
return;
}
flush(s, result, "complete");
persistClosed(closed, true, true);
markStopCompleted();
}

/** Close store and return snapshot for serialization. Removes from registry. */
public synchronized StopResult closeForStop(String testId, String result) {
TestStore store = stores.remove(testId);
if (store == null) {
return null;
}
boolean wasEmpty = store.classCount() == 0 && store.droppedProbes() == 0;
return new StopResult(store, testId, result, wasEmpty);
}

/** Optional disk persist after close (exec + sidecar if configured). */
public synchronized void persistClosed(StopResult closed, boolean writeExec, boolean writeSidecar) {
if (closed.wasEmpty()) {
return;
}
long stoppedAt = clock.getAsLong();
try {
if (writeExec) {
writer.writeExecFile(outputDir, closed.snapshot(), stoppedAt);
}
if (writeSidecar) {
writer.writeSidecarFile(outputDir, closed.snapshot(), closed.result(), commitSha,
stoppedAt, "complete");
}
} catch (Exception e) {
metrics.swallowedExceptions.incrementAndGet();
log.warn("flush-error", "persist failed testId=" + closed.testId() + ": " + e);
}
}

/** Immutable snapshot returned when a store is closed for stop. */
public static final class StopResult {
private final TestStore snapshot;
private final String testId;
private final String result;
private final boolean wasEmpty;

public StopResult(TestStore snapshot, String testId, String result, boolean wasEmpty) {
this.snapshot = snapshot;
this.testId = testId;
this.result = result;
this.wasEmpty = wasEmpty;
}

public TestStore snapshot() { return snapshot; }
public String testId() { return testId; }
public String result() { return result; }
public boolean wasEmpty() { return wasEmpty; }
}

/** Increment tests-completed counter after a control-plane stop (binary or text). */
public void markStopCompleted() {
metrics.testsCompleted.incrementAndGet();
}

Expand Down
Loading
Loading