3131import org .apache .amoro .server .table .DefaultTableRuntime ;
3232import org .apache .amoro .server .table .TableService ;
3333import org .apache .amoro .server .utils .IcebergTableUtil ;
34+ import org .apache .amoro .shade .guava32 .com .google .common .annotations .VisibleForTesting ;
3435import org .apache .amoro .shade .guava32 .com .google .common .base .Preconditions ;
3536import org .apache .amoro .table .MixedTable ;
3637
@@ -56,28 +57,38 @@ protected boolean enabled(TableRuntime tableRuntime) {
5657 @ Override
5758 protected long getNextExecutingTime (TableRuntime tableRuntime ) {
5859 DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime ) tableRuntime ;
60+
61+ if (defaultTableRuntime .getOptimizingConfig ().isRefreshTableAdaptiveEnabled (interval )) {
62+ long newInterval = defaultTableRuntime .getLatestRefreshInterval ();
63+ if (newInterval > 0 ) {
64+ return newInterval ;
65+ }
66+ }
67+
5968 return Math .min (
6069 defaultTableRuntime .getOptimizingConfig ().getMinorLeastInterval () * 4L / 5 , interval );
6170 }
6271
63- private void tryEvaluatingPendingInput (DefaultTableRuntime tableRuntime , MixedTable table ) {
72+ private boolean tryEvaluatingPendingInput (DefaultTableRuntime tableRuntime , MixedTable table ) {
6473 // only evaluate pending input when optimizing is enabled and in idle state
6574 OptimizingConfig optimizingConfig = tableRuntime .getOptimizingConfig ();
66- if ( optimizingConfig .isEnabled ()
67- && tableRuntime .getOptimizingStatus ().equals (OptimizingStatus .IDLE )) {
75+ boolean optimizingEnabled = optimizingConfig .isEnabled ();
76+ if ( optimizingEnabled && tableRuntime .getOptimizingStatus ().equals (OptimizingStatus .IDLE )) {
6877
6978 if (optimizingConfig .isMetadataBasedTriggerEnabled ()
7079 && !MetadataBasedEvaluationEvent .isEvaluatingNecessary (
7180 optimizingConfig , table , tableRuntime .getLastPlanTime ())) {
7281 logger .debug (
7382 "{} optimizing is not necessary due to metadata based trigger" ,
7483 tableRuntime .getTableIdentifier ());
75- return ;
84+ // indicates no optimization demand now
85+ return false ;
7686 }
7787
7888 AbstractOptimizingEvaluator evaluator =
7989 IcebergTableUtil .createOptimizingEvaluator (tableRuntime , table , maxPendingPartitions );
80- if (evaluator .isNecessary ()) {
90+ boolean evaluatorIsNecessary = evaluator .isNecessary ();
91+ if (evaluatorIsNecessary ) {
8192 AbstractOptimizingEvaluator .PendingInput pendingInput =
8293 evaluator .getOptimizingPendingInput ();
8394 logger .debug (
@@ -88,7 +99,21 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa
8899 } else {
89100 tableRuntime .optimizingNotNecessary ();
90101 }
102+
91103 tableRuntime .setTableSummary (evaluator .getPendingInput ());
104+ return evaluatorIsNecessary ;
105+ } else if (!optimizingEnabled ) {
106+ logger .debug (
107+ "{} optimizing is not enabled, skip evaluating pending input" ,
108+ tableRuntime .getTableIdentifier ());
109+ // indicates no optimization demand now
110+ return false ;
111+ } else {
112+ logger .debug (
113+ "{} optimizing is processing or is in preparation" , tableRuntime .getTableIdentifier ());
114+ // indicates optimization demand exists (preparation or processing),
115+ // even though we don't trigger a new evaluation in this loop.
116+ return true ;
92117 }
93118 }
94119
@@ -122,16 +147,131 @@ public void execute(TableRuntime tableRuntime) {
122147 AmoroTable <?> table = loadTable (tableRuntime );
123148 defaultTableRuntime .refresh (table );
124149 MixedTable mixedTable = (MixedTable ) table .originalTable ();
150+ // Check if there is any optimizing demand now.
151+ boolean hasOptimizingDemand = false ;
125152 if ((mixedTable .isKeyedTable ()
126153 && (lastOptimizedSnapshotId != defaultTableRuntime .getCurrentSnapshotId ()
127154 || lastOptimizedChangeSnapshotId
128155 != defaultTableRuntime .getCurrentChangeSnapshotId ()))
129156 || (mixedTable .isUnkeyedTable ()
130157 && lastOptimizedSnapshotId != defaultTableRuntime .getCurrentSnapshotId ())) {
131- tryEvaluatingPendingInput (defaultTableRuntime , mixedTable );
158+ hasOptimizingDemand = tryEvaluatingPendingInput (defaultTableRuntime , mixedTable );
159+ } else {
160+ logger .debug ("{} optimizing is not necessary" , defaultTableRuntime .getTableIdentifier ());
161+ }
162+
163+ // Update adaptive interval according to evaluated result.
164+ if (defaultTableRuntime .getOptimizingConfig ().isRefreshTableAdaptiveEnabled (interval )) {
165+ defaultTableRuntime .setLatestEvaluatedNeedOptimizing (hasOptimizingDemand );
166+ long newInterval = getAdaptiveExecutingInterval (defaultTableRuntime );
167+ defaultTableRuntime .setLatestRefreshInterval (newInterval );
132168 }
133169 } catch (Throwable throwable ) {
134170 logger .error ("Refreshing table {} failed." , tableRuntime .getTableIdentifier (), throwable );
135171 }
136172 }
173+
174+ /**
175+ * Calculate adaptive execution interval based on table optimization status.
176+ *
177+ * <p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm inspired by TCP congestion
178+ * control:
179+ *
180+ * <ul>
181+ * <li>If table does not need to be optimized: additive increase - gradually extend interval to
182+ * reduce resource consumption
183+ * <li>If table needs optimization: multiplicative decrease - rapidly reduce interval for quick
184+ * response
185+ * </ul>
186+ *
187+ * <p>Interval is bounded by [interval_min, interval_max] and kept in memory only (resets to
188+ * interval_min on restart).
189+ *
190+ * @param tableRuntime The table runtime information containing current status and configuration
191+ * @return The next execution interval in milliseconds
192+ */
193+ @ VisibleForTesting
194+ public long getAdaptiveExecutingInterval (DefaultTableRuntime tableRuntime ) {
195+ final long minInterval = interval ;
196+ final long maxInterval =
197+ tableRuntime .getOptimizingConfig ().getRefreshTableAdaptiveMaxIntervalMs ();
198+ long currentInterval = tableRuntime .getLatestRefreshInterval ();
199+
200+ // Initialize interval on first run or after restart
201+ if (currentInterval == 0 ) {
202+ currentInterval = minInterval ;
203+ }
204+
205+ // Determine whether table needs optimization
206+ boolean needOptimizing = tableRuntime .getLatestEvaluatedNeedOptimizing ();
207+
208+ long nextInterval ;
209+ if (needOptimizing ) {
210+ nextInterval = decreaseInterval (currentInterval , minInterval );
211+ logger .debug (
212+ "Table {} needs optimization, decreasing interval from {}ms to {}ms" ,
213+ tableRuntime .getTableIdentifier (),
214+ currentInterval ,
215+ nextInterval );
216+ } else {
217+ nextInterval = increaseInterval (tableRuntime , currentInterval , maxInterval );
218+ logger .debug (
219+ "Table {} does not need optimization, increasing interval from {}ms to {}ms" ,
220+ tableRuntime .getTableIdentifier (),
221+ currentInterval ,
222+ nextInterval );
223+ }
224+
225+ return nextInterval ;
226+ }
227+
228+ /**
229+ * Decrease interval when table needs optimization.
230+ *
231+ * <p>Uses multiplicative decrease (halving) inspired by TCP Fast Recovery algorithm for rapid
232+ * response to table health issues.
233+ *
234+ * @param currentInterval Current refresh interval in milliseconds
235+ * @param minInterval Minimum allowed interval in milliseconds
236+ * @return New interval after decrease.
237+ */
238+ private long decreaseInterval (long currentInterval , long minInterval ) {
239+ long newInterval = currentInterval / 2 ;
240+ long boundedInterval = Math .max (newInterval , minInterval );
241+ if (newInterval < minInterval ) {
242+ logger .debug (
243+ "Interval reached minimum boundary: attempted {}ms, capped at {}ms" ,
244+ newInterval ,
245+ minInterval );
246+ }
247+
248+ return boundedInterval ;
249+ }
250+
251+ /**
252+ * Increase interval when table does not need optimization.
253+ *
254+ * <p>Uses additive increase inspired by TCP Congestion Avoidance algorithm for gradual and stable
255+ * growth.
256+ *
257+ * @param tableRuntime The table runtime information containing configuration
258+ * @param currentInterval Current refresh interval in milliseconds
259+ * @param maxInterval Maximum allowed interval in milliseconds
260+ * @return New interval after increase.
261+ */
262+ private long increaseInterval (
263+ DefaultTableRuntime tableRuntime , long currentInterval , long maxInterval ) {
264+ long step = tableRuntime .getOptimizingConfig ().getRefreshTableAdaptiveIncreaseStepMs ();
265+ long newInterval = currentInterval + step ;
266+ long boundedInterval = Math .min (newInterval , maxInterval );
267+ if (newInterval > maxInterval ) {
268+ logger .debug (
269+ "Interval reached maximum boundary: currentInterval is {}ms, attempted {}ms, capped at {}ms" ,
270+ currentInterval ,
271+ newInterval ,
272+ maxInterval );
273+ }
274+
275+ return boundedInterval ;
276+ }
137277}
0 commit comments