Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit 443e67d

Browse files
authored
Add use_secondary config flag (#145)
1 parent 2b25f69 commit 443e67d

File tree

10 files changed

+258
-162
lines changed

10 files changed

+258
-162
lines changed

.github/workflows/ci.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ jobs:
3535

3636
- name: Tests
3737
env:
38+
TAP_POSTGRES_HOST: localhost
3839
TAP_POSTGRES_PORT: 5432
3940
TAP_POSTGRES_USER: test_user
4041
TAP_POSTGRES_PASSWORD: my-secret-passwd
41-
TAP_POSTGRES_HOST: localhost
42+
TAP_POSTGRES_SECONDARY_HOST: localhost
43+
TAP_POSTGRES_SECONDARY_PORT: 5433
4244
LOGGING_CONF_FILE: ./sample_logging.conf
4345
run: make test

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pylint:
99
pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/
1010

1111
start_db:
12-
docker-compose up -d --build db
12+
docker-compose up -d
1313

1414
test:
1515
. ./venv/bin/activate ;\

README.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ Full list of options in `config.json`:
6666
| tap_id | String | No | ID of the pipeline/tap (Default: None) |
6767
| itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) |
6868
| default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) |
69+
| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) |
70+
| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) |
71+
| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) |
6972

7073

7174
### Run the tap in Discovery Mode
@@ -142,7 +145,7 @@ to the tap for the next sync.
142145
```
143146
144147
Restart your PostgreSQL service to ensure the changes take effect.
145-
148+
146149
**Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5.
147150
This should be sufficient unless you have a large number of read replicas connected to the master instance.
148151
@@ -151,11 +154,11 @@ to the tap for the next sync.
151154
In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a
152155
client in the order they were made on the original server. Each slot streams a sequence of changes from a single
153156
database.
154-
157+
155158
Login to the master instance as a superuser and using the `wal2json` plugin, create a logical replication slot:
156159
```
157160
SELECT *
158-
FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
161+
FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
159162
```
160163
161164
**Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple
@@ -172,6 +175,8 @@ to the tap for the next sync.
172175
```
173176
export TAP_POSTGRES_HOST=<postgres-host>
174177
export TAP_POSTGRES_PORT=<postgres-port>
178+
export TAP_POSTGRES_SECONDARY_HOST=<postgres-replica-host>
179+
export TAP_POSTGRES_SECONDARY_PORT=<postgres-replica-port>
175180
export TAP_POSTGRES_USER=<postgres-user>
176181
export TAP_POSTGRES_PASSWORD=<postgres-password>
177182
```

docker-compose.yml

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,31 @@
11
version: "3.3"
22

33
services:
4-
db:
5-
image: "debezium/postgres:12-alpine"
6-
container_name: ""
4+
db_primary:
5+
image: "docker.io/bitnami/postgresql:12"
6+
container_name: "primary"
77
ports:
88
- "5432:5432"
99
environment:
10+
- POSTGRESQL_REPLICATION_MODE=master
11+
- POSTGRESQL_REPLICATION_USER=repl_user
12+
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
1013
- POSTGRES_USER=test_user
1114
- POSTGRES_PASSWORD=my-secret-passwd
12-
- POSTGRES_DB=tap_postgres_test
13-
command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5"
15+
- POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd
16+
- POSTGRESQL_DATABASE=tap_postgres_test
17+
- ALLOW_EMPTY_PASSWORD=yes
18+
db_replica:
19+
image: "docker.io/bitnami/postgresql:12"
20+
container_name: replica
21+
ports:
22+
- "5433:5432"
23+
depends_on:
24+
- db_primary
25+
environment:
26+
- POSTGRESQL_REPLICATION_MODE=slave
27+
- POSTGRESQL_REPLICATION_USER=repl_user
28+
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
29+
- POSTGRESQL_MASTER_HOST=db_primary
30+
- POSTGRESQL_MASTER_PORT_NUMBER=5432
31+
- ALLOW_EMPTY_PASSWORD=yes

tap_postgres/__init__.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,9 +405,22 @@ def main_impl():
405405
'debug_lsn': args.config.get('debug_lsn') == 'true',
406406
'max_run_seconds': args.config.get('max_run_seconds', 43200),
407407
'break_at_end_lsn': args.config.get('break_at_end_lsn', True),
408-
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0))
408+
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
409+
'use_secondary': args.config.get('use_secondary', False),
409410
}
410411

412+
if conn_config['use_secondary']:
413+
try:
414+
conn_config.update({
415+
# Host and Port are mandatory.
416+
'secondary_host': args.config['secondary_host'],
417+
'secondary_port': args.config['secondary_port'],
418+
})
419+
except KeyError as exc:
420+
raise ValueError(
421+
"When 'use_secondary' enabled 'secondary_host' and 'secondary_port' must be defined."
422+
) from exc
423+
411424
if args.config.get('ssl') == 'true':
412425
conn_config['sslmode'] = 'require'
413426

tap_postgres/db.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def fully_qualified_table_name(schema, table):
3636
return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"'
3737

3838

39-
def open_connection(conn_config, logical_replication=False):
39+
def open_connection(conn_config, logical_replication=False, prioritize_primary=False):
4040
cfg = {
4141
'application_name': 'pipelinewise',
4242
'host': conn_config['host'],
@@ -47,6 +47,14 @@ def open_connection(conn_config, logical_replication=False):
4747
'connect_timeout': 30
4848
}
4949

50+
if conn_config['use_secondary'] and not prioritize_primary and not logical_replication:
51+
# Try to use replica but fallback to primary if keys are missing. This is the same behavior as
52+
# https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129
53+
cfg.update({
54+
'host': conn_config.get("secondary_host", conn_config['host']),
55+
'port': conn_config.get("secondary_port", conn_config['port']),
56+
})
57+
5058
if conn_config.get('sslmode'):
5159
cfg['sslmode'] = conn_config['sslmode']
5260

tap_postgres/sync_strategies/logical_replication.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class UnsupportedPayloadKindError(Exception):
3434

3535
# pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments
3636
def get_pg_version(conn_info):
37-
with post_db.open_connection(conn_info, False) as conn:
37+
with post_db.open_connection(conn_info, False, True) as conn:
3838
with conn.cursor() as cur:
3939
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
4040
version = cur.fetchone()[0]
@@ -93,7 +93,7 @@ def fetch_current_lsn(conn_config):
9393
if version < 90400:
9494
raise Exception('Logical replication not supported before PostgreSQL 9.4')
9595

96-
with post_db.open_connection(conn_config, False) as conn:
96+
with post_db.open_connection(conn_config, False, True) as conn:
9797
with conn.cursor() as cur:
9898
# Use version specific lsn command
9999
if version >= 100000:
@@ -138,7 +138,7 @@ def create_hstore_elem_query(elem):
138138

139139

140140
def create_hstore_elem(conn_info, elem):
141-
with post_db.open_connection(conn_info) as conn:
141+
with post_db.open_connection(conn_info, False, True) as conn:
142142
with conn.cursor() as cur:
143143
query = create_hstore_elem_query(elem)
144144
cur.execute(query)
@@ -151,7 +151,7 @@ def create_array_elem(elem, sql_datatype, conn_info):
151151
if elem is None:
152152
return None
153153

154-
with post_db.open_connection(conn_info) as conn:
154+
with post_db.open_connection(conn_info, False, True) as conn:
155155
with conn.cursor() as cur:
156156
if sql_datatype == 'bit[]':
157157
cast_datatype = 'boolean[]'
@@ -517,7 +517,7 @@ def locate_replication_slot_by_cur(cursor, dbname, tap_id=None):
517517

518518

519519
def locate_replication_slot(conn_info):
520-
with post_db.open_connection(conn_info, False) as conn:
520+
with post_db.open_connection(conn_info, False, True) as conn:
521521
with conn.cursor() as cur:
522522
return locate_replication_slot_by_cur(cur, conn_info['dbname'], conn_info['tap_id'])
523523

@@ -576,7 +576,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
576576
version = get_pg_version(conn_info)
577577

578578
# Create replication connection and cursor
579-
conn = post_db.open_connection(conn_info, True)
579+
conn = post_db.open_connection(conn_info, True, True)
580580
cur = conn.cursor()
581581

582582
# Set session wal_sender_timeout for PG12 and above

tests/test_discovery.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ def setUp(self):
331331
table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True },
332332
{"name" : 'our_hstore', "type" : "hstore" }],
333333
"name" : TestHStoreTable.table_name}
334-
with get_test_connection() as conn:
334+
with get_test_connection(superuser=True) as conn:
335335
cur = conn.cursor()
336336
cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """)
337337
if cur.fetchone()[0] is None:
@@ -536,7 +536,7 @@ class TestColumnGrants(unittest.TestCase):
536536
table_name = 'CHICKEN TIMES'
537537
user = 'tmp_user_for_grant_tests'
538538
password = 'password'
539-
539+
540540
def setUp(self):
541541
table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True},
542542
{"name" : 'size integer', "type" : "integer", "quoted" : True},
@@ -545,7 +545,7 @@ def setUp(self):
545545
"name" : TestColumnGrants.table_name}
546546
ensure_test_table(table_spec)
547547

548-
with get_test_connection() as conn:
548+
with get_test_connection(superuser=True) as conn:
549549
cur = conn.cursor()
550550

551551
sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password)
@@ -560,8 +560,8 @@ def setUp(self):
560560
LOGGER.info("running sql: {}".format(sql))
561561
cur.execute(sql)
562562

563-
564-
563+
564+
565565

566566
def test_catalog(self):
567567
conn_config = get_test_connection_config()
@@ -587,7 +587,7 @@ def test_catalog(self):
587587
('properties', 'id'): {'inclusion': 'available',
588588
'selected-by-default': True,
589589
'sql-datatype': 'integer'}})
590-
590+
591591
self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS,
592592
'type': 'object',
593593
'properties': {'id': {'type': ['null', 'integer'],

0 commit comments

Comments
 (0)