Skip to content

Commit 336d557

Browse files
author
dayan1852
committed
# 9755
Fix logs Basic issue
1 parent 1a5c2eb commit 336d557

File tree

4 files changed

+318
-13
lines changed

4 files changed

+318
-13
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.server.rest;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class HttpBasic {
24+
public String basicUser;
25+
public String basicPass;
26+
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,83 @@ public String getLogPath() {
4747
}
4848

4949
protected String sendGet(String urlString) {
50+
HttpURLConnection connection = null;
5051
try {
51-
HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection();
52+
connection = (HttpURLConnection) new URL(urlString).openConnection();
5253
connection.setRequestMethod("GET");
5354
connection.setConnectTimeout(5000);
5455
connection.setReadTimeout(5000);
5556
connection.connect();
5657

58+
int code = connection.getResponseCode();
5759
if (connection.getResponseCode() == 200) {
58-
try (InputStream is = connection.getInputStream();
59-
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
60-
byte[] buffer = new byte[1024];
61-
int len;
62-
while ((len = is.read(buffer)) != -1) {
63-
baos.write(buffer, 0, len);
64-
}
65-
return baos.toString();
66-
}
60+
return readAll(connection.getInputStream());
6761
}
62+
log.warn("GET {} -> HTTP {}", urlString, code);
6863
} catch (IOException e) {
69-
log.error("Send get Fail.{}", ExceptionUtils.getMessage(e));
64+
log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e));
65+
} finally {
66+
if (connection != null) {
67+
connection.disconnect();
68+
}
69+
}
70+
return null;
71+
}
72+
73+
/**
74+
* Send GET request with Basic Auth
75+
*
76+
* @param urlString url
77+
* @param user name
78+
* @param pass password
79+
* @return log
80+
*/
81+
protected String sendGet(String urlString, String user, String pass) {
82+
HttpURLConnection connection = null;
83+
try {
84+
connection = (HttpURLConnection) new URL(urlString).openConnection();
85+
connection.setRequestMethod("GET");
86+
connection.setConnectTimeout(5000);
87+
connection.setReadTimeout(5000);
88+
89+
// Basic Auth
90+
if (user != null && pass != null) {
91+
String token = java.util.Base64.getEncoder()
92+
.encodeToString((user + ":" + pass).getBytes(java.nio.charset.StandardCharsets.UTF_8));
93+
connection.setRequestProperty("Authorization", "Basic " + token);
94+
}
95+
96+
connection.connect();
97+
98+
int code = connection.getResponseCode();
99+
if (connection.getResponseCode() == 200) {
100+
return readAll(connection.getInputStream());
101+
}
102+
log.warn("GET {} -> HTTP {}", urlString, code);
103+
} catch (IOException e) {
104+
log.error("Send GET failed: url={}, err={}", urlString, ExceptionUtils.getMessage(e));
105+
} finally {
106+
if (connection != null) {
107+
connection.disconnect();
108+
}
70109
}
71110
return null;
72111
}
73112

