Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-942595: Retry the fetch result request for response code queryInProgressAsyncCode when using WithFetchResultByID #1021

Merged
merged 6 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 54 additions & 39 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,45 +64,12 @@
token, _, _ := sr.TokenAccessor.GetTokens()
headers[headerAuthorizationKey] = fmt.Sprintf(headerSnowflakeToken, token)

var err error
var respd execResponse
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}

for {
resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
defer resp.Body.Close()

respd = execResponse{} // reset the response
err = json.NewDecoder(resp.Body).Decode(&respd)
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
if respd.Code != queryInProgressAsyncCode {
// If the query takes longer than 45 seconds to complete the results are not returned.
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
logger.WithContext(ctx).Infof("Query execution still in progress. Sleep for %v ms", sleepTime)
time.Sleep(sleepTime)
}
if retry < len(retryPattern)-1 {
retry++
}

respd, err := getQueryResultWithRetriesForAsyncMode(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("error: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}

sc := &snowflakeConn{rest: sr, cfg: cfg, queryContextCache: (&queryContextCache{}).init(), currentTimeProvider: defaultTimeProvider}
Expand Down Expand Up @@ -166,3 +133,51 @@
}
return nil
}

func getQueryResultWithRetriesForAsyncMode(
ctx context.Context,
sr *snowflakeRestful,
URL *url.URL,
headers map[string]string,
timeout time.Duration) (*execResponse, error) {
var respd *execResponse
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}
retryPatternIndex := 0

for {
logger.WithContext(ctx).Debugf("Retry count for get query result request in async mode: %v", retry)

resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
return respd, err
}
defer resp.Body.Close()

respd = &execResponse{} // reset the response
err = json.NewDecoder(resp.Body).Decode(&respd)
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
return respd, err
}

Check warning on line 163 in async.go

View check run for this annotation

Codecov / codecov/patch

async.go#L161-L163

Added lines #L161 - L163 were not covered by tests
if respd.Code != queryInProgressAsyncCode {
sfc-gh-ext-simba-jl marked this conversation as resolved.
Show resolved Hide resolved
// If the query takes longer than 45 seconds to complete the results are not returned.
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retryPatternIndex])
logger.WithContext(ctx).Infof("Query execution still in progress. Response code: %v, message: %v Sleep for %v ms", respd.Code, respd.Message, sleepTime)
time.Sleep(sleepTime)
retry++

if retryPatternIndex < len(retryPattern)-1 {
retryPatternIndex++
}
}
}
return respd, nil
}
31 changes: 31 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,34 @@ func TestLongRunningAsyncQuery(t *testing.T) {
}
}
}

func TestLongRunningAsyncQueryFetchResultByID(t *testing.T) {
runDBTest(t, func(dbt *DBTest) {
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
queryIDChan := make(chan string, 1)
ctx := WithAsyncMode(context.Background())
ctx = WithQueryIDChan(ctx, queryIDChan)

// Run a long running query asynchronously
go dbt.mustExecContext(ctx, "CALL SYSTEM$WAIT(50, 'SECONDS')")

// Get the query ID without waiting for the query to finish
queryID := <-queryIDChan
assertNotNilF(t, queryID, "expected a nonempty query ID")

ctx = WithFetchResultByID(ctx, queryID)
rows := dbt.mustQueryContext(ctx, "")
defer rows.Close()

var v string
assertTrueF(t, rows.Next())
err := rows.Scan(&v)
assertNilF(t, err, fmt.Sprintf("failed to get result. err: %v", err))
assertNotNilF(t, v, "should have returned a result")

expected := "waited 50 seconds"
if v != expected {
t.Fatalf("unexpected result returned. expected: %v, but got: %v", expected, v)
}
assertFalseF(t, rows.NextResultSet())
})
}
1 change: 1 addition & 0 deletions cmd/fetchresultbyid/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fetchresultbyid
16 changes: 16 additions & 0 deletions cmd/fetchresultbyid/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include ../../gosnowflake.mak
CMD_TARGET=fetchresultbyid

## Install
install: cinstall

## Run
run: crun

## Lint
lint: clint

## Format source codes
fmt: cfmt

.PHONY: install run lint fmt
137 changes: 137 additions & 0 deletions cmd/fetchresultbyid/fetchresultbyid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"context"
"database/sql"
"database/sql/driver"
"flag"
"log"
"strings"

sf "github.com/snowflakedb/gosnowflake"
)

func main() {
if !flag.Parsed() {
flag.Parse()
}

cfg, err := sf.GetConfigFromEnv([]*sf.ConfigParam{
{Name: "Account", EnvName: "SNOWFLAKE_TEST_ACCOUNT", FailOnMissing: true},
{Name: "User", EnvName: "SNOWFLAKE_TEST_USER", FailOnMissing: true},
{Name: "Password", EnvName: "SNOWFLAKE_TEST_PASSWORD", FailOnMissing: true},
{Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false},
{Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false},
{Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false},
})
if err != nil {
log.Fatalf("failed to create Config, err: %v", err)
}

dsn, err := sf.DSN(cfg)
if err != nil {
log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err)
}

db, err := sql.Open("snowflake", dsn)
if err != nil {
log.Fatalf("failed to connect. %v, err: %v", dsn, err)
}
defer db.Close()

log.Println("Lets simulate running synchronous query and fetching the result by the query ID using the WithFetchResultByID context")
sqlRows := fetchResultByIDSync(db, "SELECT 1")
printSQLRowsResult(sqlRows)

