Skip to content

Commit d9c7dea

Browse files
committed
Remove write-based sampling from nodetool traincompressiondictionary
1 parent 798ebae commit d9c7dea

11 files changed

+154
-930
lines changed

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.cassandra.db.compression;
2020

2121
import java.nio.ByteBuffer;
22-
import java.util.Map;
2322
import java.util.Set;
2423
import javax.annotation.Nullable;
2524

@@ -113,8 +112,6 @@ public synchronized void maybeReloadFromSchema(CompressionParams newParams)
113112

114113
if (needsNewTrainer)
115114
{
116-
// The manual training should be cancelled if a new trainer is needed
117-
scheduler.cancelManualTraining();
118115
// Close existing trainer and create a new one
119116
if (trainer != null)
120117
{
@@ -201,7 +198,7 @@ public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId)
201198
}
202199

203200
@Override
204-
public synchronized void train(Map<String, String> options)
201+
public synchronized void train()
205202
{
206203
// Validate table supports dictionary compression
207204
if (!isEnabled)
@@ -214,32 +211,26 @@ public synchronized void train(Map<String, String> options)
214211
throw new IllegalStateException("Dictionary trainer is not available for table " + keyspaceName + '.' + tableName);
215212
}
216213

217-
// Parse and validate training options
218-
ManualTrainingOptions trainingOptions = ManualTrainingOptions.fromStringMap(options);
219-
220-
if (trainingOptions.useExistingSSTables())
214+
// SSTable-based training: sample from existing SSTables
215+
Set<SSTableReader> sstables = columnFamilyStore.getLiveSSTables();
216+
if (sstables.isEmpty())
221217
{
222-
// SSTable-based training: sample from existing SSTables
223-
Set<SSTableReader> sstables = columnFamilyStore.getLiveSSTables();
218+
logger.info("No SSTables available for training in table {}.{}, flushing memtable first",
219+
keyspaceName, tableName);
220+
columnFamilyStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
221+
sstables = columnFamilyStore.getLiveSSTables();
222+
224223
if (sstables.isEmpty())
225224
{
226-
throw new IllegalStateException("No SSTables available for training in table " + keyspaceName + '.' + tableName);
225+
throw new IllegalStateException("No SSTables available for training in table " + keyspaceName + '.' + tableName + " after flush");
227226
}
228-
229-
logger.info("Starting SSTable-based training for {}.{} with {} SSTables",
230-
keyspaceName, tableName, sstables.size());
231-
232-
trainer.start(true);
233-
scheduler.scheduleSSTableBasedTraining(trainingOptions, trainer, sstables, createTrainingConfig());
234227
}
235-
else
236-
{
237-
// Write-based training: sample from new writes
238-
logger.info("Starting write-based training for {}.{}", keyspaceName, tableName);
239228

240-
trainer.start(true);
241-
scheduler.scheduleManualTraining(trainingOptions, trainer);
242-
}
229+
logger.info("Starting SSTable-based training for {}.{} with {} SSTables",
230+
keyspaceName, tableName, sstables.size());
231+
232+
trainer.start(true);
233+
scheduler.scheduleSSTableBasedTraining(trainer, sstables, createTrainingConfig());
243234
}
244235

245236
@Override
@@ -253,17 +244,6 @@ public String getTrainingStatus()
253244
return dictionaryTrainer.getTrainingStatus().toString();
254245
}
255246

