Skip to content

Commit

Permalink
Update user lifecycle tracking to V3
Browse files Browse the repository at this point in the history
  • Loading branch information
manuel-alvarez-alvarez committed Dec 18, 2024
1 parent a19f73a commit b48fedf
Show file tree
Hide file tree
Showing 20 changed files with 912 additions and 639 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.datadog.appsec.event.ReplaceableEventProducerService;
import com.datadog.appsec.gateway.GatewayBridge;
import com.datadog.appsec.powerwaf.PowerWAFModule;
import com.datadog.appsec.user.AppSecEventTrackerImpl;
import com.datadog.appsec.util.AbortStartupException;
import com.datadog.appsec.util.StandardizedLogging;
import datadog.appsec.api.blocking.Blocking;
Expand Down Expand Up @@ -99,7 +98,7 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s

Blocking.setBlockingService(new BlockingServiceImpl(REPLACEABLE_EVENT_PRODUCER));

AppSecEventTracker.setEventTracker(new AppSecEventTrackerImpl());
AppSecEventTracker.setEventTracker(new AppSecEventTracker());

STARTED.set(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public interface KnownAddresses {

Address<String> USER_ID = new Address<>("usr.id");

Address<String> USER_LOGIN = new Address<>("usr.login");

Address<String> SESSION_ID = new Address<>("usr.session_id");

/** The URL of a network resource being requested (outgoing request) */
Expand Down Expand Up @@ -189,6 +191,8 @@ static Address<?> forName(String name) {
return SERVER_GRAPHQL_ALL_RESOLVERS;
case "usr.id":
return USER_ID;
case "usr.login":
return USER_LOGIN;
case "usr.session_id":
return SESSION_ID;
case "server.io.net.url":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datadog.appsec.report.AppSecEvent;
import com.datadog.appsec.util.StandardizedLogging;
import datadog.trace.api.Config;
import datadog.trace.api.UserIdCollectionMode;
import datadog.trace.api.http.StoredBodySupplier;
import datadog.trace.api.internal.TraceSegment;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand Down Expand Up @@ -125,6 +126,10 @@ public class AppSecRequestContext implements DataBundle, Closeable {

// keep a reference to the last published usr.id
private volatile String userId;
private volatile UserIdCollectionMode userIdSource;
// keep a reference to the last published usr.login
private volatile String userLogin;
private volatile UserIdCollectionMode userLoginSource;
// keep a reference to the last published usr.session_id
private volatile String sessionId;

Expand Down Expand Up @@ -435,6 +440,30 @@ public void setUserId(String userId) {
this.userId = userId;
}

/**
 * Returns the current value of the volatile {@code userIdSource} field, i.e. the collection mode
 * last stored through {@link #setUserIdSource(UserIdCollectionMode)}.
 *
 * @return the stored collection mode for the user id
 */
public UserIdCollectionMode getUserIdSource() {
return userIdSource;
}

/**
 * Stores the collection mode associated with the user id in the volatile {@code userIdSource}
 * field.
 *
 * @param userIdSource collection mode to remember
 */
public void setUserIdSource(UserIdCollectionMode userIdSource) {
this.userIdSource = userIdSource;
}

/**
 * Returns the current value of the volatile {@code userLogin} field, which keeps a reference to
 * the last published {@code usr.login}.
 *
 * @return the stored user login
 */
public String getUserLogin() {
return userLogin;
}

/**
 * Stores the given login in the volatile {@code userLogin} field, which keeps a reference to the
 * last published {@code usr.login}.
 *
 * @param userLogin login to remember
 */
public void setUserLogin(String userLogin) {
this.userLogin = userLogin;
}

/**
 * Returns the current value of the volatile {@code userLoginSource} field, i.e. the collection
 * mode last stored through {@link #setUserLoginSource(UserIdCollectionMode)}.
 *
 * @return the stored collection mode for the user login
 */
public UserIdCollectionMode getUserLoginSource() {
return userLoginSource;
}

/**
 * Stores the collection mode associated with the user login in the volatile {@code
 * userLoginSource} field.
 *
 * @param userLoginSource collection mode to remember
 */
public void setUserLoginSource(UserIdCollectionMode userLoginSource) {
this.userLoginSource = userLoginSource;
}

/**
 * Stores the given session id in the volatile {@code sessionId} field, which keeps a reference to
 * the last published {@code usr.session_id}.
 *
 * @param sessionId session id to remember
 */
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package com.datadog.appsec.gateway;

import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_0_2;
import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_3_4;
import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_6_10;
import static com.datadog.appsec.gateway.AppSecRequestContext.DEFAULT_REQUEST_HEADERS_ALLOW_LIST;
import static com.datadog.appsec.gateway.AppSecRequestContext.REQUEST_HEADERS_ALLOW_LIST;
import static com.datadog.appsec.gateway.AppSecRequestContext.RESPONSE_HEADERS_ALLOW_LIST;
import static datadog.trace.api.UserIdCollectionMode.ANONYMIZATION;
import static datadog.trace.api.UserIdCollectionMode.DISABLED;
import static datadog.trace.api.UserIdCollectionMode.SDK;
import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
import static datadog.trace.util.Strings.toHexString;

import com.datadog.appsec.AppSecSystem;
import com.datadog.appsec.api.security.ApiSecurityRequestSampler;
Expand All @@ -22,7 +28,6 @@
import com.datadog.appsec.report.AppSecEventWrapper;
import datadog.trace.api.Config;
import datadog.trace.api.UserIdCollectionMode;
import datadog.trace.api.function.TriFunction;
import datadog.trace.api.gateway.Events;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.gateway.IGSpanInfo;
Expand All @@ -41,6 +46,8 @@
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -51,7 +58,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -65,6 +74,10 @@ public class GatewayBridge {
private static final Pattern QUERY_PARAM_SPLITTER = Pattern.compile("&");
private static final Map<String, List<String>> EMPTY_QUERY_PARAMS = Collections.emptyMap();

private static final int HASH_SIZE_BYTES = 16; // 128 bits
private static final String ANON_PREFIX = "anon_";
private static final AtomicBoolean SHA_MISSING_REPORTED = new AtomicBoolean(false);

/** User tracking tags that will force the collection of request headers */
private static final String[] USER_TRACKING_TAGS = {
"appsec.events.users.login.success.track", "appsec.events.users.login.failure.track"
Expand All @@ -91,7 +104,8 @@ public class GatewayBridge {
private volatile DataSubscriberInfo ioNetUrlSubInfo;
private volatile DataSubscriberInfo ioFileSubInfo;
private volatile DataSubscriberInfo sessionIdSubInfo;
private final ConcurrentHashMap<Address<String>, DataSubscriberInfo> userIdSubInfo =
private volatile DataSubscriberInfo userIdSubInfo;
private final ConcurrentHashMap<String, DataSubscriberInfo> loginEventSubInfo =
new ConcurrentHashMap<>();

public GatewayBridge(
Expand Down Expand Up @@ -134,11 +148,8 @@ public void init() {
subscriptionService.registerCallback(EVENTS.networkConnection(), this::onNetworkConnection);
subscriptionService.registerCallback(EVENTS.fileLoaded(), this::onFileLoaded);
subscriptionService.registerCallback(EVENTS.requestSession(), this::onRequestSession);
subscriptionService.registerCallback(EVENTS.userId(), this.onUserEvent(KnownAddresses.USER_ID));
subscriptionService.registerCallback(
EVENTS.loginSuccess(), this.onUserEvent(KnownAddresses.LOGIN_SUCCESS));
subscriptionService.registerCallback(
EVENTS.loginFailure(), this.onUserEvent(KnownAddresses.LOGIN_FAILURE));
subscriptionService.registerCallback(EVENTS.user(), this::onUser);
subscriptionService.registerCallback(EVENTS.loginEvent(), this::onLoginEvent);

if (additionalIGEvents.contains(EVENTS.requestPathParams())) {
subscriptionService.registerCallback(EVENTS.requestPathParams(), this::onRequestPathParams);
Expand All @@ -149,55 +160,157 @@ public void init() {
}
}

private TriFunction<RequestContext, UserIdCollectionMode, String, Flow<Void>> onUserEvent(
final Address<String> address) {
return (ctx_, mode, userId) -> {
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (userId == null || ctx == null) {
private Flow<Void> onUser(
final RequestContext ctx_, final UserIdCollectionMode mode, final String originalUser) {
if (mode == DISABLED) {
return NoopFlow.INSTANCE;
}
final String user = anonymizeUser(mode, originalUser);
if (user == null) {
return NoopFlow.INSTANCE;
}
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return NoopFlow.INSTANCE;
}
final TraceSegment segment = ctx_.getTraceSegment();

// span with ASM data
segment.setTagTop(Tags.ASM_KEEP, true);
segment.setTagTop(Tags.PROPAGATED_APPSEC, true);

// skip event if we have an SDK one
if (mode != SDK) {
segment.setTagTop("_dd.appsec.usr.id", user);
if (ctx.getUserIdSource() == SDK) {
return NoopFlow.INSTANCE;
}
final TraceSegment segment = ctx_.getTraceSegment();
// user id can be set by the SDK overriding the auto event, always update the segment
segment.setTagTop("usr.id", userId);
segment.setTagTop("_dd.appsec.user.collection_mode", mode.shortName());
final List<Address<?>> addresses = new ArrayList<>(2);
final boolean newUserId = !userId.equals(ctx.getUserId());
if (newUserId) {
// unlikely that multiple threads will update the value at the same time
ctx.setUserId(userId);
addresses.add(KnownAddresses.USER_ID);
}
if (address != KnownAddresses.USER_ID) {
addresses.add(address);
}
if (addresses.isEmpty()) {
// nothing to publish so short-circuit here
}

// update span tags
segment.setTagTop("usr.id", user);
segment.setTagTop("_dd.appsec.user.collection_mode", mode.fullName());

// update current context with new user id
ctx.setUserIdSource(mode);
final boolean newUserId = !user.equals(ctx.getUserId());
if (!newUserId) {
return NoopFlow.INSTANCE;
}
ctx.setUserId(user);

// call waf if we have a new user id
while (true) {
DataSubscriberInfo subInfo = userIdSubInfo;
if (subInfo == null) {
subInfo = producerService.getDataSubscribers(KnownAddresses.USER_ID);
userIdSubInfo = subInfo;
}
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
}
final Address<?>[] addressArray = addresses.toArray(new Address[0]);
while (true) {
DataSubscriberInfo subInfo =
userIdSubInfo.computeIfAbsent(
address, k -> producerService.getDataSubscribers(addressArray));
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
}
MapDataBundle.Builder bundle = new MapDataBundle.Builder(CAPACITY_0_2);
if (newUserId) {
bundle.add(KnownAddresses.USER_ID, userId);
}
if (address != KnownAddresses.USER_ID) {
// we don't support null values for the address so we use an invalid placeholder here
bundle.add(address, "invalid");
}
try {
GatewayContext gwCtx = new GatewayContext(false);
return producerService.publishDataEvent(subInfo, ctx, bundle.build(), gwCtx);
} catch (ExpiredSubscriberInfoException e) {
userIdSubInfo.remove(address);
}
DataBundle bundle =
new MapDataBundle.Builder(CAPACITY_0_2).add(KnownAddresses.USER_ID, user).build();
try {
GatewayContext gwCtx = new GatewayContext(false);
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
} catch (ExpiredSubscriberInfoException e) {
userIdSubInfo = null;
}
}
}

/**
 * Callback registered for {@code EVENTS.loginEvent()}: tags the trace segment with the login event
 * and, when the login differs from the one stored in the request context, publishes the user
 * addresses to the data subscribers.
 *
 * <p>{@code NoopFlow.INSTANCE} is returned without publishing when the mode is {@code DISABLED},
 * when {@code anonymizeUser} yields {@code null}, when the request has no AppSec context, when a
 * non-SDK event arrives after an SDK one was recorded, when the login is unchanged, or when there
 * are no subscribers for the addresses.
 *
 * @param ctx_ request context the event belongs to
 * @param mode collection mode the login was gathered with
 * @param eventName name of the event, used to build the tag names and to pick the login
 *     success/failure address
 * @param exists when non-null, written as the {@code usr.exists} tag of the event
 * @param originalUser login as collected, before optional anonymization
 * @param metadata when non-null and non-empty, written as a tag under the event prefix
 * @return the flow returned by the producer service, or {@code NoopFlow.INSTANCE}
 */
private Flow<Void> onLoginEvent(
final RequestContext ctx_,
final UserIdCollectionMode mode,
final String eventName,
final Boolean exists,
final String originalUser,
final Map<String, String> metadata) {
if (mode == DISABLED) {
return NoopFlow.INSTANCE;
}
// in ANONYMIZATION mode this is the hashed login; null means there is nothing to report
final String user = anonymizeUser(mode, originalUser);
if (user == null) {
return NoopFlow.INSTANCE;
}
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return NoopFlow.INSTANCE;
}
final TraceSegment segment = ctx_.getTraceSegment();

// span with ASM data
segment.setTagTop(Tags.ASM_KEEP, true);
segment.setTagTop(Tags.PROPAGATED_APPSEC, true);

// skip event if we have an SDK one
// (the _dd.appsec.* tags below are still written before the event is skipped)
if (mode != SDK) {
segment.setTagTop("_dd.appsec.usr.login", user);
segment.setTagTop("_dd.appsec.usr.id", user);
segment.setTagTop(
"_dd.appsec.events.users." + eventName + ".auto.mode", mode.fullName(), true);
if (ctx.getUserLoginSource() == SDK) {
return NoopFlow.INSTANCE;
}
} else {
segment.setTagTop("_dd.appsec.events.users." + eventName + ".sdk", true, true);
}

// update span tags
segment.setTagTop("appsec.events.users." + eventName + ".usr.login", user, true);
segment.setTagTop("appsec.events.users." + eventName + ".usr.id", user, true);
segment.setTagTop("appsec.events.users." + eventName + ".track", true, true);
if (exists != null) {
segment.setTagTop("appsec.events.users." + eventName + ".usr.exists", exists, true);
}
if (metadata != null && !metadata.isEmpty()) {
segment.setTagTop("appsec.events.users." + eventName, metadata, true);
}

// update current context with new user login
// (the source is recorded even when the login itself turns out to be unchanged)
ctx.setUserLoginSource(mode);
final boolean newUserLogin = !user.equals(ctx.getUserLogin());
if (!newUserLogin) {
return NoopFlow.INSTANCE;
}
ctx.setUserLogin(user);

// call waf if we have a new user login
final List<Address<?>> addresses = new ArrayList<>(3);
addresses.add(KnownAddresses.USER_LOGIN);
addresses.add(KnownAddresses.USER_ID);
// the event name is matched against the suffix of the login success/failure address keys;
// any other event name publishes only the login and id addresses
if (KnownAddresses.LOGIN_SUCCESS.getKey().endsWith(eventName)) {
addresses.add(KnownAddresses.LOGIN_SUCCESS);
} else if (KnownAddresses.LOGIN_FAILURE.getKey().endsWith(eventName)) {
addresses.add(KnownAddresses.LOGIN_FAILURE);
}
final MapDataBundle.Builder bundleBuilder =
new MapDataBundle.Builder(addresses.size() == 2 ? CAPACITY_0_2 : CAPACITY_3_4);
bundleBuilder.add(KnownAddresses.USER_ID, user);
bundleBuilder.add(KnownAddresses.USER_LOGIN, user);
if (addresses.size() == 3) {
// we don't support null values for the address so we use an invalid placeholder here
bundleBuilder.add(addresses.get(2), "invalid");
}
final DataBundle bundle = bundleBuilder.build();
// subscriber info is cached per combination of addresses, keyed by the joined address keys
final String subInfoKey =
addresses.stream().map(Address::getKey).collect(Collectors.joining("|"));
while (true) {
DataSubscriberInfo subInfo =
loginEventSubInfo.computeIfAbsent(
subInfoKey,
t -> producerService.getDataSubscribers(addresses.toArray(new Address[0])));
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
}
try {
GatewayContext gwCtx = new GatewayContext(false);
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
} catch (ExpiredSubscriberInfoException e) {
// drop the stale cache entry; the loop then looks the subscribers up again and retries
loginEventSubInfo.remove(subInfoKey);
}
// NOTE(review): the '};' below looks like a line from the removed onUserEvent lambda that the
// diff rendering left in place — confirm against the actual file
};
}
}

private Flow<Void> onRequestSession(final RequestContext ctx_, final String sessionId) {
Expand Down Expand Up @@ -940,6 +1053,33 @@ private static int byteToDigit(byte b) {
return -1;
}

/**
 * Returns the user identifier to report for the given collection mode.
 *
 * <p>Outside of {@code ANONYMIZATION} mode, or when {@code userId} is {@code null}, the value is
 * returned untouched. In {@code ANONYMIZATION} mode the UTF-8 bytes of the identifier are hashed
 * with SHA-256, the digest is truncated to its first {@code HASH_SIZE_BYTES} bytes and the result
 * is returned hex-encoded behind the {@code ANON_PREFIX} prefix.
 *
 * @param mode collection mode the identifier was gathered with
 * @param userId raw user identifier, may be {@code null}
 * @return the identifier to publish, or {@code null} when {@code userId} is {@code null} or the
 *     SHA-256 digest is not available
 */
protected static String anonymizeUser(final UserIdCollectionMode mode, final String userId) {
  if (mode != ANONYMIZATION || userId == null) {
    return userId;
  }
  final MessageDigest digest;
  try {
    // TODO avoid looking up a new instance every time
    digest = MessageDigest.getInstance("SHA-256");
  } catch (NoSuchAlgorithmException e) {
    // report the missing algorithm only once to avoid flooding logs and telemetry
    if (!SHA_MISSING_REPORTED.getAndSet(true)) {
      log.error(
          SEND_TELEMETRY,
          "Missing SHA-256 digest, user collection in 'anon' mode cannot continue",
          e);
    }
    return null;
  }
  // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset, which
  // would make the hash of a non-ASCII identifier differ from one host to another.
  final byte[] hash = digest.digest(userId.getBytes(StandardCharsets.UTF_8));
  final byte[] truncated =
      hash.length > HASH_SIZE_BYTES ? Arrays.copyOf(hash, HASH_SIZE_BYTES) : hash;
  return ANON_PREFIX + toHexString(truncated);
}

private static class IGAppSecEventDependencies {

private static final Map<Address<?>, Collection<datadog.trace.api.gateway.EventType<?>>>
Expand Down
Loading

0 comments on commit b48fedf

Please sign in to comment.