Skip to content

Commit 63be55b

Browse files
committed
migrate upload_sources to Dask Cluster
1 parent 63c71e6 commit 63be55b

File tree

2 files changed

+25
-9
lines changed

2 files changed

+25
-9
lines changed

vast_pipeline/pipeline/finalise.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,10 @@ def final_operations(
7575
axis=1,
7676
meta=object
7777
)
78-
import ipdb; ipdb.set_trace() # breakpoint 0eaeae7c //
7978
# upload sources and related to DB
80-
upload_sources(p_run, srcs_df)
79+
srcs_df = upload_sources(p_run, srcs_df)
8180

82-
# get db ids for sources
83-
srcs_df['id'] = srcs_df['src_dj'].apply(lambda x: x.id, meta=int)
84-
85-
import ipdb; ipdb.set_trace() # breakpoint bcf8f142 //
81+
import ipdb; ipdb.set_trace() # breakpoint 04f4de0a //
8682
# gather the related df, upload to db and save to parquet file
8783
# the df will look like
8884
#

vast_pipeline/pipeline/loading.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import os
22
import logging
33
import pandas as pd
4+
import dask.dataframe as dd
45

56
from django.db import transaction
67

78
from vast_pipeline.image.main import SelavyImage
8-
from vast_pipeline.models import Association, Measurement, Source, RelatedSource
9+
from vast_pipeline.models import (
10+
Association, Measurement, RelatedSource, Run, Source
11+
)
912
from .utils import (
1013
get_create_img, get_create_img_band, get_measurement_models
1114
)
@@ -147,7 +150,7 @@ def upload_images(paths, config, pipeline_run):
147150
return images, meas_dj_obj
148151

149152

150-
def upload_sources(pipeline_run, srcs_df):
153+
def upload_sources(pipeline_run: Run, srcs_df: dd.DataFrame) -> dd.DataFrame:
151154
'''
152155
delete previous sources for given pipeline run and bulk upload
153156
new found sources as well as related sources
@@ -166,7 +169,24 @@ def upload_sources(pipeline_run, srcs_df):
166169
)
167170
logger.debug('(type, #deleted): %s', detail_del)
168171

169-
bulk_upload_model(srcs_df['src_dj'], Source)
172+
dj_src_models = srcs_df['src_dj'].compute()
173+
bulk_upload_model(dj_src_models, Source)
174+
175+
# get db ids for sources and drop the models
176+
srcs_df = srcs_df.drop('src_dj', axis=1)
177+
dj_src_models = dj_src_models.to_frame()
178+
dj_src_models['id'] = dj_src_models['src_dj'].apply(lambda x: x.id)
179+
dj_src_models = dj_src_models.drop('src_dj', axis=1)
180+
181+
srcs_df = dd.merge(
182+
srcs_df,
183+
dj_src_models,
184+
left_index=True,
185+
right_index=True
186+
)
187+
188+
return srcs_df.persist()
189+
170190

171191

172192
def upload_related_sources(related):

0 commit comments

Comments
 (0)