256-
@Override
257-
public void updateSamplingRate(int samplingRate)
258-
{
259-
ICompressionDictionaryTrainer dictionaryTrainer = trainer;
260-
if (dictionaryTrainer == null)
261-
{
262-
throw new IllegalArgumentException("Dictionary trainer is not available for table " + keyspaceName + '.' + tableName);
263-
}
264-
dictionaryTrainer.updateSamplingRate(samplingRate);
265-
}
266-
267247
@Override
268248
public long getSampleCount()
269249
{

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,19 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21-
import java.util.Map;
22-
2321
public interface CompressionDictionaryManagerMBean
2422
{
2523
String MBEAN_NAME = "org.apache.cassandra.db.compression:type=CompressionDictionaryManager";
2624

2725
/**
28-
* Starts sampling and training for this table.
29-
*
30-
* @param options options for the training process (currently unused, reserved for future extensions)
26+
* Starts training from existing SSTables for this table.
27+
* Samples chunks from all live SSTables and trains a compression dictionary.
28+
* If no SSTables are available, automatically flushes the memtable first.
29+
*
3130
* @throws UnsupportedOperationException if table doesn't support dictionary compression
32-
* @throws IllegalStateException if training is already in progress for this table
31+
* @throws IllegalStateException if training is already in progress for this table or no SSTables available after flush
3332
*/
34-
void train(Map<String, String> options);
33+
void train();
3534

3635
/**
3736
* Gets the current training status for this table.
@@ -41,15 +40,6 @@ public interface CompressionDictionaryManagerMBean
4140
*/
4241
String getTrainingStatus();
4342

44-
/**
45-
* Updates the sampling rate for the trainer.
46-
*
47-
* @param samplingRate the new sampling rate. For exmaple, 1 = sample every time (100%);
48-
* 2 = expect sample 1/2 of data (50%), n = expect sample 1/n of data
49-
* @throws IllegalArgumentException if sampling rate is invalid or trainer is not available
50-
*/
51-
void updateSamplingRate(int samplingRate);
52-
5343
/**
5444
* Gets the number of samples collected so far during training.
5545
*

src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java

Lines changed: 15 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@
3131

3232
import org.apache.cassandra.concurrent.ScheduledExecutors;
3333
import org.apache.cassandra.config.DatabaseDescriptor;
34-
import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
3534
import org.apache.cassandra.io.sstable.format.SSTableReader;
3635
import org.apache.cassandra.schema.SystemDistributedKeyspace;
37-
import org.apache.cassandra.utils.Clock;
3836
import org.apache.cassandra.utils.concurrent.Ref;
3937

4038
/**
@@ -85,41 +83,7 @@ public void scheduleRefreshTask()
8583
}
8684

8785
@Override
88-
public void scheduleManualTraining(ManualTrainingOptions options, ICompressionDictionaryTrainer trainer)
89-
{
90-
if (scheduledManualTrainingTask != null)
91-
{
92-
throw new IllegalStateException("Training already in progress for table " + keyspaceName + '.' + tableName);
93-
}
94-
95-
int maxSamplingDurationSeconds = options.getMaxSamplingDurationSeconds();
96-
97-
logger.info("Starting manual dictionary training for {}.{} with max sampling duration: {} seconds",
98-
keyspaceName, tableName, maxSamplingDurationSeconds);
99-
100-
long deadlineMillis = Clock.Global.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds);
101-
102-
ManualTrainingTask task = new ManualTrainingTask(deadlineMillis, trainer);
103-
104-
// Check every second whether it gets enough samples and completes training
105-
scheduledManualTrainingTask = ScheduledExecutors.scheduledTasks
106-
.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS);
107-
}
108-
109-
@Override
110-
public void cancelManualTraining()
111-
{
112-
ScheduledFuture<?> future = scheduledManualTrainingTask;
113-
if (future != null)
114-
{
115-
future.cancel(false);
116-
}
117-
scheduledManualTrainingTask = null;
118-
}
119-
120-
@Override
121-
public void scheduleSSTableBasedTraining(ManualTrainingOptions options,
122-
ICompressionDictionaryTrainer trainer,
86+
public void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
12387
Set<SSTableReader> sstables,
12488
CompressionDictionaryTrainingConfig config)
12589
{
@@ -140,6 +104,19 @@ public void scheduleSSTableBasedTraining(ManualTrainingOptions options,
140104
scheduledManualTrainingTask = ScheduledExecutors.scheduledTasks.schedule(() -> {}, 1, TimeUnit.HOURS);
141105
}
142106

107+
/**
108+
* Cancels the in-progress manual training task.
109+
*/
110+
private void cancelManualTraining()
111+
{
112+
ScheduledFuture<?> future = scheduledManualTrainingTask;
113+
if (future != null)
114+
{
115+
future.cancel(false);
116+
}
117+
scheduledManualTrainingTask = null;
118+
}
119+
143120
/**
144121
* Sets the enabled state of the scheduler. When disabled, refresh tasks will not execute.
145122
*
@@ -190,55 +167,8 @@ public void close()
190167
}
191168
}
192169

193-
private class ManualTrainingTask implements Runnable
194-
{
195-
private final long deadlineMillis;
196-
private final ICompressionDictionaryTrainer trainer;
197-
private boolean isTraining = false;
198-
199-
private ManualTrainingTask(long deadlineMillis, ICompressionDictionaryTrainer trainer)
200-
{
201-
this.deadlineMillis = deadlineMillis;
202-
this.trainer = trainer;
203-
}
204-
205-
@Override
206-
public void run()
207-
{
208-
if (trainer.getTrainingStatus() == TrainingStatus.NOT_STARTED)
209-
{
210-
logger.warn("Trainer is not started. Stop training dictionary for table {}.{}", keyspaceName, tableName);
211-
cancelManualTraining();
212-
return;
213-
}
214-
215-
long now = Clock.Global.currentTimeMillis();
216-
// Force training if there are not enough samples, but we have hit the max sampling duration
217-
boolean reachedDeadline = now >= deadlineMillis;
218-
if (!isTraining && (trainer.isReady() || reachedDeadline))
219-
{
220-
// Set isTraining to only enter the branch once
221-
isTraining = true;
222-
trainer.trainDictionaryAsync(reachedDeadline)
223-
.addCallback((dictionary, throwable) -> {
224-
cancelManualTraining();
225-
if (throwable != null)
226-
{
227-
logger.error("Manual dictionary training failed for {}.{}: {}",
228-
keyspaceName, tableName, throwable.getMessage());
229-
}
230-
else
231-
{
232-
logger.info("Manual dictionary training completed for {}.{}", keyspaceName, tableName);
233-
}
234-
});
235-
}
236-
}
237-
}
238-
239170
/**
240171
* Task that samples chunks from existing SSTables and triggers training.
241-
* Similar to ManualTrainingTask but reads from disk instead of waiting for writes.
242172
* Acquires references to SSTables to prevent them from being deleted during sampling.
243173
*/
244174
private class SSTableSamplingTask implements Runnable
@@ -270,7 +200,7 @@ private SSTableSamplingTask(Set<SSTableReader> sstables,
270200
else
271201
{
272202
logger.debug("Couldn't acquire reference to SSTable {}. It may have been removed.",
273-
sstable.descriptor);
203+
sstable.descriptor);
274204
}
275205
}
276206

src/java/org/apache/cassandra/db/compression/ICompressionDictionaryScheduler.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,34 +37,18 @@ public interface ICompressionDictionaryScheduler extends AutoCloseable
3737
*/
3838
void scheduleRefreshTask();
3939

40-
/**
41-
* Schedules manual training with the specified options.
42-
*
43-
* @param options parsed and validated training options
44-
* @param trainer the trainer to use
45-
* @throws IllegalStateException if training is already in progress
46-
*/
47-
void scheduleManualTraining(ManualTrainingOptions options, ICompressionDictionaryTrainer trainer);
48-
4940
/**
5041
* Schedules SSTable-based training that samples from existing SSTables.
5142
*
52-
* @param options parsed and validated training options
5343
* @param trainer the trainer to use
5444
* @param sstables the set of SSTables to sample from
5545
* @param config the training configuration
5646
* @throws IllegalStateException if training is already in progress
5747
*/
58-
void scheduleSSTableBasedTraining(ManualTrainingOptions options,
59-
ICompressionDictionaryTrainer trainer,
48+
void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
6049
Set<SSTableReader> sstables,
6150
CompressionDictionaryTrainingConfig config);
6251

63-
/**
64-
* Cancel the in-progress manual training
65-
*/
66-
void cancelManualTraining();
67-
6852
/**
6953
* Sets the enabled state of the scheduler. When disabled, refresh tasks will not execute.
7054
*

0 commit comments

Comments
 (0)