Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
Change-Id: I66d3d6a774bcaa20a7c473b053490bfe614a7c95
  • Loading branch information
chenxinwei committed Jan 19, 2025
1 parent 7ab4ea5 commit 727ba2c
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 106 deletions.
25 changes: 25 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,31 @@ All available procedures are listed below.
</td>
<td>CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))</td>
</tr>
<tr>
<td>clear_consumers</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.clear_consumers(`table` => 'identifier', including_consumers => 'includingConsumers', excluding_consumers => 'excludingConsumers') <br/><br/>
-- Use indexed argument<br/>
-- clear all consumers in the table
CALL [catalog.]sys.clear_consumers('identifier')
-- clear some consumers in the table (accept regular expression)<br/>
CALL [catalog.]sys.clear_consumers('tableId', 'includingConsumers')<br/><br/>
-- exclude some consumers (accept regular expression)<br/>
CALL [catalog.]sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers')
</td>
<td>
To reset or delete consumer. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>includingConsumers: consumers to be cleared.</li>
<li>excludingConsumers: consumers which not to be cleared.</li>
</td>
<td>CALL sys.clear_consumers(`table` => 'default.T')<br/><br/>
CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*')<br/><br/>
CALL sys.reset_consumer(table => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')<br/><br/>
CALL sys.reset_consumer(table => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*')
</td>
</tr>
<tr>
<td>rollback_to</td>
<td>
Expand Down
19 changes: 19 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,25 @@ This section introduce all available spark procedures about paimon.
-- delete consumer<br/>
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
</td>
</tr>
<tr>
<td>clear_consumers</td>
<td>
To clear consumers. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>includingConsumers: consumers to be cleared.</li>
<li>excludingConsumers: consumers which not to be cleared.</li>
</td>
<td>
-- clear all consumers in the table<br/>
CALL sys.clear_consumers(table => 'default.T')<br/><br/>
-- clear some consumers in the table (accept regular expression)<br/>
CALL sys.reset_consumer(table => 'default.T', includingConsumers => 'myid.*')<br/><br/>
-- clear all consumers except excludingConsumers in the table (accept regular expression)<br/>
CALL sys.reset_consumer(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')<br/><br/>
-- clear all consumers with includingConsumers and excludingConsumers (accept regular expression)<br/>
CALL sys.reset_consumer(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*')
</td>
</tr>
<tr>
<td>mark_partition_done</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void expire(LocalDateTime expireDateTime) {
}

/** Clear consumers. */
public void clearConsumers(List<String> consumerIds, Boolean clearUnspecified) {
public void clearConsumers(Pattern includingPattern, Pattern excludingPattern) {
try {
listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX)
.forEach(
Expand All @@ -117,10 +118,16 @@ public void clearConsumers(List<String> consumerIds, Boolean clearUnspecified) {
status.getPath()
.getName()
.substring(CONSUMER_PREFIX.length());
if (consumerIds == null
|| (!clearUnspecified && consumerIds.contains(consumerName))
|| (clearUnspecified
&& !consumerIds.contains(consumerName))) {
boolean shouldCompaction =
includingPattern.matcher(consumerName).matches();
if (excludingPattern != null) {
shouldCompaction =
shouldCompaction
&& !excludingPattern
.matcher(consumerName)
.matches();
}
if (shouldCompaction) {
fileIO.deleteQuietly(status.getPath());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,22 @@

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

/**
* Clear consumers procedure. Usage:
*
* <pre><code>
* -- clear all consumers except the specified consumer in the table
* CALL sys.clear_consumers('tableId', 'consumerIds', true)
*
* -- clear all specified consumers in the table
* CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
* -- NOTE: use '' as placeholder for optional arguments
*
* -- clear all consumers in the table
* CALL sys.clear_consumers('tableId')
*
* -- clear some consumers in the table (accept regular expression)
* CALL sys.clear_consumers('tableId', 'includingConsumers')
*
* -- exclude some consumers (accept regular expression)
* CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers')
* </code></pre>
*/
public class ClearConsumersProcedure extends ProcedureBase {
Expand All @@ -51,8 +50,8 @@ public class ClearConsumersProcedure extends ProcedureBase {
public String[] call(
ProcedureContext procedureContext,
String tableId,
String consumerIds,
Boolean clearUnspecified)
String includingConsumers,
String excludingConsumers)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
Expand All @@ -61,33 +60,29 @@ public String[] call(
fileStoreTable.fileIO(),
fileStoreTable.location(),
fileStoreTable.snapshotManager().branch());
List<String> specifiedConsumerIds =
Optional.of(consumerIds)
.map(s -> Arrays.asList(s.split(",")))
.orElse(Collections.emptyList());
consumerManager.clearConsumers(
specifiedConsumerIds, Optional.of(clearUnspecified).orElse(false));

includingConsumers = nullable(includingConsumers);
excludingConsumers = nullable(excludingConsumers);
Pattern includingPattern =
includingConsumers == null
? Pattern.compile(".*")
: Pattern.compile(includingConsumers);
Pattern excludingPattern =
excludingConsumers == null ? null : Pattern.compile(excludingConsumers);
consumerManager.clearConsumers(includingPattern, excludingPattern);

return new String[] {"Success"};
}

public String[] call(ProcedureContext procedureContext, String tableId, String consumerIds)
public String[] call(
ProcedureContext procedureContext, String tableId, String includingConsumers)
throws Catalog.TableNotExistException {
return call(procedureContext, tableId, consumerIds, false);
return call(procedureContext, tableId, includingConsumers, null);
}

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
ConsumerManager consumerManager =
new ConsumerManager(
fileStoreTable.fileIO(),
fileStoreTable.location(),
fileStoreTable.snapshotManager().branch());
consumerManager.clearConsumers(null, null);

return new String[] {"Success"};
return call(procedureContext, tableId, null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.regex.Pattern;

/** Clear consumers action for Flink. */
public class ClearConsumerAction extends TableActionBase {

private String includingConsumers;
private String excludingConsumers;

protected ClearConsumerAction(
String databaseName, String tableName, Map<String, String> catalogConfig) {
super(databaseName, tableName, catalogConfig);
}

public ClearConsumerAction withIncludingConsumers(@Nullable String includingConsumers) {
this.includingConsumers = includingConsumers;
return this;
}

public ClearConsumerAction withExcludingConsumers(@Nullable String excludingConsumers) {
this.excludingConsumers = excludingConsumers;
return this;
}

@Override
public void run() throws Exception {
FileStoreTable dataTable = (FileStoreTable) table;
ConsumerManager consumerManager =
new ConsumerManager(
dataTable.fileIO(),
dataTable.location(),
dataTable.snapshotManager().branch());

includingConsumers =
StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? null : includingConsumers;
excludingConsumers =
StringUtils.isNullOrWhitespaceOnly(excludingConsumers) ? null : excludingConsumers;
Pattern includingPattern =
includingConsumers == null
? Pattern.compile(".*")
: Pattern.compile(includingConsumers);
Pattern excludingPattern =
excludingConsumers == null ? null : Pattern.compile(excludingConsumers);
consumerManager.clearConsumers(includingPattern, excludingPattern);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Optional;

/** Factory to create {@link ClearConsumerAction}. */
public class ClearConsumerActionFactory implements ActionFactory {

public static final String IDENTIFIER = "clear_consumers";

private static final String INCLUDING_CONSUMERS = "including_consumers";
private static final String EXCLUDING_CONSUMERS = "excluding_consumers";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
ClearConsumerAction action =
new ClearConsumerAction(
params.getRequired(DATABASE),
params.getRequired(TABLE),
catalogConfigMap(params));

if (params.has(INCLUDING_CONSUMERS)) {
action.withIncludingConsumers(params.get(INCLUDING_CONSUMERS));
}

if (params.has(EXCLUDING_CONSUMERS)) {
action.withExcludingConsumers(params.get(EXCLUDING_CONSUMERS));
}

return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println(
"Action \"clear_consumers\" clear consumers with including consumers and excluding consumers.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" clear_consumers --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> [--including_consumers <including_pattern> --excluding_consumers <excluding_pattern>]");

System.out.println();
System.out.println("Note:");
System.out.println(
" use '' as placeholder for including_consumers if you want to clear all consumers except excludingConsumers in the table.");
System.out.println();
}
}
Loading

0 comments on commit 727ba2c

Please sign in to comment.