Skip to content

Make EnterpriseGeoIpDownloaderLicenseListener project aware #129992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ private void sendUpdateStateRequest(
/**
* Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter
*/
@Deprecated(forRemoval = true) // Use the explict cluster/project version instead
public void sendRemoveRequest(final String taskId, final TimeValue timeout, final ActionListener<PersistentTask<?>> listener) {
sendRemoveRequest(null, taskId, timeout, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public Collection<?> createComponents(PluginServices services) {
services.client(),
services.clusterService(),
services.threadPool(),
getLicenseState()
getLicenseState(),
services.projectResolver()
);
enterpriseGeoIpDownloaderLicenseListener.init();
return List.of(enterpriseGeoIpDownloaderLicenseListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseStateListener;
Expand All @@ -31,12 +33,14 @@
import org.elasticsearch.xpack.core.XPackField;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;

public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloaderLicenseListener.class);
// Note: This custom type is GeoIpMetadata.TYPE, but that class is not exposed to this plugin
// Note: This custom type is IngestGeoIpMetadata.TYPE, but that class is not exposed to this plugin
static final String INGEST_GEOIP_CUSTOM_METADATA_TYPE = "ingest_geoip";

private final PersistentTasksService persistentTasksService;
Expand All @@ -47,18 +51,21 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
XPackField.ENTERPRISE_GEOIP_DOWNLOADER,
License.OperationMode.PLATINUM
);
private volatile boolean licenseIsValid = false;
private volatile boolean hasIngestGeoIpMetadata = false;
private final ConcurrentMap<ProjectId, Boolean> licenseIsValid = new ConcurrentHashMap<>();
private final ConcurrentMap<ProjectId, Boolean> hasIngestGeoIpMetadata = new ConcurrentHashMap<>();
private final ProjectResolver projectResolver;

protected EnterpriseGeoIpDownloaderLicenseListener(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
XPackLicenseState licenseState
XPackLicenseState licenseState,
ProjectResolver projectResolver
) {
this.persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
this.clusterService = clusterService;
this.licenseState = licenseState;
this.projectResolver = projectResolver;
}

private volatile boolean licenseStateListenerRegistered;
Expand All @@ -74,47 +81,55 @@ void listenForLicenseStateChanges() {
licenseState.addListener(this);
}

@NotMultiProjectCapable(description = "Replace DEFAULT project after enterprise license is supported in serverless and project-aware")
@Override
public void licenseStateChanged() {
licenseIsValid = ENTERPRISE_GEOIP_FEATURE.checkWithoutTracking(licenseState);
maybeUpdateTaskState(clusterService.state());
licenseIsValid.put(ProjectId.DEFAULT, ENTERPRISE_GEOIP_FEATURE.checkWithoutTracking(licenseState));
final boolean isLocalNodeMaster = clusterService.state().nodes().isLocalNodeElectedMaster();
maybeUpdateTaskState(ProjectId.DEFAULT, isLocalNodeMaster);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
hasIngestGeoIpMetadata = event.state().metadata().getProject().custom(INGEST_GEOIP_CUSTOM_METADATA_TYPE) != null;
final boolean ingestGeoIpCustomMetaChangedInEvent = event.metadataChanged()
&& event.changedCustomProjectMetadataSet().contains(INGEST_GEOIP_CUSTOM_METADATA_TYPE);
final boolean masterNodeChanged = Objects.equals(
event.state().nodes().getMasterNode(),
event.previousState().nodes().getMasterNode()
) == false;
/*
* We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState if this cluster change
* event involved the modification of custom geoip metadata OR a master node change
*/
if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata)) {
maybeUpdateTaskState(event.state());
}
final boolean isLocalNodeMaster = event.state().nodes().isLocalNodeElectedMaster();
event.state().metadata().projects().values().forEach(projectMetadata -> {
ProjectId projectId = projectMetadata.id();
final boolean hasMetadata = projectMetadata.custom(INGEST_GEOIP_CUSTOM_METADATA_TYPE) != null;
hasIngestGeoIpMetadata.put(projectId, hasMetadata);
final boolean ingestGeoIpCustomMetaChangedInEvent = event.metadataChanged()
&& event.customMetadataChanged(projectId, INGEST_GEOIP_CUSTOM_METADATA_TYPE);
/*
* We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState
* if this cluster change event involved the modification of custom geoip metadata OR a master node change
*/
if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata.getOrDefault(projectId, false))) {
maybeUpdateTaskState(projectId, isLocalNodeMaster);
}
});
}

private void maybeUpdateTaskState(ClusterState state) {
private void maybeUpdateTaskState(ProjectId projectId, boolean isLocalNodeMaster) {
// We should only start/stop task from single node, master is the best as it will go through it anyway
if (state.nodes().isLocalNodeElectedMaster()) {
if (licenseIsValid) {
if (hasIngestGeoIpMetadata) {
ensureTaskStarted();
if (isLocalNodeMaster) {
if (licenseIsValid.getOrDefault(projectId, false)) {
if (hasIngestGeoIpMetadata.getOrDefault(projectId, false)) {
ensureTaskStarted(projectId);
}
} else {
ensureTaskStopped();
ensureTaskStopped(projectId);
}
}
}

private void ensureTaskStarted() {
assert licenseIsValid : "Task should never be started without valid license";
persistentTasksService.sendStartRequest(
ENTERPRISE_GEOIP_DOWNLOADER,
private void ensureTaskStarted(ProjectId projectId) {
assert licenseIsValid.getOrDefault(projectId, false) : "Task should never be started without valid license";
persistentTasksService.sendProjectStartRequest(
projectId,
getTaskId(projectId, projectResolver.supportsMultipleProjects()),
ENTERPRISE_GEOIP_DOWNLOADER,
new EnterpriseGeoIpTaskParams(),
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
Expand All @@ -127,7 +142,7 @@ private void ensureTaskStarted() {
);
}

private void ensureTaskStopped() {
private void ensureTaskStopped(ProjectId projectId) {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
r -> logger.debug("Stopped enterprise geoip downloader task"),
e -> {
Expand All @@ -137,6 +152,15 @@ private void ensureTaskStopped() {
}
}
);
persistentTasksService.sendRemoveRequest(ENTERPRISE_GEOIP_DOWNLOADER, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, listener);
persistentTasksService.sendProjectRemoveRequest(
projectId,
getTaskId(projectId, projectResolver.supportsMultipleProjects()),
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
listener
);
}

protected static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) {
return supportsMultipleProjects ? projectId + "/" + ENTERPRISE_GEOIP_DOWNLOADER : ENTERPRISE_GEOIP_DOWNLOADER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.license.License;
Expand Down Expand Up @@ -48,6 +52,8 @@
public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {

private ThreadPool threadPool;
@NotMultiProjectCapable(description = "Enterprise license not available in serverless or multi-project yet")
private final ProjectResolver projectResolver = TestProjectResolvers.DEFAULT_PROJECT_ONLY;

@Before
public void setup() {
Expand All @@ -68,12 +74,13 @@ public void testAllConditionsMetOnStart() {
// Should never start if not master node, even if all other conditions have been met
final XPackLicenseState licenseState = getAlwaysValidLicense();
ClusterService clusterService = createClusterService(true, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, true, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, true, false, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client,
clusterService,
threadPool,
licenseState
licenseState,
projectResolver
);
listener.init();
listener.licenseStateChanged();
Expand All @@ -85,12 +92,13 @@ public void testLicenseChanges() {
final TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState();
licenseState.update(new XPackLicenseStatus(License.OperationMode.TRIAL, false, ""));
ClusterService clusterService = createClusterService(true, true);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, true);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, true, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client,
clusterService,
threadPool,
licenseState
licenseState,
projectResolver
);
listener.init();
listener.licenseStateChanged();
Expand All @@ -110,12 +118,13 @@ public void testLicenseChanges() {
public void testDatabaseChanges() {
final XPackLicenseState licenseState = getAlwaysValidLicense();
ClusterService clusterService = createClusterService(true, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client,
clusterService,
threadPool,
licenseState
licenseState,
projectResolver
);
listener.init();
listener.licenseStateChanged();
Expand All @@ -134,12 +143,13 @@ public void testMasterChanges() {
// Should never start if not master node, even if all other conditions have been met
final XPackLicenseState licenseState = getAlwaysValidLicense();
ClusterService clusterService = createClusterService(false, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client,
clusterService,
threadPool,
licenseState
licenseState,
projectResolver
);
listener.init();
listener.licenseStateChanged();
Expand Down Expand Up @@ -172,7 +182,15 @@ private ClusterState createClusterState(boolean isMasterNode, boolean hasGeoIpDa
ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("name"));
if (hasGeoIpDatabases) {
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of());
clusterStateBuilder.metadata(Metadata.builder().putCustom(INGEST_GEOIP_CUSTOM_METADATA_TYPE, tasksCustomMetadata).put(idxMeta));
clusterStateBuilder.metadata(
Metadata.builder()
.put(
ProjectMetadata.builder(projectResolver.getProjectId())
.putCustom(INGEST_GEOIP_CUSTOM_METADATA_TYPE, tasksCustomMetadata)
.put(idxMeta)
.build()
)
);
}
return clusterStateBuilder.nodes(discoveryNodesBuilder).build();
}
Expand All @@ -184,8 +202,13 @@ private static class TaskStartAndRemoveMockClient extends NoOpClient {
private boolean taskStartCalled = false;
private boolean taskRemoveCalled = false;

private TaskStartAndRemoveMockClient(ThreadPool threadPool, boolean expectStartTask, boolean expectRemoveTask) {
super(threadPool);
private TaskStartAndRemoveMockClient(
ThreadPool threadPool,
boolean expectStartTask,
boolean expectRemoveTask,
ProjectResolver projectResolver
) {
super(threadPool, projectResolver);
this.expectStartTask = expectStartTask;
this.expectRemoveTask = expectRemoveTask;
}
Expand Down