Skip to content

Commit 0476932

Browse files
authored
feat: add file store commit and manifest merger support (#95)
1 parent ec06b4f commit 0476932

12 files changed

Lines changed: 2629 additions & 0 deletions

include/paimon/commit_context.h

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <map>
23+
#include <memory>
24+
#include <string>
25+
26+
#include "paimon/result.h"
27+
#include "paimon/type_fwd.h"
28+
#include "paimon/visibility.h"
29+
30+
namespace paimon {
31+
class Executor;
32+
class MemoryPool;
33+
34+
/// `CommitContext` holds the configuration used by commit operations.
35+
///
36+
/// Please do not construct this class directly; use `CommitContextBuilder` to build a
37+
/// `CommitContext`, which performs input validation.
38+
/// @see CommitContextBuilder
39+
class PAIMON_EXPORT CommitContext {
40+
public:
41+
/// Constructs a `CommitContext` from already-assembled settings.
/// NOTE(review): the constructor body is not visible in this header; presumably each argument
/// is stored in the member of the matching name -- confirm against the implementation file.
CommitContext(const std::string& root_path, const std::string& commit_user,
42+
bool ignore_empty_commit, bool use_rest_catalog_commit,
43+
const std::shared_ptr<MemoryPool>& memory_pool,
44+
const std::shared_ptr<Executor>& executor,
45+
const std::shared_ptr<FileSystem>& specific_file_system,
46+
const std::map<std::string, std::string>& options);
47+
~CommitContext();
48+
49+
/// @return Reference to the stored root path (`root_path_`).
const std::string& GetRootPath() const {
50+
return root_path_;
51+
}
52+
53+
/// @return Reference to the stored commit user (`commit_user_`).
const std::string& GetCommitUser() const {
54+
return commit_user_;
55+
}
56+
57+
/// @return The stored `ignore_empty_commit_` flag.
bool IgnoreEmptyCommit() const {
58+
return ignore_empty_commit_;
59+
}
60+
61+
/// @return The stored `use_rest_catalog_commit_` flag.
bool UseRESTCatalogCommit() const {
62+
return use_rest_catalog_commit_;
63+
}
64+
65+
/// @return A copy of the stored memory pool pointer (`memory_pool_`).
std::shared_ptr<MemoryPool> GetMemoryPool() const {
66+
return memory_pool_;
67+
}
68+
69+
/// @return A copy of the stored executor pointer (`executor_`).
std::shared_ptr<Executor> GetExecutor() const {
70+
return executor_;
71+
}
72+
73+
/// @return A copy of the stored file system pointer (`specific_file_system_`).
std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
74+
return specific_file_system_;
75+
}
76+
77+
/// @return Reference to the stored options map (`options_`).
const std::map<std::string, std::string>& GetOptions() const {
78+
return options_;
79+
}
80+
81+
private:
82+
std::string root_path_;
83+
std::string commit_user_;
84+
bool ignore_empty_commit_;
85+
bool use_rest_catalog_commit_;
86+
std::shared_ptr<MemoryPool> memory_pool_;
87+
std::shared_ptr<Executor> executor_;
88+
std::shared_ptr<FileSystem> specific_file_system_;
89+
std::map<std::string, std::string> options_;
90+
};
91+
92+
/// `CommitContextBuilder` is used to build a `CommitContext` with input validation.
93+
class PAIMON_EXPORT CommitContextBuilder {
94+
public:
95+
/// Constructs a `CommitContextBuilder` with required parameters.
96+
/// @param root_path The root path of the Paimon table.
97+
/// @param commit_user The user identifier for the commit operation.
98+
CommitContextBuilder(const std::string& root_path, const std::string& commit_user);
99+
100+
~CommitContextBuilder();
101+
102+
/// Set a map of configuration options for entries which are not defined in the
103+
/// table schema or whose values you want to overwrite.
104+
/// @note Setting the options map clears any options previously added by `AddOption()`.
105+
/// @param options The configuration options map.
106+
/// @return Reference to this builder for method chaining.
107+
CommitContextBuilder& SetOptions(const std::map<std::string, std::string>& options);
108+
109+
/// Add a single configuration option which is not defined in the table schema or whose value
110+
/// you want to overwrite.
111+
///
112+
/// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()`
113+
/// instead.
114+
/// @param key The option key.
115+
/// @param value The option value.
116+
/// @return Reference to this builder for method chaining.
117+
CommitContextBuilder& AddOption(const std::string& key, const std::string& value);
118+
119+
/// Sets whether to ignore empty commits (default is true).
120+
/// When set to true, commits that don't contain any actual data changes will be ignored.
121+
/// @param ignore_empty_commit True to ignore empty commits, false otherwise.
122+
/// @return Reference to this builder for method chaining.
123+
CommitContextBuilder& IgnoreEmptyCommit(bool ignore_empty_commit);
124+
125+
/// Sets whether to use REST catalog commit (default is false).
126+
/// @note Temporary interface, will be removed in the future.
127+
/// @param use_rest_catalog_commit True to use REST catalog commit, false otherwise.
128+
/// @return Reference to this builder for method chaining.
129+
CommitContextBuilder& UseRESTCatalogCommit(bool use_rest_catalog_commit);
130+
131+
/// Sets the memory pool to be used for memory allocation during commit operations.
132+
/// @param memory_pool Shared pointer to the memory pool instance.
133+
/// @return Reference to this builder for method chaining.
134+
CommitContextBuilder& WithMemoryPool(const std::shared_ptr<MemoryPool>& memory_pool);
135+
136+
/// Sets the executor to be used for asynchronous operations during commit.
137+
/// @param executor Shared pointer to the executor instance.
138+
/// @return Reference to this builder for method chaining.
139+
CommitContextBuilder& WithExecutor(const std::shared_ptr<Executor>& executor);
140+
141+
/// Sets a custom file system instance to be used for all file operations in this commit
142+
/// context.
143+
/// This bypasses the global file system registry and uses the provided implementation directly.
144+
///
145+
/// @param file_system The file system to use.
146+
/// @return Reference to this builder for method chaining.
147+
/// @note If not set, the default file system (configured in `Options::FILE_SYSTEM`) is used.
148+
CommitContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);
149+
150+
/// Build and return a `CommitContext` instance with input validation.
151+
/// @return Result containing the constructed `CommitContext` or an error status.
152+
Result<std::unique_ptr<CommitContext>> Finish();
153+
154+
private:
155+
/// Forward-declared implementation type; its definition is not part of this header.
class Impl;
156+
157+
/// Owning pointer to the builder's hidden state.
std::unique_ptr<Impl> impl_;
158+
};
159+
160+
} // namespace paimon