113+
private String readAll(InputStream is) throws IOException {
114+
if (is == null) {
115+
return null;
116+
}
117+
try (InputStream in = is; ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
118+
byte[] buf = new byte[4096];
119+
int len;
120+
while ((len = in.read(buf)) != -1) {
121+
baos.write(buf, 0, len);
122+
}
123+
return baos.toString(java.nio.charset.StandardCharsets.UTF_8.name());
124+
}
125+
}
126+
74127
public String getLogParam(String uri, String contextPath) {
75128
uri = uri.substring(uri.indexOf(contextPath) + contextPath.length());
76129
uri = StringUtil.stripTrailingSlash(uri).substring(1);

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
6262
String contextPath = httpConfig.getContextPath();
6363
int port = httpConfig.getPort();
6464

65+
// Take BasicAuth from configuration (if enabled)
66+
HttpBasic result = getHttpBasicAuth(httpConfig);
67+
6568
List<Tuple3<String, String, String>> allLogNameList = new ArrayList<>();
6669

6770
JsonArray systemMonitoringInformationJsonValues =
@@ -70,8 +73,20 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
7073
systemMonitoringInformation -> {
7174
String host = systemMonitoringInformation.asObject().get("host").asString();
7275
String url = "http://" + host + ":" + port + contextPath;
73-
String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME);
74-
log.debug(String.format("Request: %s , Result: %s", url, allName));
76+
String logUrl = url + REST_URL_GET_ALL_LOG_NAME;
77+
78+
String allName = httpConfig.isEnableBasicAuth()
79+
? sendGet(logUrl, result.basicUser, result.basicPass)
80+
: sendGet(logUrl);
81+
82+
if (allName == null || allName.trim().isEmpty()) {
83+
log.warn("GET {} returned empty body (null/empty). Skip this node.", logUrl);
84+
return;
85+
}
86+
87+
if (log.isDebugEnabled()) {
88+
log.debug("Request: {} , Result: {}", url, allName);
89+
}
7590
ArrayNode jsonNodes = JsonUtils.parseArray(allName);
7691

7792
jsonNodes.forEach(
@@ -91,6 +106,39 @@ public List<Tuple3<String, String, String>> allLogNameList(String jobId) {
91106
return allLogNameList;
92107
}
93108

109+
private static HttpBasic getHttpBasicAuth(HttpConfig httpConfig) {
110+
String basicUser = "";
111+
String basicPass = "";
112+
try {
113+
if (httpConfig.isEnableBasicAuth()) {
114+
basicUser = httpConfig.getBasicAuthUsername();
115+
basicPass = httpConfig.getBasicAuthPassword();
116+
}
117+
} catch (Throwable ignore) {
118+
// Compatible with older versions: If HttpConfig does not have these methods, use system properties or environment variables to find out
119+
basicUser = System.getProperty("seatunnel.http.user", System.getenv("BASIC_AUTH_USER"));
120+
basicPass = System.getProperty("seatunnel.http.pass", System.getenv("BASIC_AUTH_PASS"));
121+
log.warn("Use system property or environment variable to set basic auth.");
122+
}
123+
124+
if (StringUtils.isNotBlank(basicUser) && StringUtils.isNotBlank(basicPass)) {
125+
httpConfig.setBasicAuthUsername(basicUser);
126+
httpConfig.setBasicAuthPassword(basicPass);
127+
}
128+
return new HttpBasic(basicUser, basicPass);
129+
}
130+
131+
private static class HttpBasic {
132+
133+
public final String basicUser;
134+
public final String basicPass;
135+
136+
public HttpBasic(String basicUser, String basicPass) {
137+
this.basicUser = basicUser;
138+
this.basicPass = basicPass;
139+
}
140+
}
141+
94142
public JsonArray allNodeLogFormatJson(String jobId) {
95143

96144
return allLogNameList(jobId).stream()
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.server.rest;
19+
20+
import java.io.BufferedReader;
21+
import java.io.IOException;
22+
import java.io.InputStreamReader;
23+
import java.net.HttpURLConnection;
24+
import java.util.Collections;
25+
import java.util.stream.Collectors;
26+
27+
import com.hazelcast.config.Config;
28+
import com.hazelcast.internal.serialization.Data;
29+
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
30+
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
31+
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
32+
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
33+
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
34+
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
35+
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
36+
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
37+
import org.apache.seatunnel.engine.server.SeaTunnelServer;
38+
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
39+
import org.apache.seatunnel.engine.server.TestUtils;
40+
import org.junit.jupiter.api.Assertions;
41+
import org.junit.jupiter.api.BeforeAll;
42+
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.api.condition.DisabledOnOs;
44+
import org.junit.jupiter.api.condition.OS;
45+
46+
/** Test for Rest API with Basic. */
47+
@DisabledOnOs(OS.MAC)
48+
public class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest {
49+
50+
private static final int HTTP_PORT = 18080;
51+
private static final Long JOB_1 = System.currentTimeMillis() + 1L;
52+
public static final String USER = "admin";
53+
public static final String PASS = "admin";
54+
55+
@BeforeAll
56+
void setUp() {
57+
String name = this.getClass().getName();
58+
Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
59+
hazelcastConfig.setClusterName(TestUtils.getClusterName("RestApiServletHttpBasicTest_" + name));
60+
SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
61+
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
62+
seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
63+
64+
HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig();
65+
httpConfig.setEnabled(true);
66+
httpConfig.setPort(HTTP_PORT);
67+
68+
instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
69+
nodeEngine = instance.node.nodeEngine;
70+
server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
71+
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
72+
}
73+
74+
@Test
75+
public void testRestApiHttp() throws Exception {
76+
HttpURLConnection conn =
77+
(HttpURLConnection)
78+
new java.net.URL("http://localhost:" + HTTP_PORT + "/overview")
79+
.openConnection();
80+
setBasicAuth(conn);
81+
try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
82+
83+
Assertions.assertEquals(200, conn.getResponseCode());
84+
String response = in.lines().collect(Collectors.joining());
85+
Assertions.assertTrue(response.contains("projectVersion"));
86+
} finally {
87+
conn.disconnect();
88+
}
89+
}
90+
91+
@Override
92+
public SeaTunnelConfig loadSeaTunnelConfig() {
93+
SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig();
94+
95+
HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig();
96+
httpConfig.setEnabled(Boolean.TRUE);
97+
httpConfig.setEnableBasicAuth(Boolean.TRUE);
98+
httpConfig.setBasicAuthUsername("admin");
99+
httpConfig.setBasicAuthPassword("admin");
100+
httpConfig.setPort(HTTP_PORT);
101+
102+
CheckpointConfig checkpointConfig = seaTunnelConfig.getEngineConfig().getCheckpointConfig();
103+
104+
checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE);
105+
seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig);
106+
return seaTunnelConfig;
107+
}
108+
109+
@Test
110+
void testWriteJsonWithObject() throws IOException {
111+
startJob();
112+
testLogRestApiResponse("JSON");
113+
}
114+
115+
public void setBasicAuth(HttpURLConnection connection){
116+
// Basic Auth
117+
String token = java.util.Base64.getEncoder()
118+
.encodeToString((USER + ":" + PASS).getBytes(java.nio.charset.StandardCharsets.UTF_8));
119+
connection.setRequestProperty("Authorization", "Basic " + token);
120+
}
121+
122+
public void testLogRestApiResponse(String format) throws IOException {
123+
HttpURLConnection conn = null;
124+
try {
125+
java.net.URL url =
126+
new java.net.URL("http://localhost:" + HTTP_PORT + "/logs?format=" + format);
127+
conn = (HttpURLConnection) url.openConnection();
128+
setBasicAuth(conn);
129+
130+
try (BufferedReader in =
131+
new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
132+
// [ {
133+
// "node" : "localhost:18080",
134+
// "logLink" : "http://localhost:18080/logs/job-1760939539658.log",
135+
// "logName" : "job-1760939539658.log"
136+
//}, {
137+
// "node" : "localhost:18080",
138+
// "logLink" : "http://localhost:18080/logs/job-${ctx:ST-JID}.log",
139+
// "logName" : "job-${ctx:ST-JID}.log"
140+
//} ]
141+
String response = in.lines().collect(Collectors.joining());
142+
Assertions.assertNotNull(response);
143+
}
144+
145+
Assertions.assertEquals(200, conn.getResponseCode());
146+
Assertions.assertTrue(
147+
conn.getHeaderFields()
148+
.get("Content-Type")
149+
.toString()
150+
.contains("charset=utf-8"));
151+
} finally {
152+
if (conn != null) {
153+
conn.disconnect();
154+
}
155+
}
156+
}
157+
158+
private void startJob() {
159+
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan("fake_to_console.conf", RestApiHttpBasicTest.JOB_1.toString(),
160+
RestApiHttpBasicTest.JOB_1);
161+
162+
JobImmutableInformation jobImmutableInformation =
163+
new JobImmutableInformation(
164+
RestApiHttpBasicTest.JOB_1,
165+
"Test",
166+
nodeEngine.getSerializationService(),
167+
testLogicalDag,
168+
Collections.emptyList(),
169+
Collections.emptyList());
170+
171+
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
172+
173+
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
174+
server.getCoordinatorService()
175+
.submitJob(RestApiHttpBasicTest.JOB_1, data, jobImmutableInformation.isStartWithSavePoint());
176+
voidPassiveCompletableFuture.join();
177+
}
178+
}

0 commit comments

Comments
 (0)