@@ -132,7 +132,8 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
132
132
ignored_events = None , auto_position = None ,
133
133
only_tables = None , only_schemas = None ,
134
134
freeze_schema = False , skip_to_timestamp = None ,
135
- report_slave = None , slave_uuid = None ):
135
+ report_slave = None , slave_uuid = None ,
136
+ pymysql_wrapper = None ):
136
137
"""
137
138
Attributes:
138
139
resume_stream: Start for event from position or the latest event of
@@ -183,6 +184,11 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
183
184
self .report_slave = ReportSlave (report_slave )
184
185
self .slave_uuid = slave_uuid
185
186
187
+ if pymysql_wrapper :
188
+ self .pymysql_wrapper = pymysql_wrapper
189
+ else :
190
+ self .pymysql_wrapper = pymysql .connect
191
+
186
192
def close (self ):
187
193
if self .__connected_stream :
188
194
self ._stream_connection .close ()
@@ -198,7 +204,7 @@ def __connect_to_ctl(self):
198
204
self ._ctl_connection_settings = dict (self .__connection_settings )
199
205
self ._ctl_connection_settings ["db" ] = "information_schema"
200
206
self ._ctl_connection_settings ["cursorclass" ] = DictCursor
201
- self ._ctl_connection = pymysql . connect (** self ._ctl_connection_settings )
207
+ self ._ctl_connection = self . pymysql_wrapper (** self ._ctl_connection_settings )
202
208
self ._ctl_connection ._get_table_information = self .__get_table_information
203
209
self .__connected_ctl = True
204
210
@@ -236,7 +242,7 @@ def __connect_to_stream(self):
236
242
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
237
243
# server_id (4) -- server id of this slave
238
244
# log_file (string.EOF) -- filename of the binlog on the master
239
- self ._stream_connection = pymysql . connect (** self .__connection_settings )
245
+ self ._stream_connection = self . pymysql_wrapper (** self .__connection_settings )
240
246
241
247
self .__use_checksum = self .__checksum_enabled ()
242
248
0 commit comments