include/paimon/file_store_commit.h

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <cstdint>
23+
#include <map>
24+
#include <memory>
25+
#include <optional>
26+
#include <string>
27+
#include <vector>
28+
29+
#include "paimon/defs.h"
30+
#include "paimon/executor.h"
31+
#include "paimon/memory/memory_pool.h"
32+
#include "paimon/metrics.h"
33+
#include "paimon/result.h"
34+
#include "paimon/status.h"
35+
#include "paimon/type_fwd.h"
36+
#include "paimon/visibility.h"
37+
38+
namespace paimon {
39+
class CommitContext;
40+
class CommitMessage;
41+
42+
/// Interface for commit operations in a file store.
43+
///
44+
/// The `FileStoreCommit` class provides interfaces for committing changes, expiring old snapshots,
45+
/// dropping partitions, and retrieving commit metrics.
46+
class PAIMON_EXPORT FileStoreCommit {
47+
public:
48+
/// Create an instance of `FileStoreCommit`.
49+
///
50+
/// @param context A unique pointer to the `CommitContext` used for commit operations.
51+
///
52+
/// @return A Result containing a unique pointer to the `FileStoreCommit` instance.
53+
static Result<std::unique_ptr<FileStoreCommit>> Create(std::unique_ptr<CommitContext> context);
54+
55+
virtual ~FileStoreCommit() = default;
56+
57+
/// Commit changes to the file store.
58+
///
59+
/// @param commit_messages A vector of commit messages to be committed.
60+
/// @param commit_identifier An optional identifier for the commit operation. Default is
61+
/// `BATCH_WRITE_COMMIT_IDENTIFIER`.
62+
/// @param watermark An optional event-time watermark used to indicate the progress of data
63+
/// processing. Default is std::nullopt.
64+
/// @return Status indicating the success or failure of the commit operation.
65+
/// @note NOTE(review): default arguments of a virtual function are taken from the static type
/// used at the call site, so overriding declarations should repeat the same defaults.
virtual Status Commit(const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
66+
int64_t commit_identifier = BATCH_WRITE_COMMIT_IDENTIFIER,
67+
std::optional<int64_t> watermark = std::nullopt) = 0;
68+
69+
/// Filter out all `std::vector<CommitMessage>` which have been committed and commit the
70+
/// remaining ones.
71+
///
72+
/// Compared to `Commit()`, this method first checks whether a commit_identifier has already
73+
/// been committed, so it might be slower. A common usage of this method is to retry the
74+
/// commit process after a failure.
75+
///
76+
/// @param commit_identifier_and_messages A map containing all `CommitMessage`s in
77+
/// question. The key is the commit_identifier.
78+
///
79+
/// @param watermark An optional event-time watermark used to indicate the progress of data
80+
/// processing. Default is std::nullopt.
81+
/// @return Number of `std::vector<CommitMessage>` committed.
82+
virtual Result<int32_t> FilterAndCommit(
83+
const std::map<int64_t, std::vector<std::shared_ptr<CommitMessage>>>&
84+
commit_identifier_and_messages,
85+
std::optional<int64_t> watermark = std::nullopt) = 0;
86+
87+
/// Overwrite from manifest committable and partition.
88+
///
89+
/// @param partitions Each entry maps partition keys to partition values. Depending
90+
/// on the user-defined statement, a partition might not include all partition keys. Also
91+
/// note that these partitions do not necessarily equal the partitions of the newly added
92+
/// key-values. They are just the partitions to be cleaned up.
93+
/// @param commit_messages Description of the commit messages.
94+
/// @param commit_identifier Unique identifier.
95+
/// @param watermark An optional event-time watermark used to indicate the progress of data
96+
/// processing. Default is std::nullopt.
97+
/// @return Result of the operation.
98+
virtual Status Overwrite(const std::vector<std::map<std::string, std::string>>& partitions,
99+
const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
100+
int64_t commit_identifier,
101+
std::optional<int64_t> watermark = std::nullopt) = 0;
102+
103+
/// This is a temporary interface for internal use. It will be removed in a future version.
104+
/// Please do not rely on it for long-term use.
105+
///
106+
/// @param partitions Description of the partitions.
107+
/// @param commit_messages Description of the commit messages.
108+
/// @param commit_identifier Unique identifier.
109+
/// @param watermark An optional event-time watermark used to indicate the progress of data
110+
/// processing. Default is std::nullopt.
111+
/// @return Result of the operation.
112+
virtual Result<int32_t> FilterAndOverwrite(
113+
const std::vector<std::map<std::string, std::string>>& partitions,
114+
const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
115+
int64_t commit_identifier, std::optional<int64_t> watermark = std::nullopt) = 0;
116+
117+
/// If the user wants to use REST catalog commit, set
118+
/// `CommitContextBuilder::UseRESTCatalogCommit()`, then call `Commit()` (or
119+
/// `FilterAndCommit()`) normally, then call this method to get the last commit table request,
120+
/// which is a JSON string that can be sent to the REST catalog server.
121+
///
122+
/// @note Temporary interface for internal use, will be removed in the future.
123+
///
124+
/// @return A Result containing a JSON string that includes `snapshot` and `statistics`, but
125+
/// excludes `tableId`.
126+
virtual Result<std::string> GetLastCommitTableRequest() = 0;
127+
128+
/// Expire old snapshots in the file store.
129+
///
130+
/// @return Result<int32_t> indicating the number of expired items or an error status.
131+
virtual Result<int32_t> Expire() = 0;
132+
133+
/// Drop specified partitions from the file store.
134+
///
135+
/// @param partitions A vector of partitions to be dropped.
136+
/// @param commit_identifier An identifier for the commit operation.
137+
/// @return Status indicating the success or failure of the drop partition operation.
138+
virtual Status DropPartition(const std::vector<std::map<std::string, std::string>>& partitions,
139+
int64_t commit_identifier) = 0;
140+
141+
/// Retrieve metrics related to commit operations.
142+
///
143+
/// @return A shared pointer to a `Metrics` object containing commit metrics.
144+
virtual std::shared_ptr<Metrics> GetCommitMetrics() const = 0;
145+
};
146+
147+
} // namespace paimon

0 commit comments

Comments
 (0)