Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added entities for webhook events and webhook sends #92

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,19 @@ The webhook object has this format:

For creating and updating of webhooks, `id`, `createdBy` and `createdAt` are ignored. `secret` is not sent when fetching webhooks.

#### Storing webhook events and sends

This extension contains the functionality to store and retrieve the payload that was sent to a webhook, as well as the sending status. In order to enable this functionality, you must set the SPI config variable `--spi-events-listener-ext-event-webhook-store-webhook-events=true` and ensure that your realm settings have events and admin events enabled, which causes them to be stored using the configured `EventStoreProvider`.

This also enables a few additional custom REST endpoints for querying information about the payload and status of webhook sends.

| Path | Method | Payload | Returns | Description |
| ---------------------------------- | -------- | -------------- | ----------------------- | -------------- |
| `/auth/realms/:realm/webhooks/:id/sends` | `GET` | `first`, `max` query params for pagination | Webhook send objects (brief) | Get webhook sends |
| `/auth/realms/:realm/webhooks/:id/sends/:sid` | `GET` | | Webhook send object (with payload) | Get a webhook send |
| `/auth/realms/:realm/webhooks/:id/sends/:sid/resend` | `POST` | | `202` | Resend a webhook payload |


##### Example

To create a webhook for all events on the `master` realm:
Expand Down
13 changes: 8 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.java.package>io.phasetwo.keycloak.events</main.java.package>
<junit.version>5.11.2</junit.version>
<keycloak.version>26.0.2</keycloak.version>
<keycloak.version>26.1.0</keycloak.version>
<keycloak-admin-client.version>26.0.1</keycloak-admin-client.version>
<lombok.version>1.18.34</lombok.version>
<auto-service.version>1.1.1</auto-service.version>
Expand Down Expand Up @@ -117,8 +117,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<version>3.5.2</version>
<configuration>
<classpathDependencyExcludes>
<classpathDependencyExclude>org.jboss.slf4j:slf4j-jboss-logmanager</classpathDependencyExclude>
</classpathDependencyExcludes>
<systemPropertyVariables>
<keycloak-version>${keycloak.version}</keycloak-version>
</systemPropertyVariables>
Expand Down Expand Up @@ -232,13 +235,13 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.dasniko</groupId>
<artifactId>testcontainers-keycloak</artifactId>
<version>3.5.1</version>
<version>3.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected void send(
LegacySimpleHttp.Response response = request.asResponse();
int status = response.getStatus();
log.debugf("sent to %s (%d)", targetUri, status);
doAfterSend(task, status);
if (status < HTTP_OK || status >= HTTP_MULT_CHOICE) { // any 2xx is acceptable
log.warnf("Sending failure (Server response:%d)", status);
throw new SenderException(true);
Expand All @@ -94,6 +95,16 @@ protected void send(
}
}

protected final void doAfterSend(SenderTask task, int httpStatus) {
try {
afterSend(task, httpStatus);
} catch (Exception e) {
log.warn("Error afterSend", e);
}
}

protected void afterSend(SenderTask task, int httpStatus) {}

protected String hmacFor(Object o, String sharedSecret, String algorithm) {
try {
String data = JsonSerialization.writeValueAsString(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public void onEvent(AdminEvent event, boolean b) {

@Override
public void close() {
// close this instance of the event listener
log.debugf("called close() on SenderEventListenerProvider");
log.tracef("called close() on SenderEventListenerProvider");
}

class SenderTask {
Expand Down Expand Up @@ -110,14 +109,14 @@ protected void schedule(SenderTask task, long delay, TimeUnit unit) {
try {
send(task);
} catch (SenderException | IOException e) {
log.debug("sending exception", e);
log.trace("sending exception", e);
if (e instanceof SenderException && !((SenderException) e).isRetryable()) return;
log.debugf(
log.tracef(
"BackOff policy is %s",
BackOff.STOP_BACKOFF == task.getBackOff() ? "STOP" : "BACKOFF");
long backOffTime = task.getBackOff().nextBackOffMillis();
if (backOffTime == BackOff.STOP) return;
log.debugf("retrying in %d due to %s", backOffTime, e.getCause());
log.tracef("retrying in %d due to %s", backOffTime, e.getCause());
schedule(task, backOffTime, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.warn("Uncaught Sender error", t);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package io.phasetwo.keycloak.events;

import com.google.common.base.Strings;
import io.phasetwo.keycloak.model.KeycloakEventType;
import io.phasetwo.keycloak.model.WebhookEventModel;
import io.phasetwo.keycloak.model.WebhookModel;
import io.phasetwo.keycloak.model.WebhookProvider;
import io.phasetwo.keycloak.model.WebhookSendModel;
import io.phasetwo.keycloak.representation.ExtendedAdminEvent;
import io.phasetwo.keycloak.representation.ExtendedAuthDetails;
import java.io.IOException;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,12 +35,15 @@ public class WebhookSenderEventListenerProvider extends HttpSenderEventListenerP
private final RunnableTransaction runnableTrx;
private final KeycloakSessionFactory factory;

private final boolean storeWebhookEvents;
private final WebhookProvider webhooks;

private final String systemUri;
private final String systemSecret;
private final String systemAlgorithm;

public WebhookSenderEventListenerProvider(
KeycloakSession session, ScheduledExecutorService exec) {
KeycloakSession session, ScheduledExecutorService exec, boolean storeWebhookEvents) {
super(session, exec);
this.factory = session.getKeycloakSessionFactory();
this.runnableTrx = new RunnableTransaction();
Expand All @@ -45,14 +52,18 @@ public WebhookSenderEventListenerProvider(
this.systemUri = System.getenv(WEBHOOK_URI_ENV);
this.systemSecret = System.getenv(WEBHOOK_SECRET_ENV);
this.systemAlgorithm = System.getenv(WEBHOOK_ALGORITHM_ENV);
// should we store webhook events and sends?
this.storeWebhookEvents = storeWebhookEvents;
this.webhooks = session.getProvider(WebhookProvider.class);
}

@Override
public void onEvent(Event event) {
log.debugf("onEvent %s %s", event.getType(), event.getId());
try {
ExtendedAdminEvent customEvent = completeAdminEventAttributes("", event);
runnableTrx.addRunnable(() -> processEvent(customEvent, event.getRealmId()));
runnableTrx.addRunnable(
() -> processEvent(KeycloakEventType.USER, customEvent, event.getRealmId()));
} catch (Exception e) {
log.warn("Error converting and scheduling event: " + event, e);
}
Expand All @@ -67,27 +78,75 @@ public void onEvent(AdminEvent adminEvent, boolean b) {
adminEvent.getResourcePath());
try {
ExtendedAdminEvent customEvent = completeAdminEventAttributes("", adminEvent);
runnableTrx.addRunnable(() -> processEvent(customEvent, adminEvent.getRealmId()));
runnableTrx.addRunnable(
() -> processEvent(KeycloakEventType.ADMIN, customEvent, adminEvent.getRealmId()));
} catch (Exception e) {
log.warn("Error converting and scheduling event: " + adminEvent, e);
}
}

private void storeEvent(
KeycloakSession session, KeycloakEventType type, ExtendedAdminEvent event) {
if (!storeWebhookEvents) {
log.infof("storeWebhookEvents is %s. skipping...", storeWebhookEvents);
return;
}
RealmModel realm = session.realms().getRealm(event.getRealmId());
if (type == KeycloakEventType.USER && !realm.isEventsEnabled()) {
log.infof("USER events disabled for realm %s", realm.getName());
return;
}
if (type == KeycloakEventType.ADMIN && !realm.isAdminEventsEnabled()) {
log.infof("ADMIN events disabled for realm %s", realm.getName());
return;
}
if (!type.keycloakNative()) {
log.infof("%S event. Skipping event storage.", type);
return;
}

WebhookEventModel we =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this logic is persisiting access.CODE_TO_TOKEN and access.REFESH_TOKEN events. Do we want this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure what you mean. If the realm has admin events disabled and user events enabled, then anything with the access. prefix should get persisted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I misunderstood. i thought we need to send only system specific like USER_UPDATE, CREATE_GROUP . I thought seending authorization events such as a REFRESH_TOKEN is a bit of an overhead.

webhooks.storeEvent(
session.realms().getRealm(event.getRealmId()), type, event.getId(), event);
log.infof(
"Webhook event stored [%s] %s, %s, %s, %s",
we.getId(),
we.getRealm().getName(),
we.getEventType(),
we.getEventId(),
we.getAdminEventId());
}

public void processEvent(ExtendedAdminEvent event, String realmId) {
processEvent(KeycloakEventType.fromTypeString(event.getType()), event, realmId);
}

/** Update the event with a unique uid */
public void processEvent(ExtendedAdminEvent customEvent, String realmId) {
public void processEvent(KeycloakEventType type, ExtendedAdminEvent event, String realmId) {
processEvent(
() -> {
customEvent.setUid(KeycloakModelUtils.generateId());
return customEvent;
if (event.getUid() == null) {
event.setUid(KeycloakModelUtils.generateId());
}
return event;
},
realmId);
realmId,
type,
event);
}

/** Schedule dispatch to all webhooks and system */
private void processEvent(Supplier<ExtendedAdminEvent> supplier, String realmId) {
private void processEvent(
Supplier<ExtendedAdminEvent> supplier,
String realmId,
KeycloakEventType type,
ExtendedAdminEvent event) {
KeycloakModelUtils.runJobInTransaction(
factory,
(session) -> {
if (type.keycloakNative()) {
storeEvent(session, type, event);
}
RealmModel realm = session.realms().getRealm(realmId);
WebhookProvider webhooks = session.getProvider(WebhookProvider.class);
webhooks
Expand All @@ -98,18 +157,70 @@ private void processEvent(Supplier<ExtendedAdminEvent> supplier, String realmId)
w -> {
ExtendedAdminEvent customEvent = supplier.get();
if (!enabledFor(w, customEvent)) return;
schedule(customEvent, w.getUrl(), w.getSecret(), w.getAlgorithm());
schedule(w, customEvent);
});
// for system owner catch-all
if (!Strings.isNullOrEmpty(systemUri)) {
schedule(supplier.get(), systemUri, systemSecret, systemAlgorithm);
schedule(null, supplier.get(), systemUri, systemSecret, systemAlgorithm);
}
});
}

@Override
protected void afterSend(final SenderTask task, final int httpStatus) {
if (task.getProperties().get("webhookId") == null) return;
final ExtendedAdminEvent customEvent = (ExtendedAdminEvent) task.getEvent();
if (!KeycloakEventType.fromTypeString(customEvent.getType()).keycloakNative()) {
log.infof("%s event type. Skipping send storage.", customEvent.getType());
return;
}
KeycloakModelUtils.runJobInTransaction(
factory,
(session) -> {
RealmModel realm = session.realms().getRealm(customEvent.getRealmId());
WebhookProvider webhooks = session.getProvider(WebhookProvider.class);
WebhookModel webhook =
webhooks.getWebhookById(realm, task.getProperties().get("webhookId"));
WebhookEventModel event =
webhooks.getEvent(
realm,
KeycloakEventType.fromTypeString(customEvent.getType()),
customEvent.getId());
if (event == null) {
log.infof(
"No event for [%s] %s. Skipping send storage.",
customEvent.getType(), customEvent.getId());
} else {
// look it up first, as we might be here for a retry/resend
WebhookSendModel webhookSend = webhooks.getSendById(realm, customEvent.getUid());
if (webhookSend == null) {
webhookSend =
webhooks.storeSend(webhook, event, customEvent.getUid(), customEvent.getType());
}
webhookSend.setStatus(httpStatus);
webhookSend.incrementRetries();
webhookSend.setSentAt(new Date());
}
});
}

public void schedule(WebhookModel webhook, ExtendedAdminEvent customEvent) {
schedule(
webhook.getId(),
customEvent,
webhook.getUrl(),
webhook.getSecret(),
webhook.getAlgorithm());
}

private void schedule(
ExtendedAdminEvent customEvent, String url, String secret, String algorithm) {
String webhookId,
ExtendedAdminEvent customEvent,
String url,
String secret,
String algorithm) {
SenderTask task = new SenderTask(customEvent, getBackOff());
task.getProperties().put("webhookId", webhookId);
task.getProperties().put("url", url);
task.getProperties().put("secret", secret);
task.getProperties().put("algorithm", algorithm);
Expand Down Expand Up @@ -198,13 +309,13 @@ private ExtendedAdminEvent completeExtendedAuthDetails(ExtendedAdminEvent event)
details.setSessionId(
session.getContext().getAuthenticationSession().getParentSession().getId());
} catch (Exception e) {
log.debug("couldn't get sessionId", e);
log.tracef("couldn't get sessionId: %s", e.getMessage());
}
try {
details.setRealmId(
session.getContext().getAuthenticationSession().getParentSession().getRealm().getName());
} catch (Exception e) {
log.debug("couldn't get realmId", e);
log.tracef("couldn't get realmId: %s", e.getMessage());
}
return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class WebhookSenderEventListenerProviderFactory
public static final String PROVIDER_ID = "ext-event-webhook";

private ScheduledExecutorService exec;
private boolean storeWebhookEvents = false;

@Override
public String getId() {
Expand All @@ -25,11 +26,14 @@ public String getId() {

@Override
public WebhookSenderEventListenerProvider create(KeycloakSession session) {
return new WebhookSenderEventListenerProvider(session, exec);
return new WebhookSenderEventListenerProvider(session, exec, storeWebhookEvents);
}

@Override
public void init(Config.Scope scope) {
storeWebhookEvents = scope.getBoolean("storeWebhookEvents", false);
log.infof("storeWebhookEvents %b", storeWebhookEvents);

exec =
MoreExecutors.getExitingScheduledExecutorService(
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors()));
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/io/phasetwo/keycloak/model/KeycloakEventType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.phasetwo.keycloak.model;

public enum KeycloakEventType {
USER,
ADMIN,
SYSTEM,
UNKNOWN;

public boolean keycloakNative() {
return (this == USER || this == ADMIN);
}

static KeycloakEventType from(String input) {
try {
return KeycloakEventType.valueOf(KeycloakEventType.class, input.toUpperCase());
} catch (Exception e) {
return KeycloakEventType.UNKNOWN;
}
}

public static KeycloakEventType fromTypeString(String input) {
if (input == null || input.isEmpty()) {
return KeycloakEventType.UNKNOWN;
}

for (KeycloakEventType type : KeycloakEventType.values()) {
if (input.toUpperCase().startsWith(type.name() + ".")) {
return type;
}
}
return KeycloakEventType.UNKNOWN;
}
}
Loading