
[FLINK-32344][connectors/mongodb] Support unbounded streaming read via change stream feature. #11

Open · wants to merge 1 commit into main
Conversation


@Jiabao-Sun Jiabao-Sun commented Jun 22, 2023

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Change streams are available for replica sets and sharded clusters.

We can use the MongoDB change streams feature to support unbounded streaming reads for the MongoDB connector.

SET execution.checkpointing.interval = 3s;

CREATE TABLE orders (
    `_id` STRING,
    `code` STRING,
    `quantity` BIGINT,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb',
    'uri' = 'mongodb://mongodb:27017',
    'database' = 'test_unbounded',
    'collection' = 'orders',
    -- read collection's snapshot data and then continuously read changed data.
    'scan.startup.mode' = 'initial' 
);

SELECT * FROM orders;

Startup Mode

We can determine whether the source runs in bounded or unbounded mode by setting the scan.startup.mode configuration.

  • bounded: read the collection's snapshot data only, and do not read changed data.
  • initial: read the collection's snapshot data, then continuously read changed data.
  • latest-offset: continuously read changed data from the latest offset of the oplog.
  • timestamp: continuously read changed data from a specified timestamp offset of the oplog.
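As a sketch of how the unbounded modes above might look in the WITH clause (the `scan.startup.timestamp-millis` key is an assumption, named by analogy with similar connectors, and is not confirmed by this PR):

```sql
-- Continuously read changed data from the latest oplog offset.
'scan.startup.mode' = 'latest-offset'

-- Or start from a specified point in time
-- (the timestamp option key here is hypothetical).
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1687392000000'
```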

Changelog Mode

UPSERT

Before MongoDB version 6.0, pre- and post-images were not saved in the oplog.
This means we cannot directly obtain the complete pre- and post-change records needed to generate an ALL mode changelog.
By default, change streams only return the delta of fields for an update operation. However, we can configure the change stream to return the most current majority-committed version of the updated document via the update lookup feature, and use that to generate an UPSERT mode changelog.
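At the MongoDB level, update lookup corresponds to the change stream's `fullDocument: "updateLookup"` option, which can be observed directly in the mongo shell (illustrative session fragment; requires a running replica set):

```javascript
// Watch the orders collection; updates carry the current
// majority-committed version of the full document.
db.orders.watch([], { fullDocument: "updateLookup" });
```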

Before MongoDB 6.0, we can set 'change-stream.full-document.strategy' to 'update-lookup', which is also the default.

'change-stream.full-document.strategy' = 'update-lookup' 

However, update lookup adds extra query overhead. In addition, an UPSERT mode changelog requires an extra changelog-normalize operator, whose state grows continuously as the task runs. So we should consider using an external state store, such as the RocksDB state backend, to reduce memory pressure.
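For example, the state backend can be switched to RocksDB from the SQL client (a minimal sketch; the exact configuration keys depend on the Flink version in use):

```sql
SET 'state.backend' = 'rocksdb';
SET 'execution.checkpointing.interval' = '3s';
```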


ALL

Starting in MongoDB 6.0, we can use the change stream pre- and post-images feature to output the version of a document before and after changes, and so generate an ALL mode changelog.

We can enable pre- and post-images for a collection with the createCollection or collMod commands:

db.createCollection(
   "orders", {
    changeStreamPreAndPostImages: {
        enabled: true
    }
});

db.runCommand( {
    collMod: "orders",
    changeStreamPreAndPostImages: { enabled: true }
} )

Then we can set the 'change-stream.full-document.strategy' as 'pre-and-post-images' to generate ALL mode changelog.

'change-stream.full-document.strategy' = 'pre-and-post-images' 
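At the change stream level, this strategy maps onto MongoDB 6.0's `fullDocument` and `fullDocumentBeforeChange` options (illustrative mongo shell fragment; requires a replica set with pre- and post-images enabled on the collection):

```javascript
// Each update event carries both the before and after
// versions of the document ("whenAvailable" is a softer alternative).
db.orders.watch([], {
    fullDocument: "required",
    fullDocumentBeforeChange: "required"
});
```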

@Jiabao-Sun Jiabao-Sun force-pushed the FLINK-32344 branch 2 times, most recently from 297a1ca to 81d779b on June 28, 2023.