track uncompressed bytes for mongodb connector #3715
Conversation
Question: have we explored providing the record size from the pull side along with the record? Basically the only Avro schemas Mongo data can have are the raw table schema and the (_id String, doc String) one, which should be easy to size, so the generic solution that supports geometry types and all types of integers wouldn't be needed. But that doesn't account for flattened mode, which could end up being easier to support the generic way (or not).
@ilidemi great question/observation. Agree it would have been a simpler implementation for Mongo's case, but as you highlighted, it does not extend to flatten mode as easily. (I did consider adding a …) Benefits of the current approach: (1) it can be extended to other data sources easily, and (2) it leads to better reliability around OOM issues in general; we haven't seen as many OOM issues with other connectors, but that could also be because of our conservative default value. With tracking uncompressed bytes as the default approach for connectors, we would be able to set a more reasonable default given the available memory on an instance.
```go
arrTerminationByteSize := int64(1)
size := int64(0)
switch v := value.(type) {
```
Seems a little bad that the switch branches are duplicated when the size is tied to the value. How about:
```go
func QValueToAvro(..., calcSize bool) (any, int64, error) {
	switch v := value.(type) {
	case types.QValueFloat64:
		return c.processNullableUnion(float64(v.Val)), constSize(8, calcSize), nil
	case types.QValueString:
		return c.processNullableUnion(v.Value()), stringSize(v.Value(), calcSize), nil
	...
}

func constSize(n int64, calcSize bool) int64 {
	if !calcSize {
		return 0
	}
	return n
}

func stringSize(s string, calcSize bool) int64 {
	if !calcSize {
		return 0
	}
	return calcVarIntSize(int64(len(s))) + int64(len(s))
}
```
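For context, a minimal sketch of what a helper like calcVarIntSize (referenced above) could look like, assuming Avro's zigzag varint encoding of the string-length prefix; this is illustrative and may not match the PR's actual helper:

```go
// Illustrative only: Avro writes a string as a zigzag-varint length prefix
// followed by the raw bytes, so the prefix size depends on the length value.
func calcVarIntSize(n int64) int64 {
	u := uint64((n << 1) ^ (n >> 63)) // zigzag-encode the value
	size := int64(1)
	for u >= 0x80 { // 7 payload bits per encoded byte
		u >>= 7
		size++
	}
	return size
}
```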
```go
			{Name: "doc", Type: types.QValueKindJSON, Nullable: false},
		},
	}
	tmpFile := fmt.Sprintf("/tmp/test_avro_size_%s_%d.avro", tc.name, time.Now().Unix())
```
TestWriteRecordsToAvroFileHappyPath has a pattern for temp files
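For illustration only (the actual pattern in TestWriteRecordsToAvroFileHappyPath may differ), a common Go approach is t.TempDir(), which the test framework cleans up automatically:

```go
// Illustrative assumption: place the file under t.TempDir() instead of /tmp;
// the directory is removed when the test finishes (uses path/filepath and fmt).
tmpFile := filepath.Join(t.TempDir(), fmt.Sprintf("test_avro_size_%s.avro", tc.name))
```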
```go
t.Run(tc.name, func(t *testing.T) {
	schema := types.QRecordSchema{
		Fields: []types.QField{
			{Name: "_id", Type: types.QValueKindString, Nullable: false},
```
Is the CDC path / raw table schema covered too? 👉 👈 (ok if not)
MongoDB Avro files have a very high compression ratio, which can result in OOM issues during file upload to ClickHouse during initial snapshot. The proposed solution computes the uncompressed bytes of QRecords based on their Avro encoding. It reuses the PEERDB_S3_BYTES_PER_AVRO_FILE variable to determine the limit, and then, based on the connector source type, decides whether to track compressed bytes (which the WatchWriter conveniently computes, and how we've tracked this historically) or uncompressed bytes (for the MongoDB connector, where we manually compute bytes as we process QValues); a rough sketch of this decision follows the test plan below. If we want a different default, we could also introduce a new env variable instead, e.g. PEERDB_S3_UNCOMPRESSED_BYTES_PER_AVRO_FILE. Keeping this to initial snapshot for now since that's where the OOM issue is happening; if we extend it to CDC in the future, the computation would also differ due to the schema of the staging table.

- Add unit tests
- Run manual smoke test
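As mentioned above, a rough sketch of how the limit check could branch on compressed vs. uncompressed bytes; all names here are hypothetical and not the PR's actual code:

```go
// Hypothetical sketch: flush the current Avro file once the tracked byte
// count crosses the configured limit, using uncompressed bytes for the
// MongoDB connector and compressed (written) bytes for other sources.
type avroSizeTracker struct {
	trackUncompressed bool  // true on the MongoDB connector path
	compressedBytes   int64 // bytes actually written to the compressed file
	uncompressedBytes int64 // running sum of per-QValue Avro-encoded sizes
	limitBytes        int64 // from PEERDB_S3_BYTES_PER_AVRO_FILE
}

func (t *avroSizeTracker) add(compressed, uncompressed int64) {
	t.compressedBytes += compressed
	t.uncompressedBytes += uncompressed
}

func (t *avroSizeTracker) shouldFlush() bool {
	if t.trackUncompressed {
		return t.uncompressedBytes >= t.limitBytes
	}
	return t.compressedBytes >= t.limitBytes
}
```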