Introduce LoggregatorV2 Client #1021

Status: Open. Wants to merge 1 commit into base: main.
4 changes: 4 additions & 0 deletions cloudfoundry-client-reactor/pom.xml
@@ -55,6 +55,10 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
FilterBuilder.java
@@ -30,7 +30,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class FilterBuilder {
public final class FilterBuilder {

private FilterBuilder() {
}
MultipartCodec.java (org.cloudfoundry.reactor.doppler)
@@ -16,24 +16,31 @@

package org.cloudfoundry.reactor.doppler;

import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.http.HttpHeaderNames;
import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClientResponse;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MultipartCodec {
public final class MultipartCodec {

private static final Pattern BOUNDARY_PATTERN = Pattern.compile("multipart/.+; boundary=(.*)");

private static final int MAX_PAYLOAD_SIZE = 1024 * 1024;

private static final String SERVER_SENT_EVENT_DATA_STRING = "data: ";

private MultipartCodec() {
}

@@ -47,10 +54,6 @@ static DelimiterBasedFrameDecoder createDecoder(HttpClientResponse response) {
Unpooled.copiedBuffer(String.format("\r\n--%s--\r\n", boundary), Charset.defaultCharset()));
}

static Flux<InputStream> decode(ByteBufFlux body) {
return body.asInputStream()
.skip(1);
}

private static String extractMultipartBoundary(HttpClientResponse response) {
String contentType = response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE);
@@ -63,4 +66,44 @@ private static String extractMultipartBoundary(HttpClientResponse response) {
}
}

public static DelimiterBasedFrameDecoder createSimpleDecoder(HttpClientResponse response) {
return new DelimiterBasedFrameDecoder(MAX_PAYLOAD_SIZE,
Unpooled.copiedBuffer("\n\n", Charset.defaultCharset()));
}

public static Flux<InputStream> decode(ByteBufFlux body) {
return body.asInputStream().skip(1);
}

public static Flux<LoggregatorEnvelope.Envelope> decodeAsEnvelope(ByteBufFlux body) {
return body.asString().flatMap(bodyString -> Flux.fromIterable(parseBatch(bodyString).getBatchList()));
}

private static LoggregatorEnvelope.EnvelopeBatch parseBatch(String bodyString) {
if (bodyString.startsWith("event: ")) {
return emptyBatch();
}

if (bodyString.contains("heartbeat")) {
return emptyBatch();
}

if (bodyString.startsWith(SERVER_SENT_EVENT_DATA_STRING)) {
try {
ServerSentEvent<String> serverSentEvent = ServerSentEvent.builder(bodyString.substring(SERVER_SENT_EVENT_DATA_STRING.length())).build();
LoggregatorEnvelope.EnvelopeBatch.Builder builder = LoggregatorEnvelope.EnvelopeBatch.newBuilder();
JsonFormat.parser().merge(new StringReader(serverSentEvent.data()), builder);
return builder.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return emptyBatch();
}

private static LoggregatorEnvelope.EnvelopeBatch emptyBatch() {
return LoggregatorEnvelope.EnvelopeBatch.newBuilder().build();
}


}
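The new decodeAsEnvelope path can be exercised without a real RLP connection by feeding it a single hand-built server-sent event. A minimal sketch, assuming the JSON field names "batch", "source_id" and "instance_id" match the loggregator v2 envelope.proto (the proto itself is not shown in this diff):

import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;
import org.cloudfoundry.reactor.doppler.MultipartCodec;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;

public final class MultipartCodecSketch {

    public static void main(String[] args) {
        // One frame, as it would arrive after the "\n\n" DelimiterBasedFrameDecoder.
        // The JSON field names are an assumption about the RLP encoding, not taken from this PR.
        String frame = "data: {\"batch\":[{\"source_id\":\"app-guid\",\"instance_id\":\"0\"}]}";

        // decodeAsEnvelope parses the JSON payload into LoggregatorEnvelope.Envelope instances
        MultipartCodec.decodeAsEnvelope(ByteBufFlux.fromString(Mono.just(frame)))
            .map(LoggregatorEnvelope.Envelope::getSourceId)
            .subscribe(System.out::println); // prints "app-guid"
    }
}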
AbstractLoggregatorOperations.java (org.cloudfoundry.reactor.loggregator, new file)
@@ -0,0 +1,58 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cloudfoundry.reactor.loggregator;

import io.netty.channel.ChannelHandler;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.cloudfoundry.reactor.client.QueryBuilder;
import org.cloudfoundry.reactor.client.v3.FilterBuilder;
import org.cloudfoundry.reactor.util.AbstractReactorOperations;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClientResponse;

import java.util.function.Function;


abstract class AbstractLoggregatorOperations extends AbstractReactorOperations {

protected AbstractLoggregatorOperations(ConnectionContext connectionContext, Mono<String> root, TokenProvider tokenProvider) {
super(connectionContext, root, tokenProvider);
}

<T> Flux<T> get(Object requestPayload, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Function<HttpClientResponse, ChannelHandler> channelHandlerBuilder,
Function<ByteBufFlux, Flux<T>> bodyTransformer) {
return createOperator().flatMapMany(operator -> operator.get()
.uri(queryTransformer(requestPayload).andThen(uriTransformer))
.response()
.addChannelHandler(channelHandlerBuilder)
.parseBodyToFlux(responseWithBody -> bodyTransformer.apply(responseWithBody.getBody())));
}

private static Function<UriComponentsBuilder, UriComponentsBuilder> queryTransformer(Object requestPayload) {
return builder -> {
FilterBuilder.augment(builder, requestPayload);
QueryBuilder.augment(builder, requestPayload);

return builder;
};
}

}
ReactorLoggregatorEndpoints.java (org.cloudfoundry.reactor.loggregator, new file)
@@ -0,0 +1,38 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cloudfoundry.reactor.loggregator;

import org.cloudfoundry.doppler.Envelope;
import org.cloudfoundry.loggregator.v2.StreamRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.cloudfoundry.reactor.doppler.MultipartCodec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorLoggregatorEndpoints extends AbstractLoggregatorOperations {

ReactorLoggregatorEndpoints(ConnectionContext connectionContext, Mono<String> root, TokenProvider tokenProvider) {
super(connectionContext, root, tokenProvider);
}

public Flux<Envelope> stream(StreamRequest request) {
return get(request, uriComponentsBuilder -> uriComponentsBuilder.pathSegment("v2/read"), MultipartCodec::createSimpleDecoder, MultipartCodec::decodeAsEnvelope)
.map(Envelope::from)
.checkpoint();
}
}
_ReactorLoggregatorClient.java (org.cloudfoundry.reactor.loggregator, new file)
@@ -0,0 +1,54 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cloudfoundry.reactor.loggregator;

import org.cloudfoundry.doppler.Envelope;
import org.cloudfoundry.loggregator.v2.LoggregatorClient;
import org.cloudfoundry.loggregator.v2.StreamRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.immutables.value.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Value.Immutable
abstract class _ReactorLoggregatorClient implements LoggregatorClient {
Review comment (Contributor): Requires tests in line with ReactorDopplerClientTest. (A rough sketch of how the client could be exercised follows this file's diff.)

@Override
public Flux<Envelope> stream(StreamRequest request) {
return getLoggregatorEndpoints().stream(request);
}

@Value.Derived
ReactorLoggregatorEndpoints getLoggregatorEndpoints() {
return new ReactorLoggregatorEndpoints(getConnectionContext(), getRoot(), getTokenProvider());
}

@Value.Default
Mono<String> getRoot() {
return getConnectionContext().getRootProvider().getRoot("log_stream", getConnectionContext());
}

/**
* The connection context
*/
abstract ConnectionContext getConnectionContext();

/**
* The token provider
*/
abstract TokenProvider getTokenProvider();
}
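Regarding the test-coverage comment above, here is a rough shape for wiring the new client. It assumes the Immutables-generated ReactorLoggregatorClient builder mirrors ReactorDopplerClient's (connectionContext/tokenProvider) and that the new org.cloudfoundry.loggregator.v2.StreamRequest exposes an applicationId field like its Doppler counterpart; neither is shown in this diff, and a real test would run against a mock server the way ReactorDopplerClientTest does.

import org.cloudfoundry.loggregator.v2.LoggregatorClient;
import org.cloudfoundry.loggregator.v2.StreamRequest;
import org.cloudfoundry.reactor.DefaultConnectionContext;
import org.cloudfoundry.reactor.loggregator.ReactorLoggregatorClient;
import org.cloudfoundry.reactor.tokenprovider.PasswordGrantTokenProvider;

public final class LoggregatorClientSketch {

    public static void main(String[] args) {
        DefaultConnectionContext connectionContext = DefaultConnectionContext.builder()
            .apiHost("api.example.com") // placeholder host
            .build();

        PasswordGrantTokenProvider tokenProvider = PasswordGrantTokenProvider.builder()
            .username("user")           // placeholder credentials
            .password("secret")
            .build();

        LoggregatorClient loggregatorClient = ReactorLoggregatorClient.builder()
            .connectionContext(connectionContext)
            .tokenProvider(tokenProvider)
            .build();

        // StreamRequest.applicationId is assumed; adjust to the actual request type in this PR.
        loggregatorClient.stream(StreamRequest.builder()
                .applicationId("app-guid")
                .build())
            .take(10)                         // bound the stream so blockLast() returns
            .doOnNext(System.out::println)
            .blockLast();
    }
}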
4 changes: 4 additions & 0 deletions cloudfoundry-client/pom.xml
@@ -47,6 +47,10 @@
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
MessageType.java (org.cloudfoundry.doppler)
@@ -16,6 +16,8 @@

package org.cloudfoundry.doppler;

import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;

import java.util.Objects;

/**
@@ -43,5 +45,15 @@ static MessageType from(org.cloudfoundry.dropsonde.events.LogMessage.MessageType
throw new IllegalArgumentException(String.format("Unknown message type: %s", dropsonde));
}
}
static MessageType from(LoggregatorEnvelope.Log.Type type) {
switch (Objects.requireNonNull(type, "log type")) {
case ERR:
return ERR;
case OUT:
return OUT;
default:
throw new IllegalArgumentException(String.format("Unknown message type: %s", type));
}
}

}
ContainerMetric.java (org.cloudfoundry.doppler)
@@ -17,6 +17,7 @@
package org.cloudfoundry.doppler;

import org.cloudfoundry.Nullable;
import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;
import org.immutables.value.Value;

import java.util.Objects;
@@ -41,6 +42,30 @@ public static ContainerMetric from(org.cloudfoundry.dropsonde.events.ContainerMe
.build();
}

public static ContainerMetric from(LoggregatorEnvelope.Envelope envelope) {
Review comment (Contributor): Requires tests in line with ContainerMetricTests. (A rough test sketch follows this file's diff.)

Objects.requireNonNull(envelope.getGauge(), "envelope");

LoggregatorEnvelope.Gauge gauge = envelope.getGauge();
return ContainerMetric.builder()
.applicationId(envelope.getSourceId())
.instanceIndex(Integer.parseInt(envelope.getInstanceId()))
.cpuPercentage(getMetricsValue(gauge, "cpu"))
.diskBytes(getLongValue(gauge, "disk"))
.diskBytesQuota(getLongValue(gauge, "disk_quota"))
.memoryBytes(getLongValue(gauge, "memory"))
.memoryBytesQuota(getLongValue(gauge, "memory_quota"))
.build();
}

private static Long getLongValue(LoggregatorEnvelope.Gauge gauge, String property) {
double metricsValue = getMetricsValue(gauge, property);
return Double.valueOf(metricsValue).longValue();
}

private static double getMetricsValue(LoggregatorEnvelope.Gauge gauge, String property) {
return gauge.getMetricsMap().getOrDefault(property, LoggregatorEnvelope.GaugeValue.newBuilder().setValue(0.0).build()).getValue();
}

/**
* The ID of the contained application
*/
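Regarding the ContainerMetricTests comment above, a minimal sketch of the kind of test that could cover the new from(LoggregatorEnvelope.Envelope) overload. The gauge keys mirror the ones read by getMetricsValue/getLongValue above; the ContainerMetric accessor names (getApplicationId(), getInstanceIndex(), getCpuPercentage(), getDiskBytes(), getMemoryBytes()) are assumed from the existing value type and may need adjusting.

import static org.junit.Assert.assertEquals;

import org.cloudfoundry.doppler.ContainerMetric;
import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;
import org.junit.Test;

public final class ContainerMetricFromEnvelopeSketchTest {

    @Test
    public void fromLoggregatorV2Envelope() {
        // Gauge keys match the ones the new from(Envelope) overload reads
        LoggregatorEnvelope.Gauge gauge = LoggregatorEnvelope.Gauge.newBuilder()
            .putMetrics("cpu", gaugeValue(1.5))
            .putMetrics("disk", gaugeValue(1024))
            .putMetrics("disk_quota", gaugeValue(2048))
            .putMetrics("memory", gaugeValue(4096))
            .putMetrics("memory_quota", gaugeValue(8192))
            .build();

        LoggregatorEnvelope.Envelope envelope = LoggregatorEnvelope.Envelope.newBuilder()
            .setSourceId("app-guid")
            .setInstanceId("2")
            .setGauge(gauge)
            .build();

        ContainerMetric metric = ContainerMetric.from(envelope);

        assertEquals("app-guid", metric.getApplicationId());
        assertEquals(Integer.valueOf(2), metric.getInstanceIndex());
        assertEquals(1.5, metric.getCpuPercentage(), 0.0);
        assertEquals(Long.valueOf(1024), metric.getDiskBytes());
        assertEquals(Long.valueOf(4096), metric.getMemoryBytes());
    }

    private static LoggregatorEnvelope.GaugeValue gaugeValue(double value) {
        return LoggregatorEnvelope.GaugeValue.newBuilder().setValue(value).build();
    }
}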
CounterEvent.java (org.cloudfoundry.doppler)
@@ -17,6 +17,7 @@
package org.cloudfoundry.doppler;

import org.cloudfoundry.Nullable;
import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;
import org.immutables.value.Value;

import java.util.Objects;
@@ -36,6 +37,15 @@ public static CounterEvent from(org.cloudfoundry.dropsonde.events.CounterEvent d
.total(dropsonde.total)
.build();
}
public static CounterEvent from(LoggregatorEnvelope.Counter counter) {
Review comment (Contributor): Requires tests in line with CounterEventTests. (A rough test sketch follows this file's diff.)

Objects.requireNonNull(counter, "counter");

return CounterEvent.builder()
.delta(counter.getDelta())
.name(counter.getName())
.total(counter.getTotal())
.build();
}

/**
* The amount by which to increment the counter
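Regarding the CounterEventTests comment above, a minimal sketch covering the new from(LoggregatorEnvelope.Counter) overload; the CounterEvent accessor names (getName(), getDelta(), getTotal()) are assumed from the existing value type.

import static org.junit.Assert.assertEquals;

import org.cloudfoundry.doppler.CounterEvent;
import org.cloudfoundry.loggregator.v2.LoggregatorEnvelope;
import org.junit.Test;

public final class CounterEventFromCounterSketchTest {

    @Test
    public void fromLoggregatorV2Counter() {
        LoggregatorEnvelope.Counter counter = LoggregatorEnvelope.Counter.newBuilder()
            .setName("requests")
            .setDelta(5L)
            .setTotal(105L)
            .build();

        CounterEvent event = CounterEvent.from(counter);

        assertEquals("requests", event.getName());
        assertEquals(Long.valueOf(5), event.getDelta());
        assertEquals(Long.valueOf(105), event.getTotal());
    }
}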