Skip to content

Commit 19095ef

Browse files
committed
Fix timing issues, add logging, other possible fixes
1 parent 05cb53a commit 19095ef

File tree

6 files changed

+156
-16
lines changed

6 files changed

+156
-16
lines changed

build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ dependencies {
3333
}
3434
}
3535
}
36-
api("com.squareup.okhttp3:okhttp") {
36+
api(libs.okhttp) {
3737
version {
3838
strictly(libs.versions.okhttpVersionrange.get())
3939
prefer(libs.versions.okhttp.get())
@@ -83,6 +83,7 @@ dependencies {
8383
implementation(libs.junixsocketCommon)
8484
implementation(libs.bundles.jna)
8585

86+
testImplementation(libs.okhttpLoggingInterceptor)
8687
testImplementation("org.junit.jupiter:junit-jupiter-api:5.13.4")
8788
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.13.4")
8889
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.13.4")

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ kotlinJdk7 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk7", version.ref =
2525
kotlinJdk8 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk8", version.ref = "kotlin" }
2626
logback = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
2727
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
28+
okhttpLoggingInterceptor = { module = "com.squareup.okhttp3:logging-interceptor", version.ref = "okhttp" }
2829
okio = { module = "com.squareup.okio:okio", version.ref = "okio" }
2930
okioJvm = { module = "com.squareup.okio:okio-jvm", version.ref = "okio" }
3031
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
@@ -33,4 +34,5 @@ slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
3334
kotlin = ["kotlin", "kotlinCommon", "kotlinJdk7", "kotlinJdk8"]
3435
jna = ["jna", "jnaPlatform"]
3536
junixsocket = ["junixsocketCore", "junixsocketCommon"]
37+
okhttp = ["okhttp", "okhttpLoggingInterceptor"]
3638
okio = ["okio", "okioJvm"]

src/main/java/de/gesellix/docker/client/filesocket/NamedPipeSocket.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import static com.sun.jna.platform.win32.WinNT.GENERIC_WRITE;
77
import static com.sun.jna.platform.win32.WinNT.OPEN_EXISTING;
88

9-
import java.io.FilterOutputStream;
109
import java.io.IOException;
1110
import java.io.InputStream;
1211
import java.io.OutputStream;
@@ -55,7 +54,13 @@ void connect(String socketPath) throws IOException {
5554
socketPath = socketPath.replace("/", "\\");
5655
log.debug("connect via '{}'...", socketPath);
5756

58-
Kernel32.INSTANCE.WaitNamedPipe(socketPath, 200);
57+
boolean ok = Kernel32.INSTANCE.WaitNamedPipe(socketPath, 200);
58+
if (!ok) {
59+
int err = Kernel32.INSTANCE.GetLastError();
60+
log.error("Failed to wait for Named Pipe '" + socketPath + "', WinError=" + err);
61+
throw new IOException("Failed to wait for Named Pipe '" + socketPath + "', WinError=" + err);
62+
}
63+
5964
handle = Kernel32.INSTANCE.CreateFile(
6065
socketPath,
6166
GENERIC_READ | GENERIC_WRITE,
@@ -69,6 +74,7 @@ void connect(String socketPath) throws IOException {
6974

7075
if (INVALID_HANDLE_VALUE.equals(handle)) {
7176
int err = Kernel32.INSTANCE.GetLastError();
77+
log.error("Failed to open Named Pipe '" + socketPath + "', WinError=" + err);
7278
throw new IOException("Failed to open Named Pipe '" + socketPath + "', WinError=" + err);
7379
}
7480

@@ -86,32 +92,27 @@ public InputStream getInputStream() throws IOException {
8692
@Override
8793
public OutputStream getOutputStream() throws IOException {
8894
ensureOpen();
89-
90-
// Wrap the Okio OutputStream so flush() also flushes the BufferedSink
91-
return new FilterOutputStream(sink.outputStream()) {
92-
@Override
93-
public void flush() throws IOException {
94-
super.flush(); // flush Java's wrapper
95-
sink.flush(); // flush Okio buffer to NamedPipeSink
96-
}
97-
};
95+
return sink.outputStream();
9896
}
9997

10098
@Override
10199
public synchronized void close() throws IOException {
102100
if (closed) {
103101
return;
104102
}
103+
log.debug("closing handle {}...", handle);
105104
try {
106105
if (handle != null && !INVALID_HANDLE_VALUE.equals(handle)) {
107106
// Cancel any pending read/write before closing to avoid CloseHandle() hang
108107
ExtendedKernel32.INSTANCE.CancelIoEx(handle, null);
109108
}
110109

111110
if (source != null) {
111+
log.debug("closing source {}...", source);
112112
source.close();
113113
}
114114
if (sink != null) {
115+
log.debug("closing sink {}...", sink);
115116
sink.close();
116117
}
117118
} finally {

src/main/java/de/gesellix/docker/client/filesocket/NamedPipeSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ public long read(@NotNull Buffer sink, long byteCount) {
4444
// Expected when CancelIoEx() is called during close()
4545
return -1;
4646
}
47-
return -1; // Other read error
47+
return err == 0 ? 0 : -1; // Other read error
4848
}
4949

50-
if (bytesRead.getValue() <= 0) {
50+
if (bytesRead.getValue() < 0) {
5151
return -1; // EOF
5252
}
5353

src/main/java/de/gesellix/docker/client/filesocket/NamedPipeUtils.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import static com.sun.jna.platform.win32.WinBase.INVALID_HANDLE_VALUE;
44

5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
58
import com.sun.jna.platform.win32.Kernel32;
69
import com.sun.jna.platform.win32.WinBase;
710
import com.sun.jna.platform.win32.WinError;
@@ -11,6 +14,8 @@
1114

1215
public final class NamedPipeUtils {
1316

17+
private static final Logger log = LoggerFactory.getLogger(NamedPipeUtils.class);
18+
1419
private NamedPipeUtils() {
1520
}
1621

@@ -26,6 +31,12 @@ public static boolean readOverlapped(WinNT.HANDLE handle, byte[] buf, IntByRefer
2631
WinBase.OVERLAPPED overlapped = new WinBase.OVERLAPPED();
2732
overlapped.hEvent = Kernel32.INSTANCE.CreateEvent(null, true, false, null);
2833

34+
try {
35+
Thread.sleep(10);
36+
} catch (InterruptedException ignored) {
37+
// ignored
38+
}
39+
2940
boolean ok = Kernel32.INSTANCE.ReadFile(handle, buf, buf.length, null, overlapped);
3041
if (!ok) {
3142
int err = Kernel32.INSTANCE.GetLastError();
@@ -44,13 +55,20 @@ public static boolean readOverlapped(WinNT.HANDLE handle, byte[] buf, IntByRefer
4455

4556
ExtendedKernel32.INSTANCE.GetOverlappedResult(handle, overlapped, bytesRead, false);
4657
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
47-
return bytesRead.getValue() > 0;
58+
log.debug("Bytes read: {}", bytesRead.getValue());
59+
return bytesRead.getValue() >= 0;
4860
}
4961

5062
public static boolean writeOverlapped(WinNT.HANDLE handle, byte[] buf, int len, IntByReference bytesWritten, int timeoutMillis) {
5163
WinBase.OVERLAPPED overlapped = new WinBase.OVERLAPPED();
5264
overlapped.hEvent = Kernel32.INSTANCE.CreateEvent(null, true, false, null);
5365

66+
try {
67+
Thread.sleep(10);
68+
} catch (InterruptedException ignored) {
69+
// ignored
70+
}
71+
5472
boolean ok = Kernel32.INSTANCE.WriteFile(handle, buf, len, null, overlapped);
5573
if (!ok) {
5674
int err = Kernel32.INSTANCE.GetLastError();
@@ -69,7 +87,11 @@ public static boolean writeOverlapped(WinNT.HANDLE handle, byte[] buf, int len,
6987

7088
ExtendedKernel32.INSTANCE.GetOverlappedResult(handle, overlapped, bytesWritten, false);
7189
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
72-
return bytesWritten.getValue() > 0;
90+
log.debug("Bytes written: {}", bytesWritten.getValue());
91+
if (bytesWritten.getValue() != len) {
92+
log.warn("Incomplete write: {}/{}", bytesWritten.getValue(), len);
93+
}
94+
return bytesWritten.getValue() >= 0;
7395
}
7496

7597
public static void closeHandle(HANDLE handle) {

src/test/java/de/gesellix/docker/client/filesocket/NamedPipeSocketTest.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@
3737
import com.sun.jna.platform.win32.WinNT;
3838
import com.sun.jna.ptr.IntByReference;
3939

40+
import okhttp3.HttpUrl;
41+
import okhttp3.OkHttpClient;
42+
import okhttp3.Request;
43+
import okhttp3.Response;
44+
import okhttp3.logging.HttpLoggingInterceptor;
45+
import okhttp3.logging.LoggingEventListener;
46+
import okio.Timeout;
47+
4048
class NamedPipeSocketTest {
4149

4250
@Test
@@ -49,6 +57,111 @@ void canConnect() throws IOException {
4957
}
5058
}
5159

60+
@Test
61+
@EnabledOnOs(OS.WINDOWS)
62+
void canPingViaSocket() throws IOException {
63+
try (NamedPipeSocket namedPipeSocket = new NamedPipeSocket()) {
64+
namedPipeSocket.connect(new InetSocketAddress(
65+
getByAddress(namedPipeSocket.encodeHostname("//./pipe/docker_engine"), new byte[]{0, 0, 0, 0}),
66+
0));
67+
68+
OutputStream os = namedPipeSocket.getOutputStream();
69+
70+
List<String> headers = new ArrayList<>();
71+
headers.add("GET /_ping HTTP/1.1");
72+
headers.add("Content-Type: application/json");
73+
headers.add("Accept: application/json");
74+
// headers.add("Host: localhost");
75+
headers.add("Host: 2f2f2e2f706970652f646f636b65725f656e67696e65.socket");
76+
headers.add("Connection: Keep-Alive");
77+
headers.add("Accept-Encoding: gzip");
78+
headers.add("User-Agent: NamedPipeSocketTest");
79+
80+
try {
81+
for (String line : headers) {
82+
System.out.println(">>|" + line);
83+
os.write(line.getBytes(StandardCharsets.UTF_8));
84+
os.write("\n".getBytes(StandardCharsets.UTF_8));
85+
}
86+
os.write("\n".getBytes(StandardCharsets.UTF_8));
87+
os.flush();
88+
System.out.println("[TEST] Sent request headers");
89+
Thread.sleep(2000);
90+
} catch (IOException e) {
91+
e.printStackTrace();
92+
} catch (InterruptedException e) {
93+
e.printStackTrace();
94+
}
95+
96+
InputStream is = namedPipeSocket.getInputStream();
97+
Thread readerThread = new Thread(() -> {
98+
try {
99+
byte[] buffer = new byte[8024];
100+
int bytesRead;
101+
while ((bytesRead = is.read(buffer)) != -1) {
102+
System.out.println("Read: " + new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
103+
}
104+
Thread.sleep(500);
105+
} catch (Exception e) {
106+
throw new RuntimeException(e);
107+
}
108+
});
109+
readerThread.start();
110+
111+
try {
112+
Thread.sleep(1000);
113+
} catch (InterruptedException e) {
114+
e.printStackTrace();
115+
} finally {
116+
os.close();
117+
}
118+
System.out.println("Closing reader");
119+
if (is != null) {
120+
try {
121+
is.close();
122+
} catch (IOException ignored) {
123+
}
124+
}
125+
}
126+
}
127+
128+
@Test
129+
@EnabledOnOs(OS.WINDOWS)
130+
void canPingViaOkHttp() throws IOException {
131+
NamedPipeSocketFactory namedPipeSocketFactory = new NamedPipeSocketFactory();
132+
OkHttpClient.Builder builder = new OkHttpClient.Builder()
133+
.socketFactory(namedPipeSocketFactory)
134+
.dns(namedPipeSocketFactory)
135+
.connectTimeout(new Timeout().timeout(10, TimeUnit.SECONDS).timeoutNanos() / 1000, TimeUnit.MILLISECONDS)
136+
.callTimeout(new Timeout().timeout(30, TimeUnit.SECONDS).timeoutNanos() / 1000, TimeUnit.MILLISECONDS)
137+
.readTimeout(new Timeout().timeout(30, TimeUnit.SECONDS).timeoutNanos() / 1000, TimeUnit.MILLISECONDS)
138+
.writeTimeout(new Timeout().timeout(30, TimeUnit.SECONDS).timeoutNanos() / 1000, TimeUnit.MILLISECONDS)
139+
.addInterceptor(new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY))
140+
.eventListenerFactory(new LoggingEventListener.Factory());
141+
142+
HttpUrl url = new HttpUrl.Builder()
143+
.scheme("http")
144+
.host(new NamedPipeSocket().encodeHostname("//./pipe/docker_engine"))
145+
.addPathSegments("_ping")
146+
.build();
147+
148+
Request request = new Request.Builder()
149+
.get()
150+
.url(url)
151+
.addHeader("Content-Type", "application/json")
152+
.addHeader("Accept", "application/json")
153+
.build();
154+
155+
OkHttpClient client = builder.build();
156+
Response response = client.newCall(request).execute();
157+
if (!response.isSuccessful()) {
158+
response.close();
159+
} else {
160+
String content = response.toString();
161+
System.out.println("Got: " + content);
162+
}
163+
}
164+
52165
@Test
53166
@EnabledOnOs(OS.WINDOWS)
54167
public void testInProcessServerClient() throws Exception {
@@ -253,6 +366,7 @@ void testPlainHijacked() throws Exception {
253366

254367
Thread readerThread = new Thread(() -> {
255368
try (InputStream inputStream = namedPipeSocket.getInputStream()) {
369+
Thread.sleep(1000);
256370
byte[] buf = new byte[1024];
257371
int read;
258372
while ((read = inputStream.read(buf)) != -1) {

0 commit comments

Comments
 (0)