Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 114 additions & 27 deletions pgsql_big_dedupe_example/pgsql_big_dedupe_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import itertools
import io
import csv
import multiprocessing
import math

import dj_database_url
import psycopg2
import psycopg2.extras

import dedupe
from dedupe.backport import Pool
import numpy


Expand All @@ -44,8 +47,16 @@ class Readable(object):

def __init__(self, iterator):

self.output = io.StringIO()
self.writer = csv.writer(self.output)
self.output = io.StringIO(
# necessary for csv. See: https://docs.python.org/3/library/csv.html#id3
newline='',
)
self.writer = csv.writer(
self.output,
# csv.unix_dialect seems the right one
# based on our tests and Postgres CSV defaults:
# https://www.postgresql.org/docs/12/sql-copy.html
dialect=csv.unix_dialect)
self.iterator = iterator

def read(self, size):
Expand Down Expand Up @@ -80,6 +91,30 @@ def cluster_ids(clustered_dupes):
yield donor_id, cluster_id, score


def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_select, partition_offset, partition_end):
    """Fingerprint one partition of processed_donors and COPY the resulting
    (block_key, donor_id) rows into the blocking_map table.

    Intended to run inside a multiprocessing worker, so it opens its own
    database connections (psycopg2 connections cannot be shared across
    forked processes).

    Args:
        db_conf: dict with 'NAME', 'USER', 'PASSWORD', 'HOST' keys
            (dj_database_url-style config).
        deduper_fingerprinter: the dedupe fingerprinter callable; must be
            picklable so it can be shipped to the worker process.
        donor_partition_select: SQL with two %s placeholders bounding donor_id
            (lower bound inclusive, upper bound exclusive).
        partition_offset: inclusive lower donor_id bound for this partition.
        partition_end: exclusive upper donor_id bound for this partition.

    Returns:
        None; the results are written directly to blocking_map.
    """
    read_con = psycopg2.connect(database=db_conf['NAME'],
                                user=db_conf['USER'],
                                password=db_conf['PASSWORD'],
                                host=db_conf['HOST'],
                                # rows come back as dicts keyed by column name
                                cursor_factory=psycopg2.extras.RealDictCursor)
    write_con = psycopg2.connect(database=db_conf['NAME'],
                                 user=db_conf['USER'],
                                 password=db_conf['PASSWORD'],
                                 host=db_conf['HOST'])
    try:
        # Named cursor -> server-side cursor: the partition is streamed
        # row by row instead of being fetched into memory all at once.
        with read_con.cursor('donor_partition_select') as read_cur:
            read_cur.execute(donor_partition_select,
                             (partition_offset, partition_end))

            partition_data = ((row['donor_id'], row) for row in read_cur)
            b_data = deduper_fingerprinter(partition_data)

            # `with write_con:` commits on success / rolls back on error
            # (it does NOT close the connection).
            with write_con:
                with write_con.cursor() as write_cur:
                    write_cur.copy_expert(
                        'COPY blocking_map FROM STDIN WITH CSV',
                        Readable(b_data),
                        size=10000)
    finally:
        # FIX: the original leaked both connections — they were opened per
        # worker call and never closed, holding two server connections each.
        read_con.close()
        write_con.close()


if __name__ == '__main__':
# ## Logging

Expand All @@ -100,8 +135,9 @@ def cluster_ids(clustered_dupes):
logging.getLogger().setLevel(log_level)

# ## Setup
settings_file = 'pgsql_big_dedupe_example_settings'
settings_file = 'pgsql_big_dedupe_example_settings.with-indexes'
training_file = 'pgsql_big_dedupe_example_training.json'
num_cores = multiprocessing.cpu_count()

start_time = time.time()

Expand Down Expand Up @@ -135,14 +171,19 @@ def cluster_ids(clustered_dupes):
# `pgsql_big_dedupe_example_init_db.py`

DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \
"from processed_donors"
"FROM processed_donors"
DONOR_PARTITION_SELECT = "SELECT donor_id, city, name, zip, state, address " \
"FROM processed_donors " \
"WHERE donor_id >= %s " \
"AND donor_id < %s"
COUNT_SELECT = "SELECT COUNT(*) FROM processed_donors"

# ## Training

if os.path.exists(settings_file):
print('reading from ', settings_file)
with open(settings_file, 'rb') as sf:
deduper = dedupe.StaticDedupe(sf, num_cores=4)
deduper = dedupe.StaticDedupe(sf, num_cores=num_cores)
else:

