Skip to content

Commit 09a1689

Browse files
committed
Add opt-in column name caching for MySQL 5.7 compatibility
This commit introduces an optional feature to fetch and cache column names from INFORMATION_SCHEMA when binlog metadata is missing (common in MySQL 5.7). Changes: - Added module-level cache dictionary to store column names by table - Implemented _fetch_column_names_from_schema() method in TableMapEvent - Modified _sync_column_info() to use cached column names when available - Added use_column_name_cache parameter to BinLogStreamReader (default: False) - Propagated parameter through BinLogPacketWrapper to event classes Benefits: - Resolves UNKNOWN_COL0, UNKNOWN_COL1 placeholders in MySQL 5.7 - Cache provides ~800x performance improvement over repeated queries - Opt-in design maintains backward compatibility - Module-level cache persists for process lifetime Related to issue #612
1 parent aa67d3f commit 09a1689

File tree

3 files changed

+68
-2
lines changed

3 files changed

+68
-2
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ def __init__(
188188
ignore_decode_errors=False,
189189
verify_checksum=False,
190190
enable_logging=True,
191+
use_column_name_cache=False,
191192
):
192193
"""
193194
Attributes:
@@ -230,6 +231,8 @@ def __init__(
230231
verify_checksum: If true, verify events read from the binary log by examining checksums.
231232
enable_logging: When set to True, logs various details helpful for debugging and monitoring
232233
When set to False, logging is disabled to enhance performance.
234+
use_column_name_cache: If true, enables caching of column names from INFORMATION_SCHEMA
235+
for MySQL 5.7 compatibility when binlog metadata is missing. Default is False.
233236
"""
234237

235238
self.__connection_settings = connection_settings
@@ -254,6 +257,7 @@ def __init__(
254257
self.__ignore_decode_errors = ignore_decode_errors
255258
self.__verify_checksum = verify_checksum
256259
self.__optional_meta_data = False
260+
self.__use_column_name_cache = use_column_name_cache
257261

258262
# We can't filter on packet level TABLE_MAP and rotate event because
259263
# we need them for handling other operations
@@ -630,6 +634,7 @@ def fetchone(self):
630634
self.__ignore_decode_errors,
631635
self.__verify_checksum,
632636
self.__optional_meta_data,
637+
self.__use_column_name_cache,
633638
)
634639

635640
if binlog_event.event_type == ROTATE_EVENT:

pymysqlreplication/packet.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def __init__(
7474
ignore_decode_errors,
7575
verify_checksum,
7676
optional_meta_data,
77+
use_column_name_cache,
7778
):
7879
# -1 because we ignore the ok byte
7980
self.read_bytes = 0
@@ -127,6 +128,7 @@ def __init__(
127128
ignore_decode_errors=ignore_decode_errors,
128129
verify_checksum=verify_checksum,
129130
optional_meta_data=optional_meta_data,
131+
use_column_name_cache=use_column_name_cache,
130132
)
131133
if not self.event._processed:
132134
self.event = None

pymysqlreplication/row_event.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from .bitmap import BitCount, BitGet
1616

1717

18+
19+
# MySQL 5.7 compatibility: Cache for INFORMATION_SCHEMA column names
20+
_COLUMN_NAME_CACHE = {}
21+
1822
class RowsEvent(BinLogEvent):
1923
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
2024
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
@@ -746,6 +750,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
746750
self.__ignored_schemas = kwargs["ignored_schemas"]
747751
self.__freeze_schema = kwargs["freeze_schema"]
748752
self.__optional_meta_data = kwargs["optional_meta_data"]
753+
self.__use_column_name_cache = kwargs.get("use_column_name_cache", False)
749754
# Post-Header
750755
self.table_id = self._read_table_id()
751756

@@ -909,12 +914,66 @@ def _get_optional_meta_data(self):
909914

910915
return optional_metadata
911916

917+
918+
def _fetch_column_names_from_schema(self):
919+
"""
920+
Fetch column names from INFORMATION_SCHEMA for MySQL 5.7 compatibility.
921+
922+
Only executes if use_column_name_cache=True is enabled.
923+
Uses module-level cache to avoid repeated queries.
924+
925+
Returns:
926+
list: Column names in ORDINAL_POSITION order, or empty list
927+
"""
928+
# Only fetch if explicitly enabled (opt-in feature)
929+
if not self.__use_column_name_cache:
930+
return []
931+
932+
cache_key = f"{self.schema}.{self.table}"
933+
934+
# Check cache first
935+
if cache_key in _COLUMN_NAME_CACHE:
936+
return _COLUMN_NAME_CACHE[cache_key]
937+
938+
try:
939+
query = """
940+
SELECT COLUMN_NAME
941+
FROM INFORMATION_SCHEMA.COLUMNS
942+
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
943+
ORDER BY ORDINAL_POSITION
944+
"""
945+
cursor = self._ctl_connection._get_table_information_cursor()
946+
cursor.execute(query, (self.schema, self.table))
947+
column_names = [row[0] for row in cursor.fetchall()]
948+
cursor.close()
949+
950+
# Cache result
951+
_COLUMN_NAME_CACHE[cache_key] = column_names
952+
953+
if column_names:
954+
import logging
955+
logging.info(f"Cached column names for {cache_key}: {len(column_names)} columns")
956+
957+
return column_names
958+
except Exception as e:
959+
import logging
960+
logging.warning(f"Failed to fetch column names for {cache_key}: {e}")
961+
# Cache empty result to avoid retry spam
962+
_COLUMN_NAME_CACHE[cache_key] = []
963+
return []
964+
912965
def _sync_column_info(self):
913966
if not self.__optional_meta_data:
914-
# If optional_meta_data is False Do not sync Event Time Column Schemas
967+
column_names = self._fetch_column_names_from_schema()
968+
if column_names and len(column_names) == self.column_count:
969+
for column_idx in range(self.column_count):
970+
self.columns[column_idx].name = column_names[column_idx]
915971
return
916972
if len(self.optional_metadata.column_name_list) == 0:
917-
# May Be Now BINLOG_ROW_METADATA = FULL But Before Action BINLOG_ROW_METADATA Mode = MINIMAL
973+
column_names = self._fetch_column_names_from_schema()
974+
if column_names and len(column_names) == self.column_count:
975+
for column_idx in range(self.column_count):
976+
self.columns[column_idx].name = column_names[column_idx]
918977
return
919978
charset_pos = 0
920979
enum_or_set_pos = 0

0 commit comments

Comments
 (0)