[FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. #3704
…ol whether assign evenly splits in reverse order to avoid OOM exception.
public static final ConfigOption<Boolean> SCAN_FLATTEN_NESTED_COLUMNS_ENABLED =
        ConfigOptions.key("scan.flatten-nested-columns.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Optional flag to recursively flatten the Bson field into columns."
                                + "For a better understanding, the name of the flattened column will be composed of the path to get the column. "
                                + "For example, the field `col` in the Bson document {\"nested\": {\"col\": true}} is `nested.col` in the flattened schema. ");

public static final ConfigOption<Boolean> SCAN_PRIMITIVE_AS_STRING =
        ConfigOptions.key("scan.primitive-as-string")
                .booleanType()
                .defaultValue(false)
                .withDescription("Optional flag to infer primitive types as string type.");
Irrelevant change?
removed.
…ol whether assign evenly splits in reverse order to avoid OOM exception.
}
return SplitVectorSplitStrategy.INSTANCE.split(splitContext);
snapshotSplits.addAll(SplitVectorSplitStrategy.INSTANCE.split(splitContext));
It seems this will split repeatedly here.
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;

/** Strategy for {@link MongoDBChunkSplitter} to assign {@link SnapshotSplit}. */
public enum AssignStrategy {
Could we move this class to the flink-cdc-base module and reuse it in both the MySQL and MongoDB source modules?
@@ -125,6 +129,9 @@ public class MongoDBTableFactoryTest {
    private static final boolean SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT =
            SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();

    private static final AssignStrategy SCAN_CHUNK_ASSIGN_STRATEGY_DEFAULT =
Could we add a test in MongoDBConnectorITCase for the DESCENDING_ORDER AssignStrategy? That would verify that the new assign strategy still produces correct results.
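As a rough illustration of what such a check would assert, the sketch below reads a set of chunks in both assignment orders and compares the rows while ignoring emission order. It is not the connector's test harness: `DescendingOrderCheckSketch`, `readAll`, and `sameRowsInEitherOrder` are hypothetical names, and in-memory integer lists stand in for MongoDB chunks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of flink-cdc: models chunks as in-memory lists.
final class DescendingOrderCheckSketch {
    private DescendingOrderCheckSketch() {}

    /** Reads every chunk in the given order and returns the rows as emitted. */
    static List<Integer> readAll(List<List<Integer>> chunks) {
        List<Integer> rows = new ArrayList<>();
        for (List<Integer> chunk : chunks) {
            rows.addAll(chunk);
        }
        return rows;
    }

    /** True when both assignment orders yield the same rows, ignoring emission order. */
    static boolean sameRowsInEitherOrder(List<List<Integer>> chunks) {
        List<List<Integer>> reversed = new ArrayList<>(chunks);
        Collections.reverse(reversed);
        List<Integer> ascending = readAll(chunks);
        List<Integer> descending = readAll(reversed);
        // Sort both results so only the content is compared, not the read order.
        Collections.sort(ascending);
        Collections.sort(descending);
        return ascending.equals(descending);
    }
}
```

A real integration test would run the source with the option set to DESCENDING_ORDER and compare the snapshot output against the expected rows in the same order-insensitive way.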
@lvyanquan Thanks for your contribution. I left some comments here.
Kindly ping @lvyanquan
Closed as there is a better solution in #3856.
Introduce a new config option scan.chunk.assign.strategy to control the order of split assignment.
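
A minimal sketch of the idea, assuming the strategy only decides whether the evenly sized splits are handed out in their original or in reverse order. The names `AssignStrategySketch` and `SplitOrderSketch` are hypothetical stand-ins, and `ASCENDING_ORDER` is an assumed counterpart to the `DESCENDING_ORDER` value mentioned in the review; the actual enum in the PR may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the PR's AssignStrategy enum.
// Only DESCENDING_ORDER is named in the review; ASCENDING_ORDER is assumed.
enum AssignStrategySketch {
    ASCENDING_ORDER,
    DESCENDING_ORDER
}

final class SplitOrderSketch {
    private SplitOrderSketch() {}

    /** Returns a copy of the splits in the order the given strategy would assign them. */
    static <T> List<T> order(List<T> splits, AssignStrategySketch strategy) {
        List<T> ordered = new ArrayList<>(splits);
        if (strategy == AssignStrategySketch.DESCENDING_ORDER) {
            // Reverse the copy; the caller's list is left untouched.
            Collections.reverse(ordered);
        }
        return ordered;
    }
}
```

For example, `SplitOrderSketch.order(splits, AssignStrategySketch.DESCENDING_ORDER)` on three splits returns them last to first, while `ASCENDING_ORDER` returns them unchanged.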