Skip to content

Commit

Permalink
[improve][broker] PIP-307: Add proxy support for Java client (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored Dec 23, 2023
1 parent 9a5c2f2 commit 32f3577
Show file tree
Hide file tree
Showing 17 changed files with 468 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
Expand Down Expand Up @@ -300,7 +301,7 @@ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
}

public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
return pulsarClient.getConnection(topic, randomKeyForSelectConnection).thenApply(Pair::getLeft);
}

public CompletableFuture<ClientCnx> getClientCnx(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -135,10 +135,10 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(i -> {
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress, false));
});
final String topicPoliciesServiceInitException
= "Topic creation encountered an exception by initialize topic policies service";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
Expand Down Expand Up @@ -270,9 +271,10 @@ public void testTransactionBufferClientTimeout() throws Exception {
CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(any(), any(), anyInt())).thenReturn(completableFuture);
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
when(mockClient.getConnection(any(), any(), anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
Expand Down Expand Up @@ -324,10 +326,9 @@ public void testTransactionBufferChannelUnActive() throws PulsarServerException
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture);
when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
Expand Down Expand Up @@ -57,9 +58,9 @@ public void testRequestCredits() throws PulsarServerException {
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
Optional<NamespaceEphemeralData> opData = Optional.empty();
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
when(pulsarClient.getConnection(anyString(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(Pair.of(mock(ClientCnx.class), false)));
when(pulsarClient.getConnection(anyString()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,21 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
TopicName topicName = TopicName.get("persistent://public/default/test");

// test request 1
{
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey(), brokerAddress);
Assert.assertEquals(result.getValue(), brokerAddress);
var result = lookupService.getBroker(topicName).get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
Assert.assertEquals(result.isUseProxy(), false);
}
// test request 2
{
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey(), brokerAddress);
Assert.assertEquals(result.getValue(), brokerAddress);
var result = lookupService.getBroker(topicName).get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
Assert.assertEquals(result.isUseProxy(), false);
}
}

Expand Down Expand Up @@ -187,12 +187,11 @@ public void testHttpLookupRedirect() throws Exception {
doReturn(CompletableFuture.completedFuture(optional), CompletableFuture.completedFuture(optional2))
.when(namespaceService).getBrokerServiceUrlAsync(any(), any());

CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));

Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey(), address);
Assert.assertEquals(result.getValue(), address);
var result =
lookupService.getBroker(TopicName.get("persistent://public/default/test")).get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getLogicalAddress(), address);
Assert.assertEquals(result.getPhysicalAddress(), address);
Assert.assertEquals(result.isUseProxy(), false);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public CompletableFuture<ClientCnx> getConnection(String topic) {
result.completeExceptionally(new IOException("New connections are rejected."));
return result;
} else {
return super.getConnection(topic, getCnxPool().genRandomKeyToSelectCon());
return super.getConnection(topic);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
Expand All @@ -58,7 +57,7 @@ public class BinaryProtoLookupService implements LookupService {
private final String listenerName;
private final int maxLookupRedirects;

private final ConcurrentHashMap<TopicName, CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
private final ConcurrentHashMap<TopicName, CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();

private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
Expand Down Expand Up @@ -99,11 +98,11 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
* topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> newFuture =
CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
newFutureCreated.setValue(newFuture);
return newFuture;
Expand Down Expand Up @@ -139,9 +138,9 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(T
}
}

private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socketAddress,
boolean authoritative, TopicName topicName, final int redirectCount) {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
CompletableFuture<LookupTopicResult> addressFuture = new CompletableFuture<>();

if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
addressFuture.completeExceptionally(
Expand All @@ -159,7 +158,6 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
if (log.isDebugEnabled()) {
log.debug("[{}] Lookup response exception: {}", topicName, t);
}

addressFuture.completeExceptionally(t);
} else {
URI uri = null;
Expand Down Expand Up @@ -198,10 +196,12 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
// (3) received correct broker to connect
if (r.proxyThroughServiceUrl) {
// Connect through proxy
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
addressFuture.complete(
new LookupTopicResult(responseBrokerAddress, socketAddress, true));
} else {
// Normal result with direct connection to broker
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
addressFuture.complete(
new LookupTopicResult(responseBrokerAddress, responseBrokerAddress, false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class ConnectionHandler {
private final AtomicBoolean duringConnect = new AtomicBoolean(false);
protected final int randomKeyForSelectConnection;

private volatile Boolean useProxy;

interface Connection {

/**
Expand Down Expand Up @@ -93,11 +95,14 @@ protected void grabCnx(Optional<URI> hostURI) {

try {
CompletableFuture<ClientCnx> cnxFuture;
if (hostURI.isPresent()) {
InetSocketAddress address = InetSocketAddress.createUnresolved(
hostURI.get().getHost(),
hostURI.get().getPort());
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
if (hostURI.isPresent() && useProxy != null) {
URI uri = hostURI.get();
InetSocketAddress address = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
if (useProxy) {
cnxFuture = state.client.getProxyConnection(address, randomKeyForSelectConnection);
} else {
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
}
} else if (state.redirectedClusterURI != null) {
if (state.topic == null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
Expand All @@ -112,7 +117,11 @@ protected void grabCnx(Optional<URI> hostURI) {
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection);
cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection).thenApply(
connectionResult -> {
useProxy = connectionResult.getRight();
return connectionResult.getLeft();
});
}
cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.client.api.SchemaSerializationException;
Expand Down Expand Up @@ -81,7 +80,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
*/
@Override
@SuppressWarnings("deprecation")
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
Expand All @@ -101,7 +100,8 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
}

InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
false /* HTTP lookups never use the proxy */));
} catch (Exception e) {
// Failed to parse url
log.warn("[{}] Lookup Failed due to invalid url {}, {}", topicName, uri, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
Expand Down Expand Up @@ -54,9 +53,10 @@ public interface LookupService extends AutoCloseable {
*
* @param topicName
* topic-name
* @return a pair of addresses, representing the logical and physical address of the broker that serves given topic
* @return a {@link LookupTopicResult} representing the logical and physical address of the broker that serves the
* given topic, as well as proxying information.
*/
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName);
CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);

/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@AllArgsConstructor
@ToString
public class LookupTopicResult {
private final InetSocketAddress logicalAddress;
private final InetSocketAddress physicalAddress;
private final boolean isUseProxy;
}
Loading

0 comments on commit 32f3577

Please sign in to comment.