Skip to content

Commit 633feb7

Browse files
authored
repo-sync-2025-08-25T14:15:37+0800 (#49)
* repo-sync-2025-08-25T14:15:37+0800 * repo-sync-2025-08-25T14:15:37+0800 revert * repo-sync-2025-08-25T14:15:37+0800
1 parent 9101524 commit 633feb7

File tree

38 files changed

+2241
-480
lines changed

38 files changed

+2241
-480
lines changed

build/Dockerfiles/dataproxy.Dockerfile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@ RUN ln -s ${JAVA_HOME}/lib/libjli.so /lib64
2323
COPY dataproxy-server/target/dataproxy-server-0.0.1-SNAPSHOT.jar dataproxy.jar
2424
COPY libs/*.jar libs/
2525

26-
ENV JAVA_OPTS="-server -XX:+UseG1GC -XX:+DisableExplicitGC -XX:InitiatingHeapOccupancyPercent=68 -Xlog:gc*=info:file=gc.log:time,tags:filecount=5,filesize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/app/log -XX:ErrorFile=/app/log/hs_err_pid%p.log"
26+
# With low task concurrency the heap footprint is small, so the maximum
27+
# heap size is capped at 512M (-Xmx512m).
28+
# If more resources are available, concurrency can be raised accordingly,
29+
# but MaxDirectMemorySize must then be increased to match;
30+
# its current default is 1536m.
31+
# Override these defaults by setting the JAVA_OPTS environment variable.
32+
ENV JAVA_OPTS="-server -XX:+UseG1GC -XX:+UseContainerSupport -Xms256m -Xmx512m -XX:MaxDirectMemorySize=1536m -XX:+DisableExplicitGC -XX:InitiatingHeapOccupancyPercent=68 -Xlog:gc*=info:file=gc.log:time,tags:filecount=5,filesize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/app/log -XX:ErrorFile=/app/log/hs_err_pid%p.log"
2733
ENV LOG_LEVEL=INFO
2834
EXPOSE 8023
2935
ENTRYPOINT ${JAVA_HOME}/bin/java ${JAVA_OPTS} -Dsun.net.http.allowRestrictedHeaders=true --add-opens=java.base/java.nio=ALL-UNNAMED -jar ./dataproxy.jar

dataproxy-core/pom.xml

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
<groupId>org.apache.arrow</groupId>
3434
<artifactId>arrow-format</artifactId>
3535
</exclusion>
36-
<exclusion>
36+
<!--<exclusion>
3737
<groupId>org.apache.arrow</groupId>
3838
<artifactId>arrow-memory-core</artifactId>
39-
</exclusion>
39+
</exclusion>-->
4040
<exclusion>
4141
<groupId>org.apache.arrow</groupId>
4242
<artifactId>arrow-memory</artifactId>
@@ -122,6 +122,12 @@
122122
</exclusions>
123123
</dependency>
124124

125+
<dependency>
126+
<groupId>org.apache.arrow</groupId>
127+
<artifactId>arrow-memory-netty</artifactId>
128+
<scope>test</scope>
129+
</dependency>
130+
125131
<dependency>
126132
<groupId>com.github.ben-manes.caffeine</groupId>
127133
<artifactId>caffeine</artifactId>
@@ -146,5 +152,28 @@
146152
<artifactId>jsr305</artifactId>
147153
</dependency>
148154

155+
<!-- JUnit 5 -->
156+
<dependency>
157+
<groupId>org.junit.jupiter</groupId>
158+
<artifactId>junit-jupiter-api</artifactId>
159+
<scope>test</scope>
160+
</dependency>
161+
<dependency>
162+
<groupId>org.junit.jupiter</groupId>
163+
<artifactId>junit-jupiter-engine</artifactId>
164+
<scope>test</scope>
165+
</dependency>
166+
<dependency>
167+
<groupId>org.mockito</groupId>
168+
<artifactId>mockito-inline</artifactId>
169+
<scope>test</scope>
170+
</dependency>
171+
172+
<!-- Mockito for JUnit 5 -->
173+
<dependency>
174+
<groupId>org.mockito</groupId>
175+
<artifactId>mockito-junit-jupiter</artifactId>
176+
<scope>test</scope>
177+
</dependency>
149178
</dependencies>
150179
</project>

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/config/DefaultFlightServerConfigLoader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,6 @@ public void loadProperties(Properties properties) {
6161
throw new RuntimeException(e);
6262
}
6363
properties.put(FlightServerConfigKey.PORT, 8023);
64+
properties.put(FlightServerConfigKey.METRICS_PORT, 9101);
6465
}
6566
}

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/config/FlightServerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import lombok.extern.slf4j.Slf4j;
2020
import org.apache.arrow.flight.Location;
21+
import org.apache.arrow.memory.BufferAllocator;
22+
import org.apache.arrow.memory.RootAllocator;
23+
import org.secretflow.dataproxy.core.listener.DataProxyAllocationListener;
2124

2225
/**
2326
* @author yuexie
@@ -26,7 +29,14 @@
2629
@Slf4j
2730
public record FlightServerConfig(String host, int port) {
2831

32+
private static final BufferAllocator ROOT_ALLOCATOR =
33+
new RootAllocator(new DataProxyAllocationListener(), 2L * 1024 * 1024 * 1024);
34+
2935
public Location getLocation() {
3036
return Location.forGrpcInsecure(host, port);
3137
}
38+
39+
public BufferAllocator getBufferAllocator() {
40+
return ROOT_ALLOCATOR;
41+
}
3242
}

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/config/FlightServerConfigKey.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ public class FlightServerConfigKey {
2626

2727
public static final String PORT = "SERVICE_PORT";
2828

29+
public static final String METRICS_PORT = "METRICS_PORT";
30+
2931
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2025 Ant Group Co., Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.secretflow.dataproxy.core.listener;
18+
19+
import lombok.extern.slf4j.Slf4j;
20+
import org.apache.arrow.memory.AllocationListener;
21+
import org.apache.arrow.memory.AllocationOutcome;
22+
import org.apache.arrow.memory.BufferAllocator;
23+
24+
/**
25+
* @author yuexie
26+
* @date 2025/4/14 16:14
27+
**/
28+
@Slf4j
29+
public class DataProxyAllocationListener implements AllocationListener {
30+
31+
/**
32+
* Called each time a new buffer has been requested.
33+
*
34+
* <p>An exception can be safely thrown by this method to terminate the allocation.
35+
*
36+
* @param size the buffer size being allocated
37+
*/
38+
@Override
39+
public void onPreAllocation(long size) {
40+
AllocationListener.super.onPreAllocation(size);
41+
log.debug("onPreAllocation, size: {}", size);
42+
}
43+
44+
/**
45+
* Called each time a new buffer has been allocated.
46+
*
47+
* <p>An exception cannot be thrown by this method.
48+
*
49+
* @param size the buffer size being allocated
50+
*/
51+
@Override
52+
public void onAllocation(long size) {
53+
AllocationListener.super.onAllocation(size);
54+
log.debug("onAllocation, size: {}", size);
55+
}
56+
57+
/**
58+
* Informed each time a buffer is released from allocation.
59+
*
60+
* <p>An exception cannot be thrown by this method.
61+
*
62+
* @param size The size of the buffer being released.
63+
*/
64+
@Override
65+
public void onRelease(long size) {
66+
AllocationListener.super.onRelease(size);
67+
}
68+
69+
/**
70+
* Called whenever an allocation failed, giving the caller a chance to create some space in the
71+
* allocator (either by freeing some resource, or by changing the limit), and, if successful,
72+
* allowing the allocator to retry the allocation.
73+
*
74+
* @param size the buffer size that was being allocated
75+
* @param outcome the outcome of the failed allocation. Carries information of what failed
76+
* @return true, if the allocation can be retried; false if the allocation should fail
77+
*/
78+
@Override
79+
public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
80+
log.debug("onFailedAllocation, size: {}, outcome: {}", size, outcome);
81+
return AllocationListener.super.onFailedAllocation(size, outcome);
82+
}
83+
84+
/**
85+
* Called immediately after a child allocator was added to the parent allocator.
86+
*
87+
* @param parentAllocator The parent allocator to which a child was added
88+
* @param childAllocator The child allocator that was just added
89+
*/
90+
@Override
91+
public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
92+
AllocationListener.super.onChildAdded(parentAllocator, childAllocator);
93+
log.debug("onChildAdded, childAllocator: {}, size: {}", childAllocator.getName(), childAllocator.getLimit());
94+
}
95+
96+
/**
97+
* Called immediately after a child allocator was removed from the parent allocator.
98+
*
99+
* @param parentAllocator The parent allocator from which a child was removed
100+
* @param childAllocator The child allocator that was just removed
101+
*/
102+
@Override
103+
public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
104+
AllocationListener.super.onChildRemoved(parentAllocator, childAllocator);
105+
log.debug("onChildRemoved, childAllocator: {}, size: {}", childAllocator.getName(), childAllocator.getLimit());
106+
}
107+
}

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/reader/AbstractSender.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ public void send() {
8181
ValueVectorUtility.ensureCapacity(root, takeRecordCount + 1);
8282
this.toArrowVector(record, root, takeRecordCount);
8383
takeRecordCount++;
84-
85-
if (takeRecordCount % 300_000 == 0) {
84+
// Flush to Arrow every 100,000 records.
85+
// The batch must not be too large, because off-heap memory is limited.
86+
if (takeRecordCount % 100_000 == 0) {
8687
break;
8788
}
8889
}
@@ -158,7 +159,7 @@ public void close() throws Exception {
158159
* Pre-application for arrow vector memory
159160
*/
160161
private void preAllocate() {
161-
162+
root.clear();
162163
ValueVectorUtility.preAllocate(root, estimatedRecordCount);
163164

164165
root.getFieldVectors().forEach(fieldVector -> {
@@ -168,6 +169,5 @@ private void preAllocate() {
168169
baseVariableWidthVector.allocateNew(estimatedRecordCount * 32);
169170
}
170171
});
171-
root.clear();
172172
}
173173
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2025 Ant Group Co., Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.secretflow.dataproxy.core.config;
17+
18+
import org.junit.jupiter.api.Test;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertNotNull;
28+
import static org.junit.jupiter.api.Assertions.assertSame;
29+
30+
/**
31+
* @author yuexie
32+
* @date 2025/06/16 10:59:10
33+
*/
34+
public class FlightServerContextTest {
35+
36+
@Test
37+
void testGetInstanceInMultiThread() throws InterruptedException {
38+
final int threadCount = 10;
39+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
40+
CountDownLatch latch = new CountDownLatch(threadCount);
41+
AtomicReference<FlightServerContext> firstInstance = new AtomicReference<>();
42+
43+
for (int i = 0; i < threadCount; i++) {
44+
executor.submit(() -> {
45+
FlightServerContext instance = FlightServerContext.getInstance();
46+
if (firstInstance.get() == null) {
47+
firstInstance.set(instance);
48+
} else {
49+
assertSame(firstInstance.get(), instance, "The same instance should be returned in a multi-threaded environment");
50+
}
51+
latch.countDown();
52+
});
53+
}
54+
55+
latch.await(5, TimeUnit.SECONDS);
56+
executor.shutdown();
57+
}
58+
@Test
59+
void testGetOrDefault() {
60+
String testKey = "nonexistent.key";
61+
String defaultValue = "default";
62+
63+
String result = FlightServerContext.getOrDefault(testKey, String.class, defaultValue);
64+
assertEquals(defaultValue, result);
65+
}
66+
67+
@Test
68+
void testFlightServerConfig() {
69+
FlightServerContext context = FlightServerContext.getInstance();
70+
assertNotNull(context.getFlightServerConfig(), "flightServerConfig should be initialized");
71+
}
72+
73+
}

0 commit comments

Comments
 (0)