diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c687e227..66ebdf1c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,18 +3,11 @@ All notable changes to this project are documented in this file. Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) -## [Unreleased 3.x](https://github.com/opensearch-project/flow-framework/compare/3.0...HEAD) +## [Unreleased 3.1](https://github.com/opensearch-project/flow-framework/compare/3.0...HEAD) ### Features ### Enhancements -### Bug Fixes -### Infrastructure -### Documentation -### Maintenance -### Refactoring +- Make thread pool sizes configurable ([#1139](https://github.com/opensearch-project/flow-framework/issues/1139)) -## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.18...2.x) -### Features -### Enhancements ### Bug Fixes ### Infrastructure ### Documentation diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 1c371326a..df22c2ccc 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -90,6 +90,7 @@ import static org.opensearch.flowframework.common.CommonValue.TENANT_ID_FIELD; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.DEPROVISION_THREAD_POOL_SIZE; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED; @@ -97,12 +98,14 @@ import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_ACTIVE_PROVISIONS_PER_TENANT; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.PROVISION_THREAD_POOL_SIZE; import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_ENDPOINT; import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_REGION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_SERVICE_NAME; import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_TYPE; import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_THREAD_POOL_SIZE; import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY; import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY; import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY; @@ -242,7 +245,10 @@ public List> getSettings() { TASK_REQUEST_RETRY_DURATION, FILTER_BY_BACKEND_ROLES, FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED, + WORKFLOW_THREAD_POOL_SIZE, + PROVISION_THREAD_POOL_SIZE, MAX_ACTIVE_PROVISIONS_PER_TENANT, + DEPROVISION_THREAD_POOL_SIZE, MAX_ACTIVE_DEPROVISIONS_PER_TENANT, REMOTE_METADATA_TYPE, REMOTE_METADATA_ENDPOINT, @@ -253,25 +259,26 @@ public List> getSettings() { @Override public List> getExecutorBuilders(Settings settings) { + int maxSizeFromAllocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings) - 1; return List.of( new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, 1, - Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1), + Math.max(WORKFLOW_THREAD_POOL_SIZE.get(settings), maxSizeFromAllocatedProcessors), TimeValue.timeValueMinutes(1), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ), new ScalingExecutorBuilder( PROVISION_WORKFLOW_THREAD_POOL, 1, - Math.max(8, OpenSearchExecutors.allocatedProcessors(settings) - 1), + Math.max(PROVISION_THREAD_POOL_SIZE.get(settings), maxSizeFromAllocatedProcessors), TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL ), new ScalingExecutorBuilder( DEPROVISION_WORKFLOW_THREAD_POOL, 1, - Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1), + Math.max(DEPROVISION_THREAD_POOL_SIZE.get(settings), maxSizeFromAllocatedProcessors), TimeValue.timeValueMinutes(1), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL ) diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java index 841670f6e..7b7aa345c 100644 --- a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java +++ b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java @@ -33,8 +33,14 @@ public class FlowFrameworkSettings { protected volatile TimeValue requestTimeout; /** Whether multitenancy is enabled */ private final Boolean isMultiTenancyEnabled; + /** Size of the threadpool used for retryable tasks in workflows */ + private volatile Integer workflowThreadPoolSize; + /** Size of the threadpool for provisioning */ + private volatile Integer provisionThreadPoolSize; /** Max simultaneous provision requests */ private volatile Integer maxActiveProvisionsPerTenant; + /** Size of the threadpool for deprovisioning */ + private volatile Integer deprovisionThreadPoolSize; /** Max simultaneous deprovision requests */ private volatile Integer maxActiveDeprovisionsPerTenant; @@ -133,22 +139,49 @@ public class FlowFrameworkSettings { Setting.Property.NodeScope ); + /** This setting sets max retryable tasks during workflow execution that polls results */ + public static final Setting WORKFLOW_THREAD_POOL_SIZE = Setting.intSetting( + "plugins.flow_framework.workflow_thread_pool_size", + 4, + 1, + 400, + Setting.Property.NodeScope + ); + + /** This setting sets the max size of the provision thread pool */ + public static final Setting PROVISION_THREAD_POOL_SIZE = Setting.intSetting( + "plugins.flow_framework.provision_thread_pool_size", + 8, + 1, + 800, + Setting.Property.NodeScope + ); + /** This setting sets max workflows that can be simultaneously provisioned, or reprovisioned by the same tenant */ public static final Setting MAX_ACTIVE_PROVISIONS_PER_TENANT = Setting.intSetting( "plugins.flow_framework.max_active_provisions_per_tenant", 2, 1, - 4, + 40, Setting.Property.NodeScope, Setting.Property.Dynamic ); + /** This setting sets the max size of the deprovision thread pool */ + public static final Setting DEPROVISION_THREAD_POOL_SIZE = Setting.intSetting( + "plugins.flow_framework.deprovision_thread_pool_size", + 4, + 1, + 400, + Setting.Property.NodeScope + ); + /** This setting sets max workflows that can be simultaneously deprovisioned by the same tenant */ public static final Setting MAX_ACTIVE_DEPROVISIONS_PER_TENANT = Setting.intSetting( "plugins.flow_framework.max_active_deprovisions_per_tenant", 1, 1, - 2, + 40, Setting.Property.NodeScope, Setting.Property.Dynamic ); @@ -195,7 +228,10 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) { this.maxWorkflows = MAX_WORKFLOWS.get(settings); this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings); this.isMultiTenancyEnabled = FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings); + this.workflowThreadPoolSize = WORKFLOW_THREAD_POOL_SIZE.get(settings); + this.provisionThreadPoolSize = PROVISION_THREAD_POOL_SIZE.get(settings); this.maxActiveProvisionsPerTenant = MAX_ACTIVE_PROVISIONS_PER_TENANT.get(settings); + this.deprovisionThreadPoolSize = DEPROVISION_THREAD_POOL_SIZE.get(settings); this.maxActiveDeprovisionsPerTenant = MAX_ACTIVE_DEPROVISIONS_PER_TENANT.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it); clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it); @@ -256,6 +292,22 @@ public boolean isMultiTenancyEnabled() { return isMultiTenancyEnabled; } + /** + * Getter for workflow thread pool max size + * @return workflow thread pool max + */ + public Integer getWorkflowThreadPoolSize() { + return workflowThreadPoolSize; + } + + /** + * Getter for provision thread pool max size + * @return provision thread pool max + */ + public Integer getProvisionThreadPoolSize() { + return provisionThreadPoolSize; + } + /** * Getter for max active provisions per tenant * @return max active provisions @@ -264,6 +316,14 @@ public Integer getMaxActiveProvisionsPerTenant() { return maxActiveProvisionsPerTenant; } + /** + * Getter for deprovision thread pool max size + * @return deprovision thread pool max + */ + public Integer getDeprovisionThreadPoolSize() { + return deprovisionThreadPoolSize; + } + /** * Getter for max active deprovisions per tenant * @return max active deprovisions diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index dc350f424..c64f3cf5c 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -110,7 +110,7 @@ public void testPlugin() throws IOException { assertEquals(9, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); assertEquals(10, ffp.getActions().size()); assertEquals(3, ffp.getExecutorBuilders(settings).size()); - assertEquals(13, ffp.getSettings().size()); + assertEquals(16, ffp.getSettings().size()); Collection systemIndexDescriptors = ffp.getSystemIndexDescriptors(Settings.EMPTY); assertEquals(3, systemIndexDescriptors.size()); diff --git a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java index fac3cf01d..21abd396a 100644 --- a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java +++ b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java @@ -45,7 +45,10 @@ public void setUp() throws Exception { FlowFrameworkSettings.MAX_WORKFLOWS, FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT, FlowFrameworkSettings.FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED, + FlowFrameworkSettings.WORKFLOW_THREAD_POOL_SIZE, + FlowFrameworkSettings.PROVISION_THREAD_POOL_SIZE, FlowFrameworkSettings.MAX_ACTIVE_PROVISIONS_PER_TENANT, + FlowFrameworkSettings.DEPROVISION_THREAD_POOL_SIZE, FlowFrameworkSettings.MAX_ACTIVE_DEPROVISIONS_PER_TENANT ) ).collect(Collectors.toSet()); @@ -67,7 +70,10 @@ public void testSettings() throws IOException { assertEquals(Optional.of(1000), Optional.ofNullable(flowFrameworkSettings.getMaxWorkflows())); assertEquals(Optional.of(TimeValue.timeValueSeconds(10)), Optional.ofNullable(flowFrameworkSettings.getRequestTimeout())); assertFalse(flowFrameworkSettings.isMultiTenancyEnabled()); + assertEquals(Optional.of(4), Optional.ofNullable(flowFrameworkSettings.getWorkflowThreadPoolSize())); + assertEquals(Optional.of(8), Optional.ofNullable(flowFrameworkSettings.getProvisionThreadPoolSize())); assertEquals(Optional.of(2), Optional.ofNullable(flowFrameworkSettings.getMaxActiveProvisionsPerTenant())); + assertEquals(Optional.of(4), Optional.ofNullable(flowFrameworkSettings.getDeprovisionThreadPoolSize())); assertEquals(Optional.of(1), Optional.ofNullable(flowFrameworkSettings.getMaxActiveDeprovisionsPerTenant())); } }