Commit 66da080

add unit test for estimating file size
1 parent 17b2011 commit 66da080

1 file changed: 274 additions & 0 deletions
@@ -0,0 +1,274 @@
package model

import (
	"context"
	"fmt"
	"math/rand/v2"
	"os"
	"testing"
	"time"

	"github.com/hamba/avro/v2/ocf"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
	"github.com/PeerDB-io/peerdb/flow/internal"
	"github.com/PeerDB-io/peerdb/flow/shared/types"
)

// This test verifies that Avro size computation is sufficiently accurate for MongoDB documents
// by comparing computed sizes against actual uncompressed Avro file sizes. The MongoDB CDC
// connector produces records with _id (string) and doc (JSON). Additional test cases will be
// needed for broader schema coverage once uncompressed byte tracking is introduced for other
// databases.
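//
// For illustration, each record under test has this two-field shape
// (the values here are made up):
//
//	[]types.QValue{
//		types.QValueString{Val: "65bf0a1c"},                    // _id
//		types.QValueJSON{Val: `{"user_id":"usr_1","age":42}`},  // doc
//	}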
func TestMongoDBAvroSizeComputation(t *testing.T) {
	testCases := []struct {
		name       string
		numRecords int
	}{
		{
			name:       "small_file",
			numRecords: 5_000,
		},
		{
			name:       "medium_file",
			numRecords: 50_000,
		},
		{
			name:       "large_file",
			numRecords: 500_000,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			schema := types.QRecordSchema{
				Fields: []types.QField{
					{Name: "_id", Type: types.QValueKindString, Nullable: false},
					{Name: "doc", Type: types.QValueKindJSON, Nullable: false},
				},
			}
			tmpFile := fmt.Sprintf("/tmp/test_avro_size_%s_%d.avro", tc.name, time.Now().Unix())
			defer func() {
				if err := os.Remove(tmpFile); err != nil && !os.IsNotExist(err) {
					t.Logf("Warning: failed to remove temp file %s: %v", tmpFile, err)
				}
			}()
			sizeTracker := &QRecordAvroChunkSizeTracker{TrackUncompressed: true}
			avroSchema := writeAvroFileCompressed(t, schema, tc.numRecords, tmpFile, sizeTracker)
			computedSize := sizeTracker.Bytes.Load()

			t.Logf("Total computed size for %d records: %d bytes (%.2f MiB)",
				tc.numRecords, computedSize, float64(computedSize)/(1024*1024))

			actualSize := getActualUncompressedSize(t, tmpFile, avroSchema, tc.numRecords)
			t.Logf("Actual uncompressed Avro size for %d records: %d bytes (%.2f MiB)",
				tc.numRecords, actualSize, float64(actualSize)/(1024*1024))

			diff := actualSize - computedSize
			diffPercent := float64(diff) / float64(actualSize) * 100
			t.Logf("Difference: %d bytes (%.2f%%)", diff, diffPercent)

			lowerBound := float64(actualSize) * 0.9
			upperBound := float64(actualSize) * 1.1
			assert.GreaterOrEqual(t, float64(computedSize), lowerBound,
				"Estimated size should be within 10% of actual (lower bound)")
			assert.LessOrEqual(t, float64(computedSize), upperBound,
				"Estimated size should be within 10% of actual (upper bound)")

			// for completeness, print out the compression ratio
			fileInfo, err := os.Stat(tmpFile)
			require.NoError(t, err)
			compressedSize := fileInfo.Size()
			compressionRatio := float64(actualSize) / float64(compressedSize)
			t.Logf("Compressed file size: %d bytes (%.2f MiB), compression ratio: %.2fx",
				compressedSize, float64(compressedSize)/(1024*1024), compressionRatio)
		})
	}
}

// writeAvroFileCompressed writes an Avro file with ZStandard compression
// and uses the sizeTracker to accumulate estimated uncompressed bytes.
func writeAvroFileCompressed(
	t *testing.T,
	schema types.QRecordSchema,
	numRecords int,
	filePath string,
	sizeTracker *QRecordAvroChunkSizeTracker,
) *QRecordAvroSchemaDefinition {
	t.Helper()

	env := map[string]string{
		"PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING": "false", // avoids db lookups
	}

	avroSchema, err := GetAvroSchemaDefinition(
		context.Background(),
		env,
		"mongo_avro_size_dst_table",
		schema,
		protos.DBType_CLICKHOUSE,
		ConstructColumnNameAvroFieldMap(schema.Fields),
	)
	require.NoError(t, err)
	file, err := os.Create(filePath)
	require.NoError(t, err)
	defer file.Close()

	encoder, err := ocf.NewEncoderWithSchema(
		avroSchema.Schema,
		file,
		ocf.WithCodec(ocf.ZStandard),
		ocf.WithBlockLength(8192),
		ocf.WithBlockSize(1<<26),
	)
	require.NoError(t, err)
	defer encoder.Close()

	avroFieldNames := make([]string, len(avroSchema.Schema.Fields()))
	for i, field := range avroSchema.Schema.Fields() {
		avroFieldNames[i] = field.Name()
	}

	avroConverter, err := NewQRecordAvroConverter(
		context.Background(),
		env,
		avroSchema,
		protos.DBType_CLICKHOUSE,
		avroFieldNames,
		nil,
	)
	require.NoError(t, err)

	docContentTemplate := `{
"user_id":"%s",
"email":"%s",
"age":%d,
"is_active":%t,
"is_verified":%t,
"count_logins":%d,
"created_at":"%s",
"updated_at":"%s",
"session_id":"%s",
"ip_address":"%s",
"tags":["%s","%s","%s"],
"preferences":{"theme":"%s","language":"%s"},
"notes":"%s"
}`

	//nolint:gosec // tests, not security-sensitive
	for range numRecords {
		randomizedDocContent := fmt.Sprintf(docContentTemplate,
			fmt.Sprintf("usr_%x", rand.Int64()),
			fmt.Sprintf("user%d@example.com", rand.IntN(100000)),
			rand.IntN(80)+18,
			rand.IntN(2) == 1,
			rand.IntN(2) == 1,
			rand.IntN(10000),
			time.Now(),
			time.Now(),
			fmt.Sprintf("sess_%x", rand.Int64()),
			fmt.Sprintf("192.168.%d.%d", rand.IntN(256), rand.IntN(256)),
			randomChoice([]string{"premium", "basic", "trial", "enterprise"}),
			randomChoice([]string{"web", "mobile", "api"}),
			randomChoice([]string{"google", "facebook", "twitter", "direct"}),
			randomChoice([]string{"dark", "light", "auto"}),
			randomChoice([]string{"en", "es", "fr", "de", "ja"}),
			randomText(100),
		)

		record := []types.QValue{
			types.QValueString{Val: fmt.Sprintf("%x", rand.Int64())},
			types.QValueJSON{Val: randomizedDocContent},
		}

		avroMap, err := avroConverter.Convert(
			context.Background(),
			env,
			record,
			nil,
			nil,
			internal.BinaryFormatRaw,
		)
		require.NoError(t, err)
		err = encoder.Encode(avroMap)
		require.NoError(t, err)
		if sizeTracker != nil && sizeTracker.TrackUncompressed {
			sizeTracker.Bytes.Add(avroConverter.ComputeSize(record))
		}
	}

	err = encoder.Flush()
	require.NoError(t, err)
	err = encoder.Close()
	require.NoError(t, err)
	err = file.Close()
	require.NoError(t, err)

	return avroSchema
}

// MeteredWriter is an io.Writer that discards the data written to it
// while counting the total number of bytes, so the size of an encoded
// stream can be measured without buffering it.
type MeteredWriter struct {
	bytesWritten int64
}

func (w *MeteredWriter) Write(p []byte) (int, error) {
	w.bytesWritten += int64(len(p))
	return len(p), nil
}

func randomChoice(choices []string) string {
	//nolint:gosec // tests, not security-sensitive
	return choices[rand.IntN(len(choices))]
}

func randomText(length int) string {
	const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
	result := make([]byte, length)
	for i := range result {
		//nolint:gosec // tests, not security-sensitive
		result[i] = chars[rand.IntN(len(chars))]
	}
	return string(result)
}

// getActualUncompressedSize reads a compressed Avro file and measures the actual
// uncompressed data size by re-encoding it without compression to a metered writer.
func getActualUncompressedSize(t *testing.T, filePath string, avroSchema *QRecordAvroSchemaDefinition, numRecords int) int64 {
	t.Helper()

	file, err := os.Open(filePath)
	require.NoError(t, err)
	defer file.Close()
	decoder, err := ocf.NewDecoder(file)
	require.NoError(t, err)

	meteredWriter := &MeteredWriter{}
	encoder, err := ocf.NewEncoderWithSchema(
		avroSchema.Schema,
		meteredWriter,
		ocf.WithCodec(ocf.Null),
		ocf.WithBlockLength(8192),
		ocf.WithBlockSize(1<<26),
	)
	require.NoError(t, err)
	defer encoder.Close()

	n := 0
	for decoder.HasNext() {
		var record map[string]interface{}
		err := decoder.Decode(&record)
		require.NoError(t, err)
		err = encoder.Encode(record)
		require.NoError(t, err)
		n++
	}
	require.NoError(t, decoder.Error())
	assert.Equal(t, numRecords, n)

	err = encoder.Flush()
	require.NoError(t, err)

	return meteredWriter.bytesWritten
}
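
Aside: the metered-writer technique used by getActualUncompressedSize generalizes to any encoder that targets an io.Writer. A minimal self-contained sketch of the same idea (a standalone program with hypothetical names, not part of this package):

	package main

	import "fmt"

	// meteredWriter mirrors MeteredWriter above: it discards the bytes
	// written to it while counting them, sizing an encoded stream
	// without buffering it.
	type meteredWriter struct{ bytesWritten int64 }

	func (w *meteredWriter) Write(p []byte) (int, error) {
		w.bytesWritten += int64(len(p))
		return len(p), nil
	}

	func main() {
		w := &meteredWriter{}
		// Any encoder that writes to an io.Writer can be pointed at w;
		// plain Fprintf is enough to demonstrate the counting.
		fmt.Fprintf(w, "%d records", 500000)
		fmt.Println(w.bytesWritten) // prints 14
	}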
