
Commit

refactor the fix and test
sfc-gh-ext-simba-jl committed Jan 10, 2024
1 parent 725331a commit dfb6466
Showing 3 changed files with 71 additions and 117 deletions.
88 changes: 49 additions & 39 deletions async.go
@@ -64,45 +64,12 @@ func (sr *snowflakeRestful) getAsync(
token, _, _ := sr.TokenAccessor.GetTokens()
headers[headerAuthorizationKey] = fmt.Sprintf(headerSnowflakeToken, token)

var err error
var respd execResponse
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}

for {
resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
defer resp.Body.Close()

respd = execResponse{} // reset the response
err = json.NewDecoder(resp.Body).Decode(&respd)
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
if respd.Code != queryInProgressAsyncCode {
// If the query takes longer than 45 seconds to complete the results are not returned.
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
logger.WithContext(ctx).Infof("Query execution still in progress. Sleep for %v ms", sleepTime)
time.Sleep(sleepTime)
}
if retry < len(retryPattern)-1 {
retry++
}

respd, err := getQueryResult(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("error: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}

sc := &snowflakeConn{rest: sr, cfg: cfg, queryContextCache: (&queryContextCache{}).init(), currentTimeProvider: defaultTimeProvider}
@@ -166,3 +133,46 @@ func (sr *snowflakeRestful) getAsync(
}
return nil
}

func getQueryResult(
ctx context.Context,
sr *snowflakeRestful,
URL *url.URL,
headers map[string]string,
timeout time.Duration) (*execResponse, error) {
var respd *execResponse
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}

for {
resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
return respd, err
}
defer resp.Body.Close()

respd = &execResponse{} // reset the response
err = json.NewDecoder(resp.Body).Decode(&respd)
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
return respd, err
}

if respd.Code != queryInProgressAsyncCode {
// If the query takes longer than 45 seconds to complete the results are not returned.
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
logger.WithContext(ctx).Infof("Query execution still in progress. Sleep for %v ms", sleepTime)
time.Sleep(sleepTime)
}
if retry < len(retryPattern)-1 {
retry++
}
}
return respd, nil
}
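For context, the retryPattern above works out to sleeps of 0.5 s, 0.5 s, 1 s, 1.5 s, 2 s, 4 s, and then 5 s for every later attempt. A minimal standalone sketch (not part of this commit) that prints that schedule:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same pattern as getQueryResult above: 500 ms multiplied by each factor,
	// holding at the last entry (5 s) once the pattern is exhausted.
	retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}
	retry := 0
	for attempt := 1; attempt <= 10; attempt++ {
		sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
		fmt.Printf("attempt %2d: sleep %v\n", attempt, sleepTime)
		if retry < len(retryPattern)-1 {
			retry++
		}
	}
}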
63 changes: 18 additions & 45 deletions async_test.go
@@ -197,62 +197,35 @@ func TestLongRunningAsyncQuery(t *testing.T) {
}
}

func runLongRunningAsyncQuery(ctx context.Context, t *testing.T) {
func TestLongRunningAsyncQueryFetchResultByID2(t *testing.T) {
runDBTest(t, func(dbt *DBTest) {
_ = dbt.mustQueryContext(ctx, "CALL SYSTEM$WAIT(100, 'SECONDS')")
})
}
queryIDChan := make(chan string, 1)
ctx := WithAsyncMode(context.Background())
ctx = WithQueryIDChan(ctx, queryIDChan)

func TestLongRunningAsyncQueryFetchResultByID(t *testing.T) {
queryIDChan := make(chan string, 1)
ctx := WithAsyncMode(context.Background())
ctx = WithQueryIDChan(ctx, queryIDChan)
// Run a long running query asynchronously
go dbt.mustExecContext(ctx, "CALL SYSTEM$WAIT(50, 'SECONDS')")

goRoutineChan := make(chan string)
go func(grCh chan string, qIDch chan string) {
// Get the query ID without waiting for the query to finish
queryID := <-queryIDChan
grCh <- queryID
}(goRoutineChan, queryIDChan)

// Run a long running query asynchronously
go runLongRunningAsyncQuery(ctx, t)

// Get the query ID without waiting for the query to finish
queryID := <-goRoutineChan
if queryID == "" {
t.Fatal("expected a nonempty query ID")
}

conn := openConn(t)
defer conn.Close()
assertNotNilF(t, queryID, "expected a nonempty query ID")

// Fetch the result using the query ID
ctx = WithFetchResultByID(ctx, queryID)
rows, err := conn.QueryContext(ctx, "")
if err != nil {
t.Fatalf("failed to run a query. err: %v", err)
}
defer rows.Close()
ctx = WithFetchResultByID(ctx, queryID)
rows := dbt.mustQueryContext(ctx, queryID)
defer rows.Close()

var v string
i := 0
for {
var v string
i := 0
for rows.Next() {
err := rows.Scan(&v)
if err != nil {
t.Fatalf("failed to get result. err: %v", err)
}
if v == "" {
t.Fatal("should have returned a result")
}
results := []string{"waited 100 seconds", "Statement executed successfully."}
assertNilF(t, err, fmt.Sprintf("failed to get result. err: %v", err))
assertNotNilF(t, v, "should have returned a result")
results := []string{"waited 50 seconds", "Statement executed successfully."}
if v != results[i] {
t.Fatalf("unexpected result returned. expected: %v, but got: %v", results[i], v)
}
i++
}
if !rows.NextResultSet() {
break
}
}
assertFalseF(t, rows.NextResultSet())
})
}
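For context, the public helpers exercised by this test (WithAsyncMode, WithQueryIDChan, WithFetchResultByID) combine roughly as follows in application code. This is a hedged sketch, not part of this commit: the DSN is a placeholder and error handling is minimal.

package main

import (
	"context"
	"database/sql"
	"log"

	sf "github.com/snowflakedb/gosnowflake"
)

func main() {
	// Placeholder DSN; replace with a real account, credentials, and database.
	db, err := sql.Open("snowflake", "user:password@account/db/schema")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Ask the driver to run statements asynchronously and report query IDs.
	queryIDChan := make(chan string, 1)
	ctx := sf.WithAsyncMode(context.Background())
	ctx = sf.WithQueryIDChan(ctx, queryIDChan)

	// Start the long-running statement without blocking on its completion.
	go func() {
		if _, err := db.ExecContext(ctx, "CALL SYSTEM$WAIT(50, 'SECONDS')"); err != nil {
			log.Printf("exec failed: %v", err)
		}
	}()

	// The query ID arrives on the channel before the statement finishes.
	queryID := <-queryIDChan

	// Later, fetch the result by ID; the query text is ignored in this mode.
	fetchCtx := sf.WithFetchResultByID(context.Background(), queryID)
	rows, err := db.QueryContext(fetchCtx, "")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
}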
37 changes: 4 additions & 33 deletions monitoring.go
@@ -9,7 +9,6 @@ import (
"fmt"
"net/url"
"strconv"
"time"
)

const urlQueriesResultFmt = "/queries/%s/result"
@@ -216,39 +215,11 @@ func (sc *snowflakeConn) getQueryResultResp(
}
url := sc.rest.getFullURL(resultPath, &param)

var respd *execResponse
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}

for {
resp, err := sc.rest.FuncGet(ctx, sc.rest, url, headers, sc.rest.RequestTimeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
return nil, err
}
defer resp.Body.Close()
respd = &execResponse{} // reset the response
if err = json.NewDecoder(resp.Body).Decode(&respd); err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
return nil, err
}
if respd.Code != queryInProgressAsyncCode {
// If the query takes longer than 45 seconds to complete the results are not returned.
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
logger.WithContext(ctx).Infof("Query execution still in progress. Sleep for %v ms", sleepTime)
time.Sleep(sleepTime)
}
if retry < len(retryPattern)-1 {
retry++
}
respd, err := getQueryResult(ctx, sc.rest, url, headers, sc.rest.RequestTimeout)
if err != nil {
logger.WithContext(ctx).Errorf("error: %v", err)
return nil, err
}

return respd, nil
}