log.Println("Lets simulate running long query asynchronously and fetching result by query ID using a channel provided in the WithQueryIDChan context")
sqlRows = fetchResultByIDAsync(db, "CALL SYSTEM$WAIT(10, 'SECONDS')")
printSQLRowsResult(sqlRows)
}

func fetchResultByIDSync(db *sql.DB, query string) *sql.Rows {
ctx := context.Background()
conn, err := db.Conn(ctx)
if err != nil {
log.Fatalf("failed to get Conn. err: %v", err)
}
defer conn.Close()

var rows1 driver.Rows
var queryID string

// Get the query ID using raw connection
err = conn.Raw(func(x any) error {
log.Printf("Executing query: %v\n", query)
rows1, err = x.(driver.QueryerContext).QueryContext(ctx, query, nil)
if err != nil {
return err
}

queryID = rows1.(sf.SnowflakeRows).GetQueryID()
log.Printf("Query ID retrieved from GetQueryID(): %v\n", queryID)
return nil
})
if err != nil {
log.Fatalf("unable to run the query. err: %v", err)
}

// Update the Context object to specify the query ID
ctx = sf.WithFetchResultByID(ctx, queryID)

// Execute an empty string query
rows2, err := db.QueryContext(ctx, "")
if err != nil {
log.Fatal(err)
}

return rows2
}

func fetchResultByIDAsync(db *sql.DB, query string) *sql.Rows {
// Make a channel to receive the query ID
queryIDChan := make(chan string, 1)

// Enable asynchronous mode
ctx := sf.WithAsyncMode(context.Background())

// Pass the channel to receive the query ID
ctx = sf.WithQueryIDChan(ctx, queryIDChan)

// Run a long running query asynchronously and without retrieving the result
log.Printf("Executing query: %v\n", query)
go db.ExecContext(ctx, query)

// Get the query ID without waiting for the query to finish
queryID := <-queryIDChan
log.Printf("Query ID retrieved from the channel: %v\n", queryID)

// Update the Context object to specify the query ID
ctx = sf.WithFetchResultByID(ctx, queryID)

// Execute an empty string query
rows, err := db.QueryContext(ctx, "")
if err != nil {
log.Fatal(err)
}

return rows
}

func printSQLRowsResult(rows *sql.Rows) {
log.Print("Printing the results: \n")

cols, err := rows.Columns()
if err != nil {
log.Fatalf("failed to get columns. err: %v", err)
}
log.Println(strings.Join(cols, ", "))

var val string
for rows.Next() {
err := rows.Scan(&val)
if err != nil {
log.Fatalf("failed to scan rows. err: %v", err)
}
log.Printf("%v\n", val)
}
}
26 changes: 26 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,32 @@

```

# Fetch Results by Query ID

The result of your query can be retrieved by setting the query ID in the WithFetchResultByID context.
```

// Get the query ID using raw connection as mentioned above:
err := conn.Raw(func(x any) error {
rows1, err = x.(driver.QueryerContext).QueryContext(ctx, "SELECT 1", nil)
queryID = rows1.(sf.SnowflakeRows).GetQueryID()
return nil
}

// Update the Context object to specify the query ID
fetchResultByIDCtx = sf.WithFetchResultByID(ctx, queryID)

// Execute an empty string query
rows2, err := db.QueryContext(fetchResultByIDCtx, "")

// Retrieve the results as usual
for rows2.Next() {
err = rows2.Scan(...)
...
}

```

# Canceling Query by CtrlC

From 0.5.0, a signal handling responsibility has moved to the applications. If you want to cancel a
Expand Down Expand Up @@ -957,7 +983,7 @@

The following example shows how to run a GET command by passing a string to the
db.Query() function:

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.19 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.20 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / AWS Go 1.21 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.19 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.20 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / AZURE Go 1.21 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.19 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.20 on Windows

package comment is detached; there should be no blank lines between it and the package statement

Check failure on line 986 in doc.go

View workflow job for this annotation

GitHub Actions / GCP Go 1.21 on Windows

package comment is detached; there should be no blank lines between it and the package statement
db.Query("GET <internal_stage_identifier> file://<local_file> <optional_parameters>")

"<local_file>" should include the file path as well as the name. Snowflake recommends using
Expand Down
12 changes: 4 additions & 8 deletions monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,10 @@ func (sc *snowflakeConn) getQueryResultResp(
headers[headerAuthorizationKey] = fmt.Sprintf(headerSnowflakeToken, token)
}
url := sc.rest.getFullURL(resultPath, &param)
res, err := sc.rest.FuncGet(ctx, sc.rest, url, headers, sc.rest.RequestTimeout)

respd, err := getQueryResultWithRetriesForAsyncMode(ctx, sc.rest, url, headers, sc.rest.RequestTimeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
return nil, err
}
defer res.Body.Close()
var respd *execResponse
if err = json.NewDecoder(res.Body).Decode(&respd); err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
logger.WithContext(ctx).Errorf("error: %v", err)
return nil, err
}
return respd, nil
Expand All @@ -238,6 +233,7 @@ func (sc *snowflakeConn) rowsForRunningQuery(
logger.WithContext(ctx).Errorf("error: %v", err)
return err
}

if !resp.Success {
code, err := strconv.Atoi(resp.Code)
if err != nil {
Expand Down
Loading