Skip to content

Commit

Permalink
retry fetching query result if still in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-jl committed Jan 8, 2024
1 parent f1d625f commit 2598c50
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 9 deletions.
60 changes: 60 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,63 @@ func TestLongRunningAsyncQuery(t *testing.T) {
}
}
}

func runLongRunningAsyncQuery(t *testing.T, ctx context.Context) {

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.19 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.20 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.19 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.20 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.21 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.19 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.21 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.21 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.20 on Ubuntu

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.20 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.19 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.19 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.19 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.21 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.21 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.20 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.20 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.21 on Mac

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.20 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.19 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.19 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.20 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.20 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.19 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.21 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.21 on Windows

context.Context should be the first parameter of a function

Check failure on line 200 in async_test.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.21 on Windows

context.Context should be the first parameter of a function
runDBTest(t, func(dbt *DBTest) {
_ = dbt.mustQueryContext(ctx, "CALL SYSTEM$WAIT(100, 'SECONDS')")
})
}

func TestLongRunningAsyncQueryFetchResultByID(t *testing.T) {
queryIDChan := make(chan string, 1)
ctx := WithAsyncMode(context.Background())
ctx = WithQueryIDChan(ctx, queryIDChan)

goRoutineChan := make(chan string)
go func(grCh chan string, qIDch chan string) {
queryID := <-queryIDChan
grCh <- queryID
}(goRoutineChan, queryIDChan)

// Run a long running query asynchronously
go runLongRunningAsyncQuery(t, ctx)

// Get the query ID without waiting for the query to finish
queryID := <-goRoutineChan
if queryID == "" {
t.Fatal("expected a nonempty query ID")
}

conn := openConn(t)
defer conn.Close()

// Fetch the result using the query ID
ctx = WithFetchResultByID(ctx, queryID)
rows, err := conn.QueryContext(ctx, "")
if err != nil {
t.Fatalf("failed to run a query. err: %v", err)
}
defer rows.Close()

var v string
i := 0
for {
for rows.Next() {
err := rows.Scan(&v)
if err != nil {
t.Fatalf("failed to get result. err: %v", err)
}
if v == "" {
t.Fatal("should have returned a result")
}
results := []string{"waited 100 seconds", "Statement executed successfully."}
if v != results[i] {
t.Fatalf("unexpected result returned. expected: %v, but got: %v", results[i], v)
}
i++
}
if !rows.NextResultSet() {
break
}
}
}
43 changes: 34 additions & 9 deletions monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net/url"
"strconv"
"time"
)

const urlQueriesResultFmt = "/queries/%s/result"
Expand Down Expand Up @@ -214,17 +215,40 @@ func (sc *snowflakeConn) getQueryResultResp(
headers[headerAuthorizationKey] = fmt.Sprintf(headerSnowflakeToken, token)
}
url := sc.rest.getFullURL(resultPath, &param)
res, err := sc.rest.FuncGet(ctx, sc.rest, url, headers, sc.rest.RequestTimeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
return nil, err
}
defer res.Body.Close()

var respd *execResponse
if err = json.NewDecoder(res.Body).Decode(&respd); err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
return nil, err
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}

for {
resp, err := sc.rest.FuncGet(ctx, sc.rest, url, headers, sc.rest.RequestTimeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
return nil, err
}
defer resp.Body.Close()
respd = &execResponse{} // reset the response
if err = json.NewDecoder(resp.Body).Decode(&respd); err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
return nil, err
}

Check warning on line 234 in monitoring.go

View check run for this annotation

Codecov / codecov/patch

monitoring.go#L232-L234

Added lines #L232 - L234 were not covered by tests
if respd.Code != queryInProgressAsyncCode {
// If the query takes longer than 45 seconds to complete the results are not returned.
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
logger.WithContext(ctx).Infof("Query execution still in progress. Sleep for %v ms", sleepTime)
time.Sleep(sleepTime)
}
if retry < len(retryPattern)-1 {
retry++
}
}

return respd, nil
}

Expand All @@ -238,6 +262,7 @@ func (sc *snowflakeConn) rowsForRunningQuery(
logger.WithContext(ctx).Errorf("error: %v", err)
return err
}

if !resp.Success {
code, err := strconv.Atoi(resp.Code)
if err != nil {
Expand Down

0 comments on commit 2598c50

Please sign in to comment.