diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index b4d35d52ae35f..df3ce5ee2e700 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -88,6 +88,11 @@ public Builder(Map configs, boolean validateOnly) { public AlterConfigsRequest build(short version) { return new AlterConfigsRequest(data, version); } + + @Override + public String toString() { + return maskData(data); + } } private final AlterConfigsRequestData data; @@ -135,4 +140,20 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public static AlterConfigsRequest parse(ByteBuffer buffer, short version) { return new AlterConfigsRequest(new AlterConfigsRequestData(new ByteBufferAccessor(buffer), version), version); } + + // It is not safe to print all config values + private static String maskData(AlterConfigsRequestData data) { + AlterConfigsRequestData tempData = data.duplicate(); + tempData.resources().forEach(resource -> { + resource.configs().forEach(config -> { + config.setValue("REDACTED"); + }); + }); + return tempData.toString(); + } + + @Override + public String toString() { + return maskData(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java index c779f6d9a84da..a51aa7dc40641 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java @@ -17,14 +17,10 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData; -import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter; import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -48,7 +44,7 @@ public AlterUserScramCredentialsRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } @@ -87,15 +83,18 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results)); } + private static String maskData(AlterUserScramCredentialsRequestData data) { + AlterUserScramCredentialsRequestData tempData = data.duplicate(); + tempData.upsertions().forEach(upsertion -> { + upsertion.setSalt(new byte[0]); + upsertion.setSaltedPassword(new byte[0]); + }); + return tempData.toString(); + } + // Do not print salt or saltedPassword @Override public String toString() { - JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy(); - - for (JsonNode upsertion : json.get("upsertions")) { - ((ObjectNode) upsertion).put("salt", ""); - ((ObjectNode) upsertion).put("saltedPassword", ""); - } - return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString(); + return maskData(data); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java index 0a9f9a8991bdc..287013d3aadb8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java @@ -103,4 +103,13 @@ public boolean hasError() { public boolean shouldClientThrottle(short version) { return version >= 1; } + + // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + CreateDelegationTokenResponseData tempData = data.duplicate(); + tempData.setTokenId("REDACTED"); + tempData.setHmac(new byte[0]); + return tempData.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java index a922f056a89aa..87b358249f5d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java @@ -128,4 +128,15 @@ public boolean hasError() { public boolean shouldClientThrottle(short version) { return version >= 1; } + + // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + DescribeDelegationTokenResponseData tempData = data.duplicate(); + tempData.tokens().forEach(token -> { + token.setTokenId("REDACTED"); + token.setHmac(new byte[0]); + }); + return tempData.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java index 3660a45646059..ab4bcd9533e63 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java @@ -74,7 +74,19 @@ public ExpireDelegationTokenRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } + + private static String maskData(ExpireDelegationTokenRequestData data) { + ExpireDelegationTokenRequestData tempData = data.duplicate(); + tempData.setHmac(new byte[0]); + return tempData.toString(); + } + + // Do not print Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + return maskData(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java index 54540837756c5..bfac0e4074d73 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java @@ -21,15 +21,11 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; @@ -77,7 +73,7 @@ public IncrementalAlterConfigsRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } @@ -113,14 +109,18 @@ public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwab } // It is not safe to print all config values + private static String maskData(IncrementalAlterConfigsRequestData data) { + IncrementalAlterConfigsRequestData tempData = data.duplicate(); + tempData.resources().forEach(resource -> { + resource.configs().forEach(config -> { + config.setValue("REDACTED"); + }); + }); + return tempData.toString(); + } + @Override public String toString() { - JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy(); - for (JsonNode resource : json.get("resources")) { - for (JsonNode config : resource.get("configs")) { - ((ObjectNode) config).put("value", "REDACTED"); - } - } - return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString(); + return maskData(data); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java index 963097093dc2d..deeb0dbf23aa0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java @@ -66,7 +66,19 @@ public RenewDelegationTokenRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } + + private static String maskData(RenewDelegationTokenRequestData data) { + RenewDelegationTokenRequestData tempData = data.duplicate(); + tempData.setHmac(new byte[0]); + return tempData.toString(); + } + + // Do not print Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + return maskData(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java index 47dd5fd315769..0816744a97c3e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java @@ -78,4 +78,12 @@ public static SaslAuthenticateRequest parse(ByteBuffer buffer, short version) { return new SaslAuthenticateRequest(new SaslAuthenticateRequestData(new ByteBufferAccessor(buffer), version), version); } + + // Do not print authBytes, overwrite a temp copy of the data with empty bytes + @Override + public String toString() { + SaslAuthenticateRequestData tempData = data.duplicate(); + tempData.setAuthBytes(new byte[0]); + return tempData.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java index d6ca8c170dc45..810595279f07c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java @@ -80,4 +80,12 @@ public SaslAuthenticateResponseData data() { public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) { return new SaslAuthenticateResponse(new SaslAuthenticateResponseData(new ByteBufferAccessor(buffer), version)); } + + // Do not print authBytes, overwrite a temp copy of the data with empty bytes + @Override + public String toString() { + SaslAuthenticateResponseData tempData = data.duplicate(); + tempData.setAuthBytes(new byte[0]); + return tempData.toString(); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index ad52482a3b3f9..008e60d6e945b 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -305,7 +305,6 @@ import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE; import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE; import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP; -import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS; import static org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -2989,7 +2988,7 @@ private DescribeConfigsResponse createDescribeConfigsResponse(short version) { } private AlterConfigsRequest createAlterConfigsRequest(short version) { - Map configs = new HashMap<>(); + Map configs = new LinkedHashMap<>(); List configEntries = asList( new AlterConfigsRequest.ConfigEntry("config_name", "config_value"), new AlterConfigsRequest.ConfigEntry("another_name", "another value") @@ -2997,7 +2996,19 @@ private AlterConfigsRequest createAlterConfigsRequest(short version) { configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries)); configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new AlterConfigsRequest.Config(emptyList())); - return new AlterConfigsRequest.Builder(configs, false).build(version); + AlterConfigsRequest alterConfigsRequest = new AlterConfigsRequest.Builder(configs, false).build(version); + assertEquals( + "AlterConfigsRequestData(resources=[" + + "AlterConfigsResource(resourceType=" + ConfigResource.Type.BROKER.id() + ", " + + "resourceName='0', " + + "configs=[AlterableConfig(name='config_name', value='REDACTED'), " + + "AlterableConfig(name='another_name', value='REDACTED')]), " + + "AlterConfigsResource(resourceType=" + ConfigResource.Type.TOPIC.id() + ", " + + "resourceName='topic', configs=[])], " + + "validateOnly=false)", + alterConfigsRequest.toString() + ); + return alterConfigsRequest; } private AlterConfigsResponse createAlterConfigsResponse() { @@ -3100,7 +3111,12 @@ private CreateDelegationTokenResponse createCreateTokenResponse() { .setMaxTimestampMs(System.currentTimeMillis()) .setTokenId("token1") .setHmac("test".getBytes()); - return new CreateDelegationTokenResponse(data); + var response = new CreateDelegationTokenResponse(data); + + String responseStr = response.toString(); + assertTrue(responseStr.contains("tokenId='REDACTED'")); + assertTrue(responseStr.contains("hmac=[]")); + return response; } private RenewDelegationTokenRequest createRenewTokenRequest(short version) { @@ -3156,7 +3172,14 @@ private DescribeDelegationTokenResponse createDescribeTokenResponse(short versio tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes())); tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes())); - return new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList); + var response = new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList); + + String responseStr = response.toString(); + String[] parts = responseStr.split(","); + // The 2 token info should both be redacted + assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("tokenId='REDACTED'")).count()); + assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("hmac=[]")).count()); + return response; } private ElectLeadersRequest createElectLeadersRequestNullPartitions() { @@ -3773,4 +3796,26 @@ public void testInvalidTaggedFieldsWithSaslAuthenticateRequest() { parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage(); assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg); } + + @Test + public void testSaslAuthenticateRequestResponseToStringMasksSensitiveData() { + byte[] sensitiveAuthBytes = "sensitive-auth-token-123".getBytes(StandardCharsets.UTF_8); + SaslAuthenticateRequestData requestData = new SaslAuthenticateRequestData().setAuthBytes(sensitiveAuthBytes); + SaslAuthenticateRequest request = new SaslAuthenticateRequest(requestData, (short) 2); + + String requestString = request.toString(); + + // Verify that the authBytes field is present but empty in the output + assertTrue(requestString.contains("authBytes=[]"), + "authBytes field should be empty in toString() output"); + + SaslAuthenticateResponseData responseData = new SaslAuthenticateResponseData().setAuthBytes(sensitiveAuthBytes); + SaslAuthenticateResponse response = new SaslAuthenticateResponse(responseData); + + String responseString = response.toString(); + + // Verify that the authBytes field is present but empty in the output + assertTrue(responseString.contains("authBytes=[]"), + "authBytes field should be empty in toString() output"); + } } diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala index ecea412e98998..dacabd1176310 100644 --- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala @@ -47,6 +47,7 @@ import org.mockito.Mockito.mock import java.io.IOException import java.net.InetAddress import java.nio.ByteBuffer +import java.util import java.util.Collections import java.util.concurrent.atomic.AtomicReference import scala.collection.{Map, Seq} @@ -65,13 +66,23 @@ class RequestChannelTest { val sensitiveValue = "secret" def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = { - val alterConfigs = request(new AlterConfigsRequest.Builder( - Collections.singletonMap(resource, new Config(entries.asJavaCollection)), true).build()) + val alterConfigs = new AlterConfigsRequest.Builder( + util.Map.of(resource, new Config(entries.asJavaCollection)), true).build() - val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest] + val alterConfigsString = alterConfigs.toString + entries.foreach { entry => + if (!alterConfigsString.contains(entry.name())) { + fail("Config names should be in the request string") + } + if (entry.value() != null && alterConfigsString.contains(entry.value())) { + fail("Config values should not be in the request string") + } + } + val alterConfigsReq = request(alterConfigs) + val loggableAlterConfigs = alterConfigsReq.loggableRequest.asInstanceOf[AlterConfigsRequest] val loggedConfig = loggableAlterConfigs.configs.get(resource) assertEquals(expectedValues, toMap(loggedConfig)) - val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString + val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigsReq.header, alterConfigsReq.requestLog.toJava, alterConfigsReq.isForwarded).toString assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc") }