Skip to content

Commit 6957d30

Browse files
authored
adding inserts with duplicate ref_id, func skipping (#3)
* adding inserts with duplicate ref_id, func skipping * remove comment
1 parent 342a7e1 commit 6957d30

2 files changed

Lines changed: 50 additions & 11 deletions

File tree

dao.go

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,45 @@ func (d *docket) initTables() error {
5454
// InsertTasks - supports max 65535/5 = 13,107 tasks in one batch, since postgres supports max 65535 arguments per statement
5555
// if you need more, execute multiple InsertTasks in the same transaction
5656
func (d *docket) InsertTasks(tx *sql.Tx, tcs ...TaskCreator) ([]Task, error) {
57+
return d.insertTasks(tx, false, tcs...)
58+
}
59+
60+
func (d *docket) InsertTask(tx *sql.Tx, tc TaskCreator) (Task, error) {
61+
tasks, err := d.InsertTasks(tx, tc)
62+
if err != nil {
63+
return nil, err
64+
}
65+
if len(tasks) == 1 {
66+
return tasks[0], nil
67+
}
68+
return nil, errors.New("shouldn't happen, got wrong number of tasks from InsertTasks, this is a bug")
69+
}
70+
71+
// InsertTasksSkipDuplicates - supports max 65535/5 = 13,107 tasks in one batch, since postgres supports max 65535 arguments per statement
72+
// if you need more, execute multiple InsertTasks in the same transaction
73+
// requires ref_id to be set on all tasks
74+
func (d *docket) InsertTasksSkipDuplicates(tx *sql.Tx, tcs ...TaskCreator) ([]Task, error) {
75+
for _, tc := range tcs {
76+
if !tc.task.refId.Valid {
77+
return nil, errors.New("InsertTasksSkipDuplicates requires ref_id to be set on all tasks")
78+
}
79+
}
80+
return d.insertTasks(tx, true, tcs...)
81+
}
82+
83+
func (d *docket) InsertTaskSkipDuplicates(tx *sql.Tx, tc TaskCreator) (Task, error) {
84+
tasks, err := d.InsertTasksSkipDuplicates(tx, tc)
85+
if err != nil {
86+
return nil, err
87+
}
88+
if len(tasks) == 1 {
89+
return tasks[0], nil
90+
}
91+
// allowed to skip insert if duplicate, so return nil
92+
return nil, nil
93+
}
94+
95+
func (d *docket) insertTasks(tx *sql.Tx, skipDuplicates bool, tcs ...TaskCreator) ([]Task, error) {
5796
shouldNotify := false
5897
var placeholders []string
5998
var args []interface{}
@@ -75,11 +114,16 @@ func (d *docket) InsertTasks(tx *sql.Tx, tcs ...TaskCreator) ([]Task, error) {
75114
q := `
76115
INSERT INTO pqdocket_task(scheduled_at, ref_id, func, metadata, claim_time_seconds)
77116
VALUES %s
117+
--DUPLICATES_HANDLING--
78118
RETURNING *
79119
`
80120
q = fmt.Sprintf(q, strings.Join(placeholders, ", "))
81121
q = strings.Replace(q, taskTableName, d.tableName(), 1)
82122

123+
if skipDuplicates {
124+
q = strings.Replace(q, "--DUPLICATES_HANDLING--", "ON CONFLICT (ref_id, func) DO NOTHING", 1)
125+
}
126+
83127
var rows *sql.Rows
84128
var err error
85129
if tx == nil {
@@ -110,17 +154,6 @@ func (d *docket) InsertTasks(tx *sql.Tx, tcs ...TaskCreator) ([]Task, error) {
110154
return tasks, rows.Close()
111155
}
112156

113-
func (d *docket) InsertTask(tx *sql.Tx, tc TaskCreator) (Task, error) {
114-
tasks, err := d.InsertTasks(tx, tc)
115-
if err != nil {
116-
return nil, err
117-
}
118-
if len(tasks) == 1 {
119-
return tasks[0], nil
120-
}
121-
return nil, errors.New("shouldn't happen, got wrong number of tasks from InsertTasks, this is a bug")
122-
}
123-
124157
func (d *docket) claimTasks(wantNum int) ([]task, error) {
125158
q := `
126159
WITH tasks_to_claim AS (

docket.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ type Docket interface {
4343
// If you need more, execute multiple InsertTasks in the same transaction.
4444
InsertTasks(tx *sql.Tx, tc ...TaskCreator) ([]Task, error)
4545

46+
// InsertTaskSkipDuplicates - insert a task. Tasks are immediately available to run after being inserted. If a task with the same (func, ref_id) exists, the insert is skipped and no error is returned. Cannot be used without ref id.
47+
InsertTaskSkipDuplicates(tx *sql.Tx, tc TaskCreator) (Task, error)
48+
49+
// InsertTasksSkipDuplicates - batch insert tasks, skipping tasks with duplicate (func, ref_id) Cannot be used without ref id.
50+
InsertTasksSkipDuplicates(tx *sql.Tx, tc ...TaskCreator) ([]Task, error)
51+
4652
// FindTasks is used to query tasks.
4753
// If executed within a transaction (tx != nil), it will lock the returned rows for the duration of the transaction.
4854
FindTasks(tx *sql.Tx, params ...FindParam) ([]Task, error)

0 commit comments

Comments
 (0)