# Define the fields dedupe will pay attention to
Expand All @@ -159,7 +200,7 @@ def cluster_ids(clustered_dupes):
]

# Create a new deduper object and pass our data model to it.
deduper = dedupe.Dedupe(fields, num_cores=4)
deduper = dedupe.Dedupe(fields, num_cores=num_cores)

# Named cursor runs server side with psycopg2
with read_con.cursor('donor_select') as cur:
Expand Down Expand Up @@ -220,34 +261,80 @@ def cluster_ids(clustered_dupes):
cur.execute("CREATE TABLE blocking_map "
"(block_key text, donor_id INTEGER)")

# Compute `(block_key, donor_id)` tuples and write on `blocking_map` in parallel,
# but only for the non-index predicates. Only those can run in parallel
# because they don't need global predicate indexes.
# We cannot share predicate indexes across Python processes because it would consume
# too much RAM
print('computing blocking map in parallel (non-index predicates)')

predicates_without_index = []
predicates_with_index = []
for full_predicate in deduper.fingerprinter.predicates:
if any(predicate in deduper.fingerprinter.index_predicates
for predicate in full_predicate):
predicates_with_index.append(full_predicate)
else:
predicates_without_index.append(full_predicate)

# Use only predicates WITHOUT indexes for parallel blocking
deduper.fingerprinter.predicates = predicates_without_index
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've found a problem here... due to these lines:

https://github.com/dedupeio/dedupe/blob/c250911590a72e77612bf9549c78c90e2fb01705/dedupe/blocking.py#L89-L91

The enumerate is causing the predicate to have a different pred_id compared to the serial version. This only happens if there's an index predicate. What would be the best way to solve this? Changing Dedupe internally or doing some workaround here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.. that is interesting. enumerate is just identifying the predicate. we could accomplish the same thing by having the predicates be hashable and using the hash.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that looks good to me.


# processed_donors `id` starts at 1 and goes up to `COUNT(*)`
with read_con.cursor('donor_select') as cur:
cur.execute(COUNT_SELECT)
total_rows = cur.fetchone()['count']
partition_size = math.ceil(total_rows / num_cores)
partition_offsets = range(1, total_rows + 1, partition_size)

# Use a multiprocessing.Pool to run parallel_fingerprinter across num_cores worker processes.
with Pool(processes=num_cores) as pool:
block_file_path_list = pool.starmap(
parallel_fingerprinter,
[
(
db_conf,
deduper.fingerprinter,
DONOR_PARTITION_SELECT,
partition_offset,
(partition_offset + partition_size)
)
for partition_offset in partition_offsets
],
)

# If dedupe learned an Index Predicate, we have to take a pass
# through the data and create indices.
print('creating inverted index')
# through the data and create indices
if predicates_with_index:
print('creating inverted indexes')

for field in deduper.fingerprinter.index_fields:
with read_con.cursor('field_values') as cur:
cur.execute("SELECT DISTINCT %s FROM processed_donors" % field)
field_data = (row[field] for row in cur)
deduper.fingerprinter.index(field_data, field)
# Use only predicates WITH indexes for non-parallel blocking
deduper.fingerprinter.predicates = predicates_with_index

# Now we are ready to write our blocking map table by creating a
# generator that yields unique `(block_key, donor_id)` tuples.
print('writing blocking map')
for field in deduper.fingerprinter.index_fields:
with read_con.cursor('field_values') as cur:
cur.execute("SELECT DISTINCT %s FROM processed_donors" % field)
field_data = (row[field] for row in cur)
deduper.fingerprinter.index(field_data, field)

with read_con.cursor('donor_select') as read_cur:
read_cur.execute(DONOR_SELECT)
# Compute `(block_key, donor_id)` tuples and write on `blocking_map` in serial
# for the index predicates
print('computing and writing blocking map (index predicates)')

full_data = ((row['donor_id'], row) for row in read_cur)
b_data = deduper.fingerprinter(full_data)
with read_con.cursor('donor_select') as read_cur:
read_cur.execute(DONOR_SELECT)

with write_con:
with write_con.cursor() as write_cur:
write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV',
Readable(b_data),
size=10000)
full_data = ((row['donor_id'], row) for row in read_cur)
b_data = deduper.fingerprinter(full_data)

with write_con:
with write_con.cursor() as write_cur:
write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV',
Readable(b_data),
size=10000)

# free up memory by removing indices
deduper.fingerprinter.reset_indices()
# free up memory by removing indices
deduper.fingerprinter.reset_indices()

logging.info("indexing block_key")
with write_con:
Expand Down