diff --git a/tap_postgres/discovery_utils.py b/tap_postgres/discovery_utils.py index 0138333d..476bfed5 100644 --- a/tap_postgres/discovery_utils.py +++ b/tap_postgres/discovery_utils.py @@ -64,7 +64,6 @@ def produce_table_info(conn, filter_schemas=None, tables: Optional[List[str]] = with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='stitch_cursor') as cur: cur.itersize = post_db.CURSOR_ITER_SIZE table_info = {} - # SELECT CASE WHEN $2.typtype = 'd' THEN $2.typbasetype ELSE $1.atttypid END sql = """ SELECT pg_class.reltuples::BIGINT AS approximate_row_count, @@ -73,7 +72,7 @@ def produce_table_info(conn, filter_schemas=None, tables: Optional[List[str]] = pg_class.relname AS table_name, attname AS column_name, i.indisprimary AS primary_key, - format_type(a.atttypid, NULL::integer) AS data_type, + format_type(CASE WHEN pgt.typtype = 'd' THEN pgt.typbasetype ELSE a.atttypid END, NULL::integer) AS data_type, information_schema._pg_char_max_length(CASE WHEN COALESCE(subpgt.typtype, pgt.typtype) = 'd' THEN COALESCE(subpgt.typbasetype, pgt.typbasetype) ELSE COALESCE(subpgt.oid, pgt.oid) END, diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 1dd48aaa..b726d2a2 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -452,6 +452,94 @@ def test_catalog(self): 'definitions' : BASE_RECURSIVE_SCHEMAS}, stream_dict.get('schema')) +class TestDomainsTable(unittest.TestCase): + maxDiff = None + table_name = "CHICKEN TIMES" + + def setUp(self): + table_spec = { + "columns": [ + { + "name": "our_test_domain_pk", + "type": "test_domain", + "primary_key": True, + }, + {"name": "our_test_domain", "type": "test_domain"}, + {"name": "our_test_integer_domain", "type": "test_integer_domain"}, + ], + "name": TestHStoreTable.table_name, + } + with get_test_connection() as conn: + cur = conn.cursor() + cur.execute(""" DROP DOMAIN IF EXISTS test_domain CASCADE """) + cur.execute(""" CREATE DOMAIN test_domain AS text; """) + cur.execute(""" DROP DOMAIN IF EXISTS test_integer_domain CASCADE """) + cur.execute(""" CREATE DOMAIN test_integer_domain AS integer; """) + + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [ + s for s in streams if s["tap_stream_id"] == "public-CHICKEN TIMES" + ] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get("metadata").sort(key=lambda md: md["breadcrumb"]) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute( + """INSERT INTO "CHICKEN TIMES" (our_test_domain_pk, our_test_domain, our_test_integer_domain) VALUES ('sad', 'happy', 3)""" + ) + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual( + metadata.to_map(stream_dict.get("metadata")), + { + (): { + "table-key-properties": ["our_test_domain_pk"], + "database-name": "postgres", + "schema-name": "public", + "is-view": False, + "row-count": 0, + }, + ("properties", "our_test_domain_pk"): { + "inclusion": "automatic", + "sql-datatype": "text", + "selected-by-default": True, + }, + ("properties", "our_test_domain"): { + "inclusion": "available", + "sql-datatype": "text", + "selected-by-default": True, + }, + ("properties", "our_test_integer_domain"): { + "inclusion": "available", + "sql-datatype": "integer", + "selected-by-default": True, + }, + }, + ) + + self.assertEqual( + { + "properties": { + "our_test_domain_pk": {"type": ["string"]}, + "our_test_domain": {"type": ["null", "string"]}, + "our_test_integer_domain": { + "minimum": -2147483648, + "maximum": 2147483647, + "type": ["null", "integer"], + }, + }, + "type": "object", + "definitions": BASE_RECURSIVE_SCHEMAS, + }, + stream_dict.get("schema"), + ) + class TestArraysTable(unittest.TestCase): maxDiff = None table_name = 'CHICKEN TIMES' @@ -536,7 +624,7 @@ class TestColumnGrants(unittest.TestCase): table_name = 'CHICKEN TIMES' user = 'tmp_user_for_grant_tests' password = 'password' - + def setUp(self): table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True}, {"name" : 'size integer', "type" : "integer", "quoted" : True}, @@ -560,8 +648,8 @@ def setUp(self): LOGGER.info("running sql: {}".format(sql)) cur.execute(sql) - - + + def test_catalog(self): conn_config = get_test_connection_config() @@ -587,7 +675,7 @@ def test_catalog(self): ('properties', 'id'): {'inclusion': 'available', 'selected-by-default': True, 'sql-datatype': 'integer'}}) - + self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS, 'type': 'object', 'properties': {'id': {'type': ['null', 'integer'],