Skip to content

Commit 28921bb

Browse files
feat(jdbc): add hashModForField method for MySQL dialect
1 parent e9f0757 commit 28921bb

File tree

1 file changed

+186
-0
lines changed
  • seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql

1 file changed

+186
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
19+
20+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;
29+
30+
public class MysqlDialectTest {
31+
32+
@Test
33+
public void testHashDistributionMD5vsCRC32WithSnowflakeIds() {
34+
int totalRecords = 1_100_000;
35+
int partitions = 10;
36+
List<String> snowflakeIds = generateSnowflakeIds(totalRecords);
37+
38+
Map<Integer, Integer> md5Distribution = new HashMap<>();
39+
for (int i = 0; i < partitions; i++) {
40+
md5Distribution.put(i, 0);
41+
}
42+
43+
for (String id : snowflakeIds) {
44+
int partition = calculateMD5Partition(id, partitions);
45+
md5Distribution.put(partition, md5Distribution.get(partition) + 1);
46+
}
47+
48+
Map<Integer, Integer> crc32Distribution = new HashMap<>();
49+
for (int i = 0; i < partitions; i++) {
50+
crc32Distribution.put(i, 0);
51+
}
52+
53+
for (String id : snowflakeIds) {
54+
int partition = calculateCRC32Partition(id, partitions);
55+
crc32Distribution.put(partition, crc32Distribution.get(partition) + 1);
56+
}
57+
58+
System.out.println("MD5 Distribution (OLD - Has Issue):");
59+
for (int i = 0; i < partitions; i++) {
60+
int count = md5Distribution.get(i);
61+
double percentage = (count * 100.0) / totalRecords;
62+
System.out.printf(
63+
" Partition %d: %,7d records (%.2f%%)%s\n",
64+
i, count, percentage, (percentage > 20 ? " SKEWED!" : ""));
65+
}
66+
67+
System.out.println("\nCRC32 Distribution (NEW - Fixed):");
68+
for (int i = 0; i < partitions; i++) {
69+
int count = crc32Distribution.get(i);
70+
double percentage = (count * 100.0) / totalRecords;
71+
System.out.printf(
72+
" Partition %d: %,7d records (%.2f%%)%s\n",
73+
i, count, percentage, (percentage > 20 ? " SKEWED!" : " ✓"));
74+
}
75+
76+
// Verify that MD5 is severely skewed
77+
double md5Partition0Percentage = (md5Distribution.get(0) * 100.0) / totalRecords;
78+
Assertions.assertTrue(md5Partition0Percentage > 30);
79+
80+
// Verify that CRC32 is evenly distributed
81+
for (int i = 0; i < partitions; i++) {
82+
double crc32Percentage = (crc32Distribution.get(i) * 100.0) / totalRecords;
83+
Assertions.assertTrue(crc32Percentage >= 7 && crc32Percentage <= 13);
84+
}
85+
86+
double md5StdDev = calculateStandardDeviation(md5Distribution, totalRecords, partitions);
87+
double crc32StdDev =
88+
calculateStandardDeviation(crc32Distribution, totalRecords, partitions);
89+
90+
// The standard deviation of CRC32 should be much smaller than MD5
91+
Assertions.assertTrue(crc32StdDev < md5StdDev / 2);
92+
}
93+
94+
/** Generate Snowflake Algorithm ID */
95+
private List<String> generateSnowflakeIds(int count) {
96+
List<String> ids = new ArrayList<>(count);
97+
long baseTimestamp = 1704067200000L;
98+
long timestampBits = baseTimestamp << 22;
99+
100+
for (int i = 0; i < count; i++) {
101+
long timeIncrement = (i / 4096) << 22;
102+
long machineId = (i % 1024) << 12;
103+
long sequence = i % 4096;
104+
105+
long snowflakeId = timestampBits + timeIncrement + machineId + sequence;
106+
ids.add(String.valueOf(snowflakeId));
107+
}
108+
109+
return ids;
110+
}
111+
112+
/** Simulate the MD5 behavior of MySQL */
113+
private int calculateMD5Partition(String id, int mod) {
114+
try {
115+
MessageDigest md = MessageDigest.getInstance("MD5");
116+
byte[] digest = md.digest(id.getBytes());
117+
118+
StringBuilder hexString = new StringBuilder();
119+
for (byte b : digest) {
120+
String hex = Integer.toHexString(0xff & b);
121+
if (hex.length() == 1) {
122+
hexString.append('0');
123+
}
124+
hexString.append(hex);
125+
}
126+
127+
String hexResult = hexString.toString();
128+
long numericValue = convertHexStringToNumberMySQLWay(hexResult);
129+
130+
return (int) Math.abs(numericValue % mod);
131+
} catch (Exception e) {
132+
throw new RuntimeException(e);
133+
}
134+
}
135+
136+
/**
137+
* Simulate MySQL string to number conversion: Read from left to right and stop when the first
138+
* non numeric character is encountered.
139+
*/
140+
private long convertHexStringToNumberMySQLWay(String hexString) {
141+
if (hexString == null || hexString.isEmpty()) {
142+
return 0;
143+
}
144+
145+
StringBuilder numericPart = new StringBuilder();
146+
for (char c : hexString.toCharArray()) {
147+
if (c >= '0' && c <= '9') {
148+
numericPart.append(c);
149+
} else {
150+
break;
151+
}
152+
}
153+
154+
if (numericPart.length() == 0) {
155+
return 0;
156+
}
157+
158+
try {
159+
return Long.parseLong(numericPart.toString());
160+
} catch (NumberFormatException e) {
161+
return 0;
162+
}
163+
}
164+
165+
/** Simulate CRC32 behavior */
166+
private int calculateCRC32Partition(String id, int mod) {
167+
CRC32 crc32 = new CRC32();
168+
crc32.update(id.getBytes());
169+
long crcValue = crc32.getValue();
170+
171+
return (int) Math.abs(crcValue % mod);
172+
}
173+
174+
private double calculateStandardDeviation(
175+
Map<Integer, Integer> distribution, int totalRecords, int partitions) {
176+
double mean = totalRecords / (double) partitions;
177+
double sumSquaredDiff = 0;
178+
179+
for (int i = 0; i < partitions; i++) {
180+
double diff = distribution.get(i) - mean;
181+
sumSquaredDiff += diff * diff;
182+
}
183+
184+
return Math.sqrt(sumSquaredDiff / partitions);
185+
}
186+
}

0 commit comments

Comments
 (0)