Skip to content

Commit 057e8ad

Browse files
authored
report progress when initializing/rebuilding suggester (#4343)
1 parent eec63c4 commit 057e8ad

File tree

4 files changed

+267
-31
lines changed

4 files changed

+267
-31
lines changed

opengrok-web/src/main/java/org/opengrok/web/api/v1/suggester/provider/service/impl/SuggesterServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ private void initSuggester() {
320320
suggesterConfig.getAllowedFields(),
321321
suggesterConfig.getTimeThreshold(),
322322
rebuildParalleismLevel,
323-
Metrics.getRegistry());
323+
Metrics.getRegistry(),
324+
env.isPrintProgress());
324325

325326
new Thread(() -> {
326327
suggester.init(getAllProjectIndexDirs());

suggester/src/main/java/org/opengrok/suggest/Suggester.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919

2020
/*
21-
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
21+
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
2222
*/
2323
package org.opengrok.suggest;
2424

@@ -34,6 +34,7 @@
3434
import org.apache.lucene.util.BytesRef;
3535
import org.opengrok.suggest.query.SuggesterPrefixQuery;
3636
import org.opengrok.suggest.query.SuggesterQuery;
37+
import org.opengrok.suggest.util.Progress;
3738

3839
import java.io.Closeable;
3940
import java.io.File;
@@ -93,6 +94,7 @@ public final class Suggester implements Closeable {
9394
private final int timeThreshold;
9495

9596
private final int rebuildParallelismLevel;
97+
private final boolean isPrintProgress;
9698

9799
private volatile boolean rebuilding;
98100
private volatile boolean terminating;
@@ -123,7 +125,9 @@ public final class Suggester implements Closeable {
123125
* @param allowedFields fields for which should the suggester be enabled,
124126
* if {@code null} then enabled for all fields
125127
* @param timeThreshold time in milliseconds after which the suggestions requests should time out
128+
* @param rebuildParallelismLevel parallelism level for rebuild
126129
* @param registry meter registry
130+
* @param isPrintProgress whether to report progress for initialization and rebuild
127131
*/
128132
public Suggester(
129133
final File suggesterDir,
@@ -134,7 +138,8 @@ public Suggester(
134138
final Set<String> allowedFields,
135139
final int timeThreshold,
136140
final int rebuildParallelismLevel,
137-
MeterRegistry registry) {
141+
MeterRegistry registry,
142+
boolean isPrintProgress) {
138143
if (suggesterDir == null) {
139144
throw new IllegalArgumentException("Suggester needs to have directory specified");
140145
}
@@ -152,6 +157,7 @@ public Suggester(
152157
this.allowedFields = new HashSet<>(allowedFields);
153158
this.timeThreshold = timeThreshold;
154159
this.rebuildParallelismLevel = rebuildParallelismLevel;
160+
this.isPrintProgress = isPrintProgress;
155161

156162
suggesterRebuildTimer = Timer.builder("suggester.rebuild.latency").
157163
description("suggester rebuild latency").
@@ -180,17 +186,20 @@ public void init(final Collection<NamedIndexDir> luceneIndexes) {
180186

181187
ExecutorService executor = Executors.newWorkStealingPool(rebuildParallelismLevel);
182188

183-
for (NamedIndexDir indexDir : luceneIndexes) {
184-
if (terminating) {
185-
LOGGER.log(Level.INFO, "Terminating suggester initialization");
186-
return;
189+
try (Progress progress = new Progress(LOGGER, "suggester initialization", luceneIndexes.size(),
190+
Level.INFO, isPrintProgress)) {
191+
for (NamedIndexDir indexDir : luceneIndexes) {
192+
if (terminating) {
193+
LOGGER.log(Level.INFO, "Terminating suggester initialization");
194+
return;
195+
}
196+
submitInitIfIndexExists(executor, indexDir, progress);
187197
}
188-
submitInitIfIndexExists(executor, indexDir);
189-
}
190198

191-
shutdownAndAwaitTermination(executor, start, suggesterInitTimer,
192-
"Suggester successfully initialized");
193-
initDone.countDown();
199+
shutdownAndAwaitTermination(executor, start, suggesterInitTimer,
200+
"Suggester successfully initialized");
201+
initDone.countDown();
202+
}
194203
}
195204
}
196205

@@ -206,10 +215,11 @@ public void waitForInit(long timeout, TimeUnit unit) throws InterruptedException
206215
}
207216
}
208217

209-
private void submitInitIfIndexExists(final ExecutorService executorService, final NamedIndexDir indexDir) {
218+
private void submitInitIfIndexExists(final ExecutorService executorService, final NamedIndexDir indexDir,
219+
Progress progress) {
210220
try {
211221
if (indexExists(indexDir.path)) {
212-
executorService.submit(getInitRunnable(indexDir));
222+
executorService.submit(getInitRunnable(indexDir, progress));
213223
} else {
214224
LOGGER.log(Level.FINE, "Index in {0} directory does not exist, skipping...", indexDir);
215225
}
@@ -218,7 +228,7 @@ private void submitInitIfIndexExists(final ExecutorService executorService, fina
218228
}
219229
}
220230

221-
private Runnable getInitRunnable(final NamedIndexDir indexDir) {
231+
private Runnable getInitRunnable(final NamedIndexDir indexDir, Progress progress) {
222232
return () -> {
223233
try {
224234
if (terminating) {
@@ -239,6 +249,7 @@ private Runnable getInitRunnable(final NamedIndexDir indexDir) {
239249

240250
Duration d = Duration.between(start, Instant.now());
241251
LOGGER.log(Level.FINE, "Finished initialization of {0}, took {1}", new Object[] {indexDir, d});
252+
progress.increment();
242253
} catch (Exception e) {
243254
LOGGER.log(Level.SEVERE, String.format("Could not initialize suggester data for %s", indexDir), e);
244255
}
@@ -300,17 +311,20 @@ public void rebuild(final Collection<NamedIndexDir> indexDirs) {
300311

301312
ExecutorService executor = Executors.newWorkStealingPool(rebuildParallelismLevel);
302313

303-
for (NamedIndexDir indexDir : indexDirs) {
304-
SuggesterProjectData data = this.projectData.get(indexDir.name);
305-
if (data != null) {
306-
executor.submit(getRebuildRunnable(data));
307-
} else {
308-
submitInitIfIndexExists(executor, indexDir);
314+
try (Progress progress = new Progress(LOGGER, "suggester rebuild", indexDirs.size(),
315+
Level.INFO, isPrintProgress)) {
316+
for (NamedIndexDir indexDir : indexDirs) {
317+
SuggesterProjectData data = this.projectData.get(indexDir.name);
318+
if (data != null) {
319+
executor.submit(getRebuildRunnable(data, progress));
320+
} else {
321+
submitInitIfIndexExists(executor, indexDir, progress);
322+
}
309323
}
310-
}
311324

312-
shutdownAndAwaitTermination(executor, start, suggesterRebuildTimer,
313-
"Suggesters for " + indexDirs + " were successfully rebuilt");
325+
shutdownAndAwaitTermination(executor, start, suggesterRebuildTimer,
326+
"Suggesters for " + indexDirs + " were successfully rebuilt");
327+
}
314328
}
315329

316330
rebuildLock.lock();
@@ -341,7 +355,7 @@ public void waitForRebuild(long timeout, TimeUnit unit) throws InterruptedExcept
341355
}
342356
}
343357

344-
private Runnable getRebuildRunnable(final SuggesterProjectData data) {
358+
private Runnable getRebuildRunnable(final SuggesterProjectData data, Progress progress) {
345359
return () -> {
346360
try {
347361
if (terminating) {
@@ -354,6 +368,7 @@ private Runnable getRebuildRunnable(final SuggesterProjectData data) {
354368

355369
Duration d = Duration.between(start, Instant.now());
356370
LOGGER.log(Level.FINE, "Rebuild of {0} finished, took {1}", new Object[] {data, d});
371+
progress.increment();
357372
} catch (Exception e) {
358373
LOGGER.log(Level.SEVERE, "Could not rebuild suggester", e);
359374
}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* CDDL HEADER START
3+
*
4+
* The contents of this file are subject to the terms of the
5+
* Common Development and Distribution License (the "License").
6+
* You may not use this file except in compliance with the License.
7+
*
8+
* See LICENSE.txt included in this distribution for the specific
9+
* language governing permissions and limitations under the License.
10+
*
11+
* When distributing Covered Code, include this CDDL HEADER in each
12+
* file and include the License file at LICENSE.txt.
13+
* If applicable, add the following below this CDDL HEADER, with the
14+
* fields enclosed by brackets "[]" replaced with your own identifying
15+
* information: Portions Copyright [yyyy] [name of copyright owner]
16+
*
17+
* CDDL HEADER END
18+
*/
19+
20+
/*
21+
* Copyright (c) 2007, 2023, Oracle and/or its affiliates. All rights reserved.
22+
* Portions Copyright (c) 2017, 2020, Chris Fraire <[email protected]>.
23+
*/
24+
package org.opengrok.suggest.util;
25+
26+
import org.jetbrains.annotations.VisibleForTesting;
27+
28+
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.TreeMap;
34+
import java.util.concurrent.atomic.AtomicLong;
35+
import java.util.logging.Level;
36+
import java.util.logging.Logger;
37+
38+
/**
39+
* Copy of {@code org.opengrok.indexer.util.Progress}.
40+
* <p>
41+
* Progress reporting via logging. The idea is that for anything that has a set of items
42+
* to go through, it will ping an instance of this class for each item completed.
43+
* This class will then log based on the number of pings. The bigger the progress,
44+
* the higher log level ({@link Level} value) will be used. The default base level is {@code Level.INFO}.
45+
* Regardless of the base level, maximum 4 log levels will be used.
46+
* </p>
47+
*/
48+
public class Progress implements AutoCloseable {
49+
private final Logger logger;
50+
private final Long totalCount;
51+
private final String suffix;
52+
53+
private final AtomicLong currentCount = new AtomicLong();
54+
private final Map<Level, Integer> levelCountMap = new TreeMap<>(Comparator.comparingInt(Level::intValue).reversed());
55+
private Thread loggerThread = null;
56+
private volatile boolean run;
57+
58+
private final Level baseLogLevel;
59+
60+
private final Object sync = new Object();
61+
62+
/**
63+
* @param logger logger instance
64+
* @param suffix string suffix to identify the operation
65+
* @param totalCount total count
66+
* @param logLevel base log level
67+
* @param isPrintProgress whether to print the progress
68+
*/
69+
public Progress(Logger logger, String suffix, long totalCount, Level logLevel, boolean isPrintProgress) {
70+
this.logger = logger;
71+
this.suffix = suffix;
72+
this.baseLogLevel = logLevel;
73+
74+
if (totalCount < 0) {
75+
this.totalCount = null;
76+
} else {
77+
this.totalCount = totalCount;
78+
}
79+
80+
// Note: Level.CONFIG is missing as it does not make too much sense for progress reporting semantically.
81+
final List<Level> standardLevels = Arrays.asList(Level.OFF, Level.SEVERE, Level.WARNING, Level.INFO,
82+
Level.FINE, Level.FINER, Level.FINEST, Level.ALL);
83+
int i = standardLevels.indexOf(baseLogLevel);
84+
for (int num : new int[]{100, 50, 10, 1}) {
85+
if (i >= standardLevels.size()) {
86+
break;
87+
}
88+
89+
Level level = standardLevels.get(i);
90+
if (level == null) {
91+
break;
92+
}
93+
levelCountMap.put(level, num);
94+
if (num == 1) {
95+
break;
96+
}
97+
i++;
98+
}
99+
100+
// Assuming the printProgress configuration setting cannot be changed on the fly.
101+
if (!baseLogLevel.equals(Level.OFF) && isPrintProgress) {
102+
spawnLogThread();
103+
}
104+
}
105+
106+
private void spawnLogThread() {
107+
// spawn a logger thread.
108+
run = true;
109+
loggerThread = new Thread(this::logLoop,
110+
"progress-thread-" + suffix.replaceAll(" ", "_"));
111+
loggerThread.start();
112+
}
113+
114+
/**
115+
* Increment counter. The actual logging will be done eventually.
116+
*/
117+
public void increment() {
118+
this.currentCount.incrementAndGet();
119+
120+
if (loggerThread != null) {
121+
// nag the thread.
122+
synchronized (sync) {
123+
sync.notifyAll();
124+
}
125+
}
126+
}
127+
128+
private void logLoop() {
129+
long cachedCount = 0;
130+
Map<Level, Long> lastLoggedChunk = new HashMap<>();
131+
132+
while (true) {
133+
long currentCount = this.currentCount.get();
134+
Level currentLevel = Level.FINEST;
135+
136+
// Do not log if there was no progress.
137+
if (cachedCount < currentCount) {
138+
currentLevel = getLevel(lastLoggedChunk, currentCount, currentLevel);
139+
logIt(lastLoggedChunk, currentCount, currentLevel);
140+
}
141+
142+
if (!run) {
143+
return;
144+
}
145+
146+
cachedCount = currentCount;
147+
148+
// wait for event
149+
try {
150+
synchronized (sync) {
151+
if (!run) {
152+
// Loop once more to do the final logging.
153+
continue;
154+
}
155+
sync.wait();
156+
}
157+
} catch (InterruptedException e) {
158+
logger.log(Level.WARNING, "logger thread interrupted");
159+
}
160+
}
161+
}
162+
163+
@VisibleForTesting
164+
Level getLevel(Map<Level, Long> lastLoggedChunk, long currentCount, Level currentLevel) {
165+
// The intention is to log the initial and final count at the base log level.
166+
if (currentCount <= 1 || (totalCount != null && currentCount == totalCount)) {
167+
currentLevel = baseLogLevel;
168+
} else {
169+
// Set the log level based on the "buckets".
170+
for (Level level : levelCountMap.keySet()) {
171+
if (lastLoggedChunk.getOrDefault(level, -1L) <
172+
currentCount / levelCountMap.get(level)) {
173+
currentLevel = level;
174+
break;
175+
}
176+
}
177+
}
178+
return currentLevel;
179+
}
180+
181+
private void logIt(Map<Level, Long> lastLoggedChunk, long currentCount, Level currentLevel) {
182+
if (logger.isLoggable(currentLevel)) {
183+
lastLoggedChunk.put(currentLevel, currentCount / levelCountMap.get(currentLevel));
184+
StringBuilder stringBuilder = new StringBuilder();
185+
stringBuilder.append("Progress: ");
186+
stringBuilder.append(currentCount);
187+
stringBuilder.append(" ");
188+
if (totalCount != null) {
189+
stringBuilder.append("(");
190+
stringBuilder.append(String.format("%.2f", currentCount * 100.0f / totalCount));
191+
stringBuilder.append("%) ");
192+
}
193+
stringBuilder.append(suffix);
194+
logger.log(currentLevel, stringBuilder.toString());
195+
}
196+
}
197+
198+
@Override
199+
public void close() {
200+
if (loggerThread == null) {
201+
return;
202+
}
203+
204+
try {
205+
run = false;
206+
synchronized (sync) {
207+
sync.notifyAll();
208+
}
209+
loggerThread.join();
210+
} catch (InterruptedException e) {
211+
logger.log(Level.WARNING, "logger thread interrupted");
212+
}
213+
}
214+
}

0 commit comments

Comments
 (0)