From 6d531df8d17594bd70524fe0a975c028961b8fa4 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Tue, 8 Mar 2022 22:03:25 +0000 Subject: [PATCH] Add snowflake_put output --- CHANGELOG.md | 3 + go.mod | 4 +- go.sum | 45 ++ .../impl/snowflake/output_snowflake_put.go | 726 ++++++++++++++++++ .../snowflake/output_snowflake_put_test.go | 341 ++++++++ .../snowflake/resources/ssh_keys/README.md | 8 + .../resources/ssh_keys/snowflake_rsa_key.p8 | 30 + .../resources/ssh_keys/snowflake_rsa_key.pem | 28 + public/components/all/package.go | 1 + .../docs/components/outputs/snowflake_put.md | 602 +++++++++++++++ 10 files changed, 1786 insertions(+), 2 deletions(-) create mode 100644 internal/impl/snowflake/output_snowflake_put.go create mode 100644 internal/impl/snowflake/output_snowflake_put_test.go create mode 100644 internal/impl/snowflake/resources/ssh_keys/README.md create mode 100644 internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.p8 create mode 100644 internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.pem create mode 100644 website/docs/components/outputs/snowflake_put.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 064a8845ad..7ad1daa745 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file. ## Unreleased +### Added +- New experimental `snowflake_put` output. 
+ ## 3.65.0 - 2022-03-07 ### Added diff --git a/go.mod b/go.mod index b779c277b1..c054a6a5c9 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/Masterminds/squirrel v1.5.2 github.com/OneOfOne/xxhash v1.2.8 github.com/Shopify/sarama v1.30.1 - github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect github.com/apache/pulsar-client-go v0.7.0 github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220210221528-5daa17b02bff // indirect github.com/apache/thrift v0.15.0 // indirect @@ -98,6 +97,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/sirupsen/logrus v1.8.1 github.com/smira/go-statsd v1.3.2 + github.com/snowflakedb/gosnowflake v1.6.6 github.com/stretchr/testify v1.7.0 github.com/tilinna/z85 v1.0.0 github.com/twmb/franz-go v1.3.1 @@ -110,7 +110,7 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20211228015320-b4f792c43cd0 - github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect + github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a go.mongodb.org/mongo-driver v1.8.2 go.nanomsg.org/mangos/v3 v3.3.0 go.opentelemetry.io/otel v1.4.1 diff --git a/go.sum b/go.sum index 6ff05477bb..237170f105 100644 --- a/go.sum +++ b/go.sum @@ -76,6 +76,7 @@ github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVt github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go v61.1.0+incompatible h1:Qbz3jdfkXIPjZECEuk2E7i3iLhC9Ul74pG5mQRQC+z4= github.com/Azure/azure-sdk-for-go v61.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-storage-blob-go v0.14.0 h1:1BCg74AmVdYwO3dlKwtFU1V0wU2PZdREkXvAmZJRUlM= github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd 
h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo= github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8= @@ -163,18 +164,50 @@ github.com/aws/aws-sdk-go v1.42.23/go.mod h1:gyRszuZ/icHmHAVE4gc/r+cfCmhA1AD+vqf github.com/aws/aws-sdk-go v1.42.31 h1:tSv/YzjrFlbSqWmov9quBxrSNXLPUjJI7nPEB57S1+M= github.com/aws/aws-sdk-go v1.42.31/go.mod h1:OGr6lGMAKGlG9CVrYnWYDKIyb829c6EVBRjxqjmPepc= github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250= +github.com/aws/aws-sdk-go-v2 v1.11.0 h1:HxyD62DyNhCfiFGUHqJ/xITD6rAjJ7Dm/2nLxLmO4Ag= +github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA= +github.com/aws/aws-sdk-go-v2/config v1.10.1 h1:z/ViqIjW6ZeuLWgTWMTSyZzaVWo/1cWeVf1Uu+RF01E= +github.com/aws/aws-sdk-go-v2/config v1.10.1/go.mod h1:auIv5pIIn3jIBHNRcVQcsczn6Pfa6Dyv80Fai0ueoJU= github.com/aws/aws-sdk-go-v2/credentials v1.3.1/go.mod h1:r0n73xwsIVagq8RsxmZbGSRQFj9As3je72C2WzUIToc= +github.com/aws/aws-sdk-go-v2/credentials v1.6.1 h1:A39JYth2fFCx+omN/gib/jIppx3rRnt2r7UKPq7Mh5Y= +github.com/aws/aws-sdk-go-v2/credentials v1.6.1/go.mod h1:QyvQk1IYTqBWSi1T6UgT/W8DMxBVa5pVuLFSRLLhGf8= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0/go.mod h1:2LAuqPx1I6jNfaGDucWfA2zqQCYCOMCDHiCOciALyNw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 h1:OpZjuUy8Jt3CA1WgJgBC5Bz+uOjE5Ppx4NFTRaooUuA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0/go.mod h1:5E1J3/TTYy6z909QNR0QnXGBpfESYGDqd3O0zqONghU= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2/go.mod h1:qaqQiHSrOUVOfKe6fhgQ6UzhxjwqVW8aHNegd6Ws4w4= 
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.7.1 h1:p9Dys1g2YdaqMalnp6AwCA+tpMMdJNGw5YYKP/u3sUk= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.7.1/go.mod h1:wN/mvkow08GauDwJ70jnzJ1e+hE+Q3Q7TwpYLXOe9oI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 h1:zY8cNmbBXt3pzjgWgdIbzpQ6qxoCwt+Nx9JbrAf2mbY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0/go.mod h1:NO3Q5ZTTQtO2xIg2+xTXYDiT7knSejfeDm7WGDaOo0U= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 h1:Z3aR/OXBnkYK9zXkNkfitHX6SmUBzSsx8VMHbH4Lvhw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0/go.mod h1:anlUzBoEWglcUxUQwZA7HQOEVEnQALVZsizAapB2hq8= github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1/go.mod h1:Zy8smImhTdOETZqfyn01iNOe0CNggVbPjCajyaz6Gvg= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0 h1:c10Z7fWxtJCoyc8rv06jdh9xrKnu7bAJiRaKWvTb2mU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0/go.mod h1:6oXGy4GLpypD3uCh8wcqztigGgmhLToMfjavgh+VySg= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1/go.mod h1:v33JQ57i2nekYTA70Mb+O18KeH4KqhdqxTJZNK1zdRE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1/go.mod h1:zceowr5Z1Nh2WVP8bf/3ikB41IZW59E4yIYbg+pC6mw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 h1:qGZWS/WgiFY+Zgad2u0gwBHpJxz6Ne401JE7iQI1nKs= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0/go.mod h1:Mq6AEc+oEjCUlBuLiK5YwW4shSOAKCQ3tXN0sQeYoBA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.1/go.mod h1:6EQZIwNNvHpq/2/QSJnp4+ECvqIy55w95Ofs0ze+nGQ= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 h1:0BOlTqnNnrEO04oYKzDxMMe68t107pmIotn18HtVonY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0/go.mod 
h1:xKCZ4YFSF2s4Hnb/J0TLeOsKuGzICzcElaOKNGrVnx4= github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1/go.mod h1:XLAGFrEjbvMCLvAtWLLP32yTv8GpBquCApZEycDLunI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0 h1:5mRAms4TjSTOGYsqKYte5kHr1PzpMJSyLThjF3J+hw0= +github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0/go.mod h1:Gwz3aVctJe6mUY9T//bcALArPUaFmNAy2rTB9qN4No8= github.com/aws/aws-sdk-go-v2/service/sso v1.3.1/go.mod h1:J3A3RGUvuCZjvSuZEcOpHDnzZP/sKbhDWV2T1EOzFIM= +github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 h1:JDgKIUZOmLFu/Rv6zXLrVTWCmzA0jcTdvsT8iFIKrAI= +github.com/aws/aws-sdk-go-v2/service/sso v1.6.0/go.mod h1:Q/l0ON1annSU+mc0JybDy1Gy6dnJxIcWjphO6qJPzvM= github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BVRASvcU7gYZB9PUgPiByXg= +github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 h1:1jh8J+JjYRp+QWKOsaZt7rGUgoyrqiiVwIm+w0ymeUw= +github.com/aws/aws-sdk-go-v2/service/sts v1.10.0/go.mod h1:jLKCFqS+1T4i7HDqCP9GM4Uk75YW1cS0o82LdxpMyOE= github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= +github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= @@ -306,6 +339,8 @@ github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/form3tech-oss/jwt-go v3.2.5+incompatible 
h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8= +github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= @@ -314,6 +349,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro= +github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8= github.com/gdamore/optopia v0.2.0/go.mod h1:YKYEwo5C1Pa617H7NlPcmQXl+vG6YnSSNB44n8dNL0Q= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= @@ -704,6 +741,8 @@ github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuR github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8= github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -778,6 +817,8 @@ github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= github.com/smira/go-statsd v1.3.2 h1:1EeuzxNZ/TD9apbTOFSM9nulqfcsQFmT4u1A2DREabI= github.com/smira/go-statsd v1.3.2/go.mod h1:1srXJ9/pbnN04G8f4F1jUzsGOnwkPKXciyqpewGlkC4= +github.com/snowflakedb/gosnowflake v1.6.6 h1:ylcW8T5Yb+kO0eIjDInAA097daw2Li0JV0FQOjbffpg= +github.com/snowflakedb/gosnowflake v1.6.6/go.mod h1:eK8Ei6XQfWlpRkK3+ZT1OFH82Qhz0TNyC+zioKqVrKA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= @@ -1017,11 +1058,13 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210505024714-0287a6fb4125/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= 
+golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -1133,6 +1176,7 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1141,6 +1185,7 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/impl/snowflake/output_snowflake_put.go b/internal/impl/snowflake/output_snowflake_put.go new file mode 100644 index 0000000000..1ba4d7c9b6 --- /dev/null +++ b/internal/impl/snowflake/output_snowflake_put.go @@ -0,0 +1,726 @@ +package snowflake + +import ( + "bytes" + "context" + "crypto/rsa" + "crypto/sha256" + "crypto/x509" + "database/sql" + "encoding/base64" + "encoding/json" + "encoding/pem" + "errors" + "fmt" + "net/http" + "net/url" + "os" + "path" + "strings" + "time" + + ioutput "github.com/benthosdev/benthos/v4/internal/component/output" + "github.com/benthosdev/benthos/v4/internal/old/output" + "github.com/benthosdev/benthos/v4/public/service" + "github.com/gofrs/uuid" + "github.com/golang-jwt/jwt" + "github.com/snowflakedb/gosnowflake" + "github.com/youmark/pkcs8" + "golang.org/x/crypto/ssh" +) + +const ( + defaultJWTTimeout = 60 * time.Second +) + +// CompressionType represents the compression used for the payloads sent to Snowflake +type CompressionType string + +const ( + // CompressionTypeNone No compression + CompressionTypeNone CompressionType = "NONE" + // CompressionTypeAuto Automatic compression (gzip) + CompressionTypeAuto CompressionType = "AUTO" + // CompressionTypeGzip Gzip compression + CompressionTypeGzip CompressionType = "GZIP" + // CompressionTypeDeflate Deflate compression using zlib algorithm (with zlib header, RFC1950) + CompressionTypeDeflate CompressionType = "DEFLATE" + // CompressionTypeRawDeflate Deflate compression using flate algorithm (without header, RFC1951) + CompressionTypeRawDeflate CompressionType = "RAW_DEFLATE" +) + +func snowflakePutOutputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + // Stable(). TODO + Categories(string(output.CategoryServices)). + // Version("4.0.0"). 
+ Summary("Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables."). + Description(ioutput.Description(true, true, ` +In order to use a different stage and / or Snowpipe for each message, you can use function interpolations as described +[here](/docs/configuration/interpolation#bloblang-queries). When using batching, messages are grouped by the calculated +stage and Snowpipe and are streamed to individual files in their corresponding stage and, optionally, a Snowpipe +`+"`insertFiles`"+` REST API call will be made for each individual file. + +### Credentials + +Two authentication mechanisms are supported: +- User/password +- Key Pair Authentication + +#### User/password + +This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with +Snowpipe. + +#### Key Pair Authentication + +This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key +beforehand. Please consult the [documentation](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication) +for details on how to set it up and assign the Public Key to your user. + +Note that the Snowflake documentation suggests using this command: + +`+"```shell"+` +openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 +`+"```"+` + +to generate an encrypted SSH private key. However, in this case, it uses an encryption algorithm called +`+"`pbeWithMD5AndDES-CBC`"+`, which part of the PKCS#5 v1.5, which is considered insecure. Due to this, Benthos does not +support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them. 
One way +of achieving this is to use the following command: + +`+"```shell"+` +openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8 +`+"```"+` + +Alternatively, you can re-encrypt an existing key using this command: + +`+"```shell"+` +openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8 +`+"```"+` + +Please consult this [documentation](https://linux.die.net/man/1/pkcs8) for details. + +### Batching + +It's common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your +messages at the output level and join the batch of messages with an +`+"[`archive`](/docs/components/processors/archive)"+` and/or `+"[`compress`](/docs/components/processors/compress)"+` +processor. + +For the optimal batch size, please consult the Snowflake [documentation](https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html). + +### Snowpipe + +Given a table called `+"`BENTHOS_TBL`"+` with one column of type `+"`variant`"+`: + +`+"```sql"+` +CREATE OR REPLACE TABLE BENTHOS_DB.PUBLIC.BENTHOS_TBL(RECORD variant) +`+"```"+` + +and the following `+"`BENTHOS_PIPE`"+` Snowpipe: + +`+"```sql"+` +CREATE OR REPLACE PIPE BENTHOS_DB.PUBLIC.BENTHOS_PIPE AUTO_INGEST = FALSE AS COPY INTO BENTHOS_DB.PUBLIC.BENTHOS_TBL FROM (SELECT * FROM @%BENTHOS_TBL) FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO) +`+"```"+` + +you can configure Benthos to use the implicit table stage `+"`@%BENTHOS_TBL`"+` as the `+"`stage`"+` and +`+"`BENTHOS_PIPE`"+` as the `+"`snowpipe`"+`. In this case, you must set `+"`compression`"+` to `+"`AUTO`"+` and, if +using message batching, you'll need to configure an [`+"`archive`"+`](/docs/components/processors/archive) processor +with the `+"`concatenate`"+` format. 
Since the `+"`compression`"+` is set to `+"`AUTO`"+`, the +[gosnowflake](https://github.com/snowflakedb/gosnowflake) client library will compress the messages automatically so you +don't need to add a `+"[`compress`](/docs/components/processors/compress)"+` processor for message batches. + +If you add `+"`STRIP_OUTER_ARRAY = TRUE`"+` in your Snowpipe `+"`FILE_FORMAT`"+` +definition, then you must use `+"`json_array`"+` instead of `+"`concatenate`"+` as the archive processor format. + +Note: Only Snowpipes with `+"`FILE_FORMAT`"+` `+"`TYPE`"+` `+"`JSON`"+` are currently supported. + +### Snowpipe Troubleshooting + +Snowpipe [provides](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html) the `+"`insertReport`"+` +and `+"`loadHistoryScan`"+` REST API endpoints which can be used to get information about recent Snowpipe calls. In +order to query them, you'll first need to generate a valid JWT token for your Snowflake account. There are two methods +for doing so: +- Using the `+"`snowsql`"+` [utility](https://docs.snowflake.com/en/user-guide/snowsql.html): + +`+"```shell"+` +snowsql --private-key-path rsa_key.p8 --generate-jwt -a -u +`+"```"+` + +- Using the Python `+"`jwt-generator`"+` [utility](https://docs.snowflake.com/en/developer-guide/sql-api/guide.html#using-key-pair-authentication): + +`+"```shell"+` +python3 jwt-generator.py --private_key_file_path=rsa_key.p8 --account= --user= +`+"```"+` + +Once you successfully generate a JWT token and store it into the `+"`JWT_TOKEN`"+` environment variable, then you can, +for example, query the `+"`insertReport`"+` endpoint using `+"`curl`"+`: + +`+"```shell"+` +curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://.snowflakecomputing.com/v1/data/pipes/../insertReport" +`+"```"+` + +If you need to pass in a valid `+"`requestId`"+` to any of these Snowpipe REST API endpoints, you can enable debug +logging as described [here](/docs/components/logger/about) and Benthos will print the RequestIDs that it 
sends to +Snowpipe. They match the name of the file that is placed in the stage. +`)). + Field(service.NewStringField("account").Description(`Account name, which is the same as the Account Identifier +as described [here](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#where-are-account-identifiers-used). +However, when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier), +the Account Identifier is formatted as `+"`..`"+` and this field needs to be +populated using the `+"``"+` part. +`)). + Field(service.NewStringField("region").Description(`Optional region field which needs to be populated when using +an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier) +and it must be set to the `+"``"+` part of the Account Identifier +(`+"`..`"+`). +`).Example("us-west-2").Optional()). + Field(service.NewStringField("cloud").Description(`Optional cloud platform field which needs to be populated +when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier) +and it must be set to the `+"``"+` part of the Account Identifier +(`+"`..`"+`). +`).Example("aws").Example("gcp").Example("azure").Optional()). + Field(service.NewStringField("user").Description("Username.")). + Field(service.NewStringField("password").Description("An optional password.").Optional()). + Field(service.NewStringField("private_key_file").Description("The path to a file containing the private SSH key.").Optional()). + Field(service.NewStringField("private_key_pass").Description("An optional private SSH key passphrase.").Optional()). + Field(service.NewStringField("role").Description("Role.")). + Field(service.NewStringField("database").Description("Database.")). + Field(service.NewStringField("warehouse").Description("Warehouse.")). 
+ Field(service.NewStringField("schema").Description("Schema.")). + Field(service.NewInterpolatedStringField("stage").Description(`Stage name. Use either one of the +[supported](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html) stage types.`)). + Field(service.NewStringField("path").Description("Stage path.")). + Field(service.NewIntField("upload_parallel_threads").Description("Specifies the number of threads to use for uploading files.").Advanced().Default(4)). + Field(service.NewStringAnnotatedEnumField("compression", map[string]string{ + string(CompressionTypeNone): "No compression is applied and messages must contain plain-text JSON.", + string(CompressionTypeAuto): "Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON.", + string(CompressionTypeGzip): "Messages must be pre-compressed using the gzip algorithm.", + string(CompressionTypeDeflate): "Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950).", + string(CompressionTypeRawDeflate): "Messages must be pre-compressed using the flate algorithm (without header, RFC1951).", + }).Description("Compression type.").Default(string(CompressionTypeAuto))). + Field(service.NewInterpolatedStringField("snowpipe").Description(`An optional Snowpipe name. Use the `+"``"+` part from `+"`..`"+`.`).Optional()). + Field(service.NewBatchPolicyField("batching")). + Field(service.NewIntField("max_in_flight").Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput.").Default(1)). 
+ Example("No compression", "Upload uncompressed messages concatenated into a .json file to a table stage without calling Snowpipe.", ` +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: NONE + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate +`). + Example("Automatic compression", "Upload messages concatenated into a JSON file to a stage and automatically compressed into a .gz file to a table stage without calling Snowpipe.", ` +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: AUTO + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate +`). + Example("DEFLATE compression", "Upload messages as a compressed .deflate archive of documents to a table stage and call Snowpipe to load it into a table.", ` +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: DEFLATE + snowpipe: BENTHOS_PIPE + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate + - compress: + algorithm: zlib +`). 
+ Example("RAW_DEFLATE compression", "Upload messages as a compressed .rawdeflate archive of documents to a table stage and call Snowpipe to load it into a table.", ` +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: RAW_DEFLATE + snowpipe: BENTHOS_PIPE + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate + - compress: + algorithm: flate +`) +} + +func init() { + err := service.RegisterBatchOutput("snowflake_put", snowflakePutOutputConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) ( + output service.BatchOutput, + batchPolicy service.BatchPolicy, + maxInFlight int, + err error, + ) { + if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil { + return + } + if batchPolicy, err = conf.FieldBatchPolicy("batching"); err != nil { + return + } + output, err = newSnowflakeWriterFromConfig(conf, mgr.Logger(), mgr.Metrics()) + return + }) + + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +// getPrivateKey reads and parses the private key +// Inspired from https://github.com/chanzuckerberg/terraform-provider-snowflake/blob/c07d5820bea7ac3d8a5037b0486c405fdf58420e/pkg/provider/provider.go#L367 +func getPrivateKey(path, passphrase string) (*rsa.PrivateKey, error) { + privateKeyBytes, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read private key %s: %s", path, err) + } + if len(privateKeyBytes) == 0 { + return nil, errors.New("private key is empty") + } + + privateKeyBlock, _ := pem.Decode(privateKeyBytes) + if privateKeyBlock == nil { + return nil, fmt.Errorf("could not parse private key, key is not in PEM format") + } + + if privateKeyBlock.Type == 
"ENCRYPTED PRIVATE KEY" { + if passphrase == "" { + return nil, fmt.Errorf("private key requires a passphrase, but private_key_passphrase was not supplied") + } + + // Only keys encrypted with pbes2 http://oid-info.com/get/1.2.840.113549.1.5.13 are supported. + // pbeWithMD5AndDES-CBC http://oid-info.com/get/1.2.840.113549.1.5.3 is not supported. + privateKey, err := pkcs8.ParsePKCS8PrivateKeyRSA(privateKeyBlock.Bytes, []byte(passphrase)) + if err != nil { + return nil, fmt.Errorf("failed to decrypt encrypted private key (only ciphers aes-128-cbc, aes-128-gcm, aes-192-cbc, aes-192-gcm, aes-256-cbc, aes-256-gcm, and des-ede3-cbc are supported): %s", err) + } + + return privateKey, nil + } + + privateKey, err := ssh.ParseRawPrivateKey(privateKeyBytes) + if err != nil { + return nil, fmt.Errorf("could not parse private key: %s", err) + } + + rsaPrivateKey, ok := privateKey.(*rsa.PrivateKey) + if !ok { + return nil, fmt.Errorf("private key must be of type RSA but got %T instead: ", privateKey) + } + return rsaPrivateKey, nil +} + +// calculatePublicKeyFingerprint computes the value of the `RSA_PUBLIC_KEY_FP` for the current user based on the +// configured private key +// Inspired from https://stackoverflow.com/questions/63598044/snowpipe-rest-api-returning-always-invalid-jwt-token +func calculatePublicKeyFingerprint(privateKey *rsa.PrivateKey) (string, error) { + pubKey := privateKey.Public() + pubDER, err := x509.MarshalPKIXPublicKey(pubKey) + if err != nil { + return "", fmt.Errorf("failed to marshal public key: %s", err) + } + + hash := sha256.Sum256(pubDER) + return "SHA256:" + base64.StdEncoding.EncodeToString(hash[:]), nil +} + +type dbI interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + Close() error +} + +type uuidGenI interface { + NewV4() (uuid.UUID, error) +} + +type httpClientI interface { + Do(req *http.Request) (*http.Response, error) +} + +type snowflakeWriter struct { + logger *service.Logger + + 
account string + user string + database string + schema string + stage *service.InterpolatedString + path string + snowpipe *service.InterpolatedString + + accountIdentifier string + putQueryFormat string + outputFileExtension string + privateKey *rsa.PrivateKey + publicKeyFingerprint string + dsn string + + uuidGenerator uuidGenI + httpClient httpClientI + nowFn func() time.Time + db dbI +} + +func newSnowflakeWriterFromConfig(conf *service.ParsedConfig, logger *service.Logger, metrics *service.Metrics) (*snowflakeWriter, error) { + s := snowflakeWriter{ + logger: logger, + uuidGenerator: uuid.NewGen(), + httpClient: http.DefaultClient, + nowFn: time.Now, + } + + var err error + + if s.account, err = conf.FieldString("account"); err != nil { + return nil, fmt.Errorf("failed to parse account: %s", err) + } + + s.accountIdentifier = s.account + + if conf.Contains("region") { + var region string + if region, err = conf.FieldString("region"); err != nil { + return nil, fmt.Errorf("failed to parse region: %s", err) + } + s.accountIdentifier += "." + region + } + + if conf.Contains("cloud") { + var cloud string + if cloud, err = conf.FieldString("cloud"); err != nil { + return nil, fmt.Errorf("failed to parse cloud: %s", err) + } + s.accountIdentifier += "." 
+ cloud + } + + if s.user, err = conf.FieldString("user"); err != nil { + return nil, fmt.Errorf("failed to parse user: %s", err) + } + + var password string + if conf.Contains("password") { + if password, err = conf.FieldString("password"); err != nil { + return nil, fmt.Errorf("failed to parse password: %s", err) + } + } + + var role string + if role, err = conf.FieldString("role"); err != nil { + return nil, fmt.Errorf("failed to parse role: %s", err) + } + + if s.database, err = conf.FieldString("database"); err != nil { + return nil, fmt.Errorf("failed to parse database: %s", err) + } + + var warehouse string + if warehouse, err = conf.FieldString("warehouse"); err != nil { + return nil, fmt.Errorf("failed to parse warehouse: %s", err) + } + + if s.schema, err = conf.FieldString("schema"); err != nil { + return nil, fmt.Errorf("failed to parse schema: %s", err) + } + + if s.stage, err = conf.FieldInterpolatedString("stage"); err != nil { + return nil, fmt.Errorf("failed to parse stage: %s", err) + } + + if s.path, err = conf.FieldString("path"); err != nil { + return nil, fmt.Errorf("failed to parse path: %s", err) + } + + var uploadParallelThreads int + if uploadParallelThreads, err = conf.FieldInt("upload_parallel_threads"); err != nil { + return nil, fmt.Errorf("failed to parse stage: %s", err) + } + + compressionStr, err := conf.FieldString("compression") + if err != nil { + return nil, fmt.Errorf("failed to parse compression: %s", err) + } + + compression := CompressionType(compressionStr) + var autoCompress, sourceCompression string + switch compression { + case CompressionTypeNone: + s.outputFileExtension = ".json" + autoCompress = "FALSE" + sourceCompression = "NONE" + case CompressionTypeAuto: + s.outputFileExtension = ".gz" + autoCompress = "TRUE" + sourceCompression = "AUTO_DETECT" + case CompressionTypeGzip: + s.outputFileExtension = ".gz" + autoCompress = "FALSE" + sourceCompression = "GZIP" + case CompressionTypeDeflate: + s.outputFileExtension = 
".deflate" + autoCompress = "FALSE" + sourceCompression = string(compression) + case CompressionTypeRawDeflate: + s.outputFileExtension = ".rawdeflate" + autoCompress = "FALSE" + sourceCompression = string(compression) + default: + return nil, fmt.Errorf("unrecognised compression type: %s", compression) + } + + // File path and stage are populated dynamically via interpolation + s.putQueryFormat = fmt.Sprintf("PUT file://%%s %%s AUTO_COMPRESS = %s SOURCE_COMPRESSION = %s PARALLEL=%d", autoCompress, sourceCompression, uploadParallelThreads) + + if conf.Contains("snowpipe") { + if s.snowpipe, err = conf.FieldInterpolatedString("snowpipe"); err != nil { + return nil, fmt.Errorf("failed to parse snowpipe: %s", err) + } + } + + authenticator := gosnowflake.AuthTypeJwt + if password == "" { + var privateKeyFile string + if privateKeyFile, err = conf.FieldString("private_key_file"); err != nil { + return nil, fmt.Errorf("failed to parse private_key_file: %s", err) + } + + var privateKeyPass string + if conf.Contains("private_key_pass") { + if privateKeyPass, err = conf.FieldString("private_key_pass"); err != nil { + return nil, fmt.Errorf("failed to parse private_key_pass: %s", err) + } + } + + if s.privateKey, err = getPrivateKey(privateKeyFile, privateKeyPass); err != nil { + return nil, fmt.Errorf("failed to read private key: %s", err) + } + + if s.publicKeyFingerprint, err = calculatePublicKeyFingerprint(s.privateKey); err != nil { + return nil, fmt.Errorf("failed to calculate public key fingerprint: %s", err) + } + } else { + authenticator = gosnowflake.AuthTypeSnowflake + } + + if s.dsn, err = gosnowflake.DSN(&gosnowflake.Config{ + Account: s.accountIdentifier, + // Region: The driver extracts the region automatically from the account and I think it doesn't have to be set here + Password: password, + Authenticator: authenticator, + User: s.user, + Role: role, + Database: s.database, + Warehouse: warehouse, + Schema: s.schema, + PrivateKey: s.privateKey, + }); err != 
nil { + return nil, fmt.Errorf("failed to construct DSN: %s", err) + } + + return &s, nil +} + +//------------------------------------------------------------------------------ + +func (s *snowflakeWriter) Connect(ctx context.Context) error { + var err error + s.db, err = sql.Open("snowflake", s.dsn) + if err != nil { + return fmt.Errorf("failed to connect to snowflake: %s", err) + } + + return nil +} + +// createJWT creates a new Snowpipe JWT token +// Inspired from https://stackoverflow.com/questions/63598044/snowpipe-rest-api-returning-always-invalid-jwt-token +func (s *snowflakeWriter) createJWT() (string, error) { + // Need to use the account without the region segment as described in https://stackoverflow.com/questions/65811588/snowflake-jdbc-driver-throws-net-snowflake-client-jdbc-snowflakesqlexception-jw + qualifiedUsername := strings.ToUpper(s.account + "." + s.user) + now := s.nowFn().UTC() + token := jwt.NewWithClaims(jwt.SigningMethodRS256, jwt.MapClaims{ + "iss": qualifiedUsername + "." 
+ s.publicKeyFingerprint, + "sub": qualifiedUsername, + "iat": now.Unix(), + "exp": now.Add(defaultJWTTimeout).Unix(), + }) + + return token.SignedString(s.privateKey) +} + +func (s *snowflakeWriter) getSnowpipeInsertURL(snowpipe, requestID string) string { + query := url.Values{"requestId": []string{requestID}} + u := url.URL{ + Scheme: "https", + Host: fmt.Sprintf("%s.snowflakecomputing.com", s.accountIdentifier), + Path: path.Join("/v1/data/pipes", fmt.Sprintf("%s.%s.%s", s.database, s.schema, snowpipe), "insertFiles"), + RawQuery: query.Encode(), + } + return u.String() +} + +func (s *snowflakeWriter) callSnowpipe(ctx context.Context, snowpipe, requestID, filename string) error { + jwtToken, err := s.createJWT() + if err != nil { + return fmt.Errorf("failed to create Snowpipe JWT token: %s", err) + } + + type File struct { + Path string `json:"path"` + } + reqPayload := struct { + Files []File `json:"files"` + }{ + Files: []File{ + { + Path: filename, + }, + }, + } + + buf := bytes.Buffer{} + if err := json.NewEncoder(&buf).Encode(reqPayload); err != nil { + return fmt.Errorf("failed to marshal request body JSON: %s", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.getSnowpipeInsertURL(snowpipe, requestID), &buf) + if err != nil { + return fmt.Errorf("failed to create Snowpipe HTTP request: %s", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+jwtToken) + + resp, err := s.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to execute Snowpipe HTTP request: %s", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("received unexpected Snowpipe response status: %d", resp.StatusCode) + } + + var respPayload struct { + ResponseCode string + } + if err = json.NewDecoder(resp.Body).Decode(&respPayload); err != nil { + return fmt.Errorf("failed to decode Snowpipe HTTP response: %s", err) + } + if respPayload.ResponseCode != 
"SUCCESS" { + return fmt.Errorf("received unexpected Snowpipe response code: %s", respPayload.ResponseCode) + } + + return nil +} + +func (s *snowflakeWriter) WriteBatch(ctx context.Context, batch service.MessageBatch) error { + type File struct { + Stage string + Snowpipe string + } + + // Create one file for each stage-snowpipe combination + files := map[File][]byte{} + for _, msg := range batch { + b, err := msg.AsBytes() + if err != nil { + return fmt.Errorf("failed to get message bytes: %s", err) + } + + snowpipe := "" + if s.snowpipe != nil { + snowpipe = s.snowpipe.String(msg) + } + ss := File{ + Stage: s.stage.String(msg), + Snowpipe: snowpipe, + } + + files[ss] = append(files[ss], b...) + } + + for file, batch := range files { + uuid, err := s.uuidGenerator.NewV4() + if err != nil { + return fmt.Errorf("failed to generate requestID: %s", err) + } + + requestID := uuid.String() + filepath := path.Join(s.path, requestID+s.outputFileExtension) + + _, err = s.db.ExecContext(gosnowflake.WithFileStream(ctx, bytes.NewReader(batch)), fmt.Sprintf(s.putQueryFormat, filepath, path.Join(file.Stage, s.path))) + if err != nil { + return fmt.Errorf("failed to run query: %s", err) + } + + if file.Snowpipe != "" { + s.logger.Debugf("Calling Snowpipe with requestId=%s", requestID) + + if err := s.callSnowpipe(ctx, file.Snowpipe, requestID, filepath); err != nil { + return fmt.Errorf("failed to call Snowpipe: %s", err) + } + } + } + + return nil +} + +func (s *snowflakeWriter) Close(ctx context.Context) error { + return s.db.Close() +} diff --git a/internal/impl/snowflake/output_snowflake_put_test.go b/internal/impl/snowflake/output_snowflake_put_test.go new file mode 100644 index 0000000000..bf9be2c021 --- /dev/null +++ b/internal/impl/snowflake/output_snowflake_put_test.go @@ -0,0 +1,341 @@ +package snowflake + +import ( + "bytes" + "context" + "database/sql" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + 
"github.com/benthosdev/benthos/v4/public/service" + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + dummyUUID = "12345678-90ab-cdef-1234-567890abcdef" +) + +type MockDB struct { + Queries []string + QueriesCount int +} + +func (db *MockDB) ExecContext(ctx context.Context, query string, _ ...interface{}) (sql.Result, error) { + db.Queries = append(db.Queries, query) + db.QueriesCount++ + + return nil, nil +} + +func (db *MockDB) Close() error { return nil } + +func (db *MockDB) hasQuery(query string) bool { + for _, q := range db.Queries { + if q == query { + return true + } + } + + return false +} + +type MockUUIDGenerator struct{} + +func (MockUUIDGenerator) NewV4() (uuid.UUID, error) { + return uuid.Must(uuid.FromString(dummyUUID)), nil +} + +type MockHTTPClient struct { + SnowpipeHost string + Queries []string + QueriesCount int + Payloads []string + JWTs []string +} + +func (c *MockHTTPClient) Do(req *http.Request) (*http.Response, error) { + req.URL.Host = c.SnowpipeHost + req.URL.Scheme = "http" + + query := req.URL.Path + query += "?" 
+ req.URL.RawQuery + c.Queries = append(c.Queries, query) + c.QueriesCount++ + + // Read request body and recreate it + bodyBytes, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + req.Body.Close() + req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + c.Payloads = append(c.Payloads, strings.TrimSpace(string(bodyBytes))) + + c.JWTs = append(c.JWTs, req.Header.Get("Authorization")) + + return http.DefaultClient.Do(req) +} + +func (c *MockHTTPClient) hasQuery(query string) bool { + for _, q := range c.Queries { + if q == query { + return true + } + } + + return false +} + +func (c *MockHTTPClient) hasPayload(payload string) bool { + for _, p := range c.Payloads { + if p == payload { + return true + } + } + + return false +} + +func getSnowflakeWriter(t *testing.T, privateKeyPath, privateKeyPassphrase, stage, snowpipe, compression string) (*snowflakeWriter, error) { + t.Helper() + + outputConfig := ` +account: benthos +region: east-us-2 +cloud: azure +user: foobar +private_key_file: ` + privateKeyPath + ` +private_key_pass: ` + privateKeyPassphrase + ` +role: test_role +database: test_db +warehouse: test_warehouse +schema: test_schema +path: foo/bar/baz +stage: '` + stage + `' +upload_parallel_threads: 42 +compression: ` + compression + ` +snowpipe: '` + snowpipe + `' +` + + spec := snowflakePutOutputConfig() + env := service.NewEnvironment() + conf, err := spec.ParseYAML(outputConfig, env) + require.NoError(t, err) + + return newSnowflakeWriterFromConfig(conf, nil, nil) +} + +func TestSnowflakeOutput(t *testing.T) { + tests := []struct { + name string + privateKeyPath string + privateKeyPassphrase string + stage string + snowpipe string + compression string + snowflakeHTTPResponseCode int + snowflakeResponseCode string + wantPUTQuery string + wantPUTQueriesCount int + wantSnowpipeQuery string + wantSnowpipeQueriesCount int + wantSnowpipePayload string + wantSnowpipeJWT string + errConfigContains string + errContains string + }{ + { + name: 
"executes snowflake query with plaintext SSH key", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + compression: "NONE", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".json @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = NONE PARALLEL=42", + }, + { + name: "executes snowflake query with encrypted SSH key", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.p8", + privateKeyPassphrase: "test123", + stage: "@test_stage", + compression: "NONE", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".json @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = NONE PARALLEL=42", + }, + { + name: "fails to read missing SSH key", + privateKeyPath: "resources/ssh_keys/missing_key.pem", + stage: "@test_stage", + compression: "NONE", + errConfigContains: "failed to read private key resources/ssh_keys/missing_key.pem: open resources/ssh_keys/missing_key.pem: no such file or directory", + }, + { + name: "fails to read encrypted SSH key without passphrase", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.p8", + stage: "@test_stage", + compression: "NONE", + errConfigContains: "failed to read private key: private key requires a passphrase, but private_key_passphrase was not supplied", + }, + { + name: "executes snowflake query without compression", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + compression: "NONE", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".json @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = NONE PARALLEL=42", + }, + { + name: "executes snowflake query with automatic compression", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + compression: "AUTO", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".gz @test_stage/foo/bar/baz AUTO_COMPRESS = TRUE SOURCE_COMPRESSION = AUTO_DETECT PARALLEL=42", + }, + { + name: "executes snowflake query with gzip 
compression", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + compression: "GZIP", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".gz @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = GZIP PARALLEL=42", + }, + { + name: "executes snowflake query with DEFLATE compression", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + compression: "DEFLATE", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".deflate @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = DEFLATE PARALLEL=42", + }, + { + name: "executes snowflake query with RAW_DEFLATE compression", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + compression: "RAW_DEFLATE", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".rawdeflate @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = RAW_DEFLATE PARALLEL=42", + }, + { + name: "executes snowflake query and calls Snowpipe", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + snowpipe: "test_pipe", + compression: "NONE", + snowflakeHTTPResponseCode: http.StatusOK, + snowflakeResponseCode: "SUCCESS", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".json @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = NONE PARALLEL=42", + wantPUTQueriesCount: 1, + wantSnowpipeQuery: "/v1/data/pipes/test_db.test_schema.test_pipe/insertFiles?requestId=" + dummyUUID, + wantSnowpipeQueriesCount: 1, + wantSnowpipePayload: `{"files":[{"path":"foo/bar/baz/` + dummyUUID + `.json"}]}`, + wantSnowpipeJWT: "Bearer 
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOi02MjEzNTU5Njc0MCwiaWF0IjotNjIxMzU1OTY4MDAsImlzcyI6IkJFTlRIT1MuRk9PQkFSLlNIQTI1Njprc3dSSG9uZmU0QllXQWtReUlBUDVzY2w5OUxRQ0U2S1Irc0J4VEVoenBFPSIsInN1YiI6IkJFTlRIT1MuRk9PQkFSIn0.ABldbfDem53G-EDMoQaY7VVA2RXPryvXFcY0Hqogu_-qjT3qcJEY1aM1B9SqATkeFDNiagOXPl218dUc-Hes4WTbWnoXq8EUlMLjbg3_9qrlp6p-6SzUbX88lpkuYPXD3UiDBhLXsQso5ciufev2IFX5oCt-Oxg9GbI4uIveey_k8dv3S2a942RQbB6ffCj3Stca31oz2F_IPaF2xDmwVsBig_C9NoHToQFVAfVbPIV1hMDIc7zutuLqXQWZPfT6K0PPc15ZMutQQ0tEYCboDanx3tXe9ub_gLfyGaHwuDUXBk3EN3UkZ8rmgasCk_VnFZ_Xk6tnaZfdIrGKRZ5dsA", + }, + { + name: "gets error code from Snowpipe", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + snowpipe: "test_pipe", + compression: "NONE", + snowflakeHTTPResponseCode: http.StatusOK, + snowflakeResponseCode: "FAILURE", + errContains: "received unexpected Snowpipe response code: FAILURE", + }, + { + name: "gets http error from Snowpipe", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + snowpipe: "test_pipe", + compression: "NONE", + snowflakeHTTPResponseCode: http.StatusTeapot, + errContains: "received unexpected Snowpipe response status: 418", + }, + { + name: "handles stage interpolation and runs a query for each sub-batch", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: `@test_stage_${! json("id") }`, + compression: "NONE", + wantPUTQueriesCount: 2, + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".json @test_stage_bar/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = NONE PARALLEL=42", + }, + { + name: "handles Snowpipe interpolation and runs a query for each sub-batch", + privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.pem", + stage: "@test_stage", + snowpipe: `test_pipe_${! 
json("id") }`, + compression: "NONE", + snowflakeHTTPResponseCode: http.StatusOK, + snowflakeResponseCode: "SUCCESS", + wantPUTQuery: "PUT file://foo/bar/baz/" + dummyUUID + ".json @test_stage/foo/bar/baz AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = NONE PARALLEL=42", + wantPUTQueriesCount: 2, + wantSnowpipeQuery: "/v1/data/pipes/test_db.test_schema.test_pipe_bar/insertFiles?requestId=" + dummyUUID, + wantSnowpipeQueriesCount: 2, + wantSnowpipePayload: `{"files":[{"path":"foo/bar/baz/` + dummyUUID + `.json"}]}`, + wantSnowpipeJWT: "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOi02MjEzNTU5Njc0MCwiaWF0IjotNjIxMzU1OTY4MDAsImlzcyI6IkJFTlRIT1MuRk9PQkFSLlNIQTI1Njprc3dSSG9uZmU0QllXQWtReUlBUDVzY2w5OUxRQ0U2S1Irc0J4VEVoenBFPSIsInN1YiI6IkJFTlRIT1MuRk9PQkFSIn0.ABldbfDem53G-EDMoQaY7VVA2RXPryvXFcY0Hqogu_-qjT3qcJEY1aM1B9SqATkeFDNiagOXPl218dUc-Hes4WTbWnoXq8EUlMLjbg3_9qrlp6p-6SzUbX88lpkuYPXD3UiDBhLXsQso5ciufev2IFX5oCt-Oxg9GbI4uIveey_k8dv3S2a942RQbB6ffCj3Stca31oz2F_IPaF2xDmwVsBig_C9NoHToQFVAfVbPIV1hMDIc7zutuLqXQWZPfT6K0PPc15ZMutQQ0tEYCboDanx3tXe9ub_gLfyGaHwuDUXBk3EN3UkZ8rmgasCk_VnFZ_Xk6tnaZfdIrGKRZ5dsA", + }, + // TODO: + // - Snowflake PUT query payload tests + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s, err := getSnowflakeWriter(t, test.privateKeyPath, test.privateKeyPassphrase, test.stage, test.snowpipe, test.compression) + if test.errConfigContains == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), test.errConfigContains) + return + } + + s.uuidGenerator = MockUUIDGenerator{} + + snowpipeTestServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(test.snowflakeHTTPResponseCode) + w.Write([]byte(`{"ResponseCode": "` + test.snowflakeResponseCode + `"}`)) + })) + t.Cleanup(snowpipeTestServer.Close) + + mockHTTPClient := MockHTTPClient{ + SnowpipeHost: snowpipeTestServer.Listener.Addr().String(), + } + s.httpClient = &mockHTTPClient + + 
mockDB := MockDB{} + s.db = &mockDB + + s.nowFn = func() time.Time { return time.Time{} } + + err = s.WriteBatch(context.Background(), service.MessageBatch{ + service.NewMessage([]byte(`{"id":"foo","content":"foo stuff"}`)), + service.NewMessage([]byte(`{"id":"bar","content":"bar stuff"}`)), + }) + if test.errContains == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), test.errContains) + return + } + + if test.wantPUTQueriesCount > 0 { + assert.Equal(t, test.wantPUTQueriesCount, mockDB.QueriesCount) + } + if test.wantPUTQuery != "" { + assert.True(t, mockDB.hasQuery(test.wantPUTQuery)) + } + if test.wantSnowpipeQueriesCount > 0 { + assert.Equal(t, test.wantSnowpipeQueriesCount, mockHTTPClient.QueriesCount) + assert.Len(t, mockHTTPClient.JWTs, test.wantSnowpipeQueriesCount) + for _, jwt := range mockHTTPClient.JWTs { + assert.Equal(t, test.wantSnowpipeJWT, jwt) + } + } + if test.wantSnowpipeQuery != "" { + assert.True(t, mockHTTPClient.hasQuery(test.wantSnowpipeQuery)) + } + if test.wantSnowpipePayload != "" { + assert.True(t, mockHTTPClient.hasPayload(test.wantSnowpipePayload)) + } + }) + } +} diff --git a/internal/impl/snowflake/resources/ssh_keys/README.md b/internal/impl/snowflake/resources/ssh_keys/README.md new file mode 100644 index 0000000000..e1e7fd41e8 --- /dev/null +++ b/internal/impl/snowflake/resources/ssh_keys/README.md @@ -0,0 +1,8 @@ +# Commands used to generate private SSH keys for Snowpipe tests + +```shell +> openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -passout pass:test123 -out internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.p8 +> openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.pem +``` + +Note: For the encrypted key we're using `-v2 des3` because we only support PKCS#5 v2.0: https://linux.die.net/man/1/pkcs8 diff --git 
a/internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.p8 b/internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.p8 new file mode 100644 index 0000000000..d6a67b8c1f --- /dev/null +++ b/internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.p8 @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFDjBABgkqhkiG9w0BBQ0wMzAbBgkqhkiG9w0BBQwwDgQIwspexv/RI9YCAggA +MBQGCCqGSIb3DQMHBAgishmyEhSkmgSCBMgp6P0d0KyXCR+KtntmYJ3V+cNUaMX4 +YWXTVijloSBIloDW+TWJPL3qNAXcC5FaZQ/TP4lGfjySnL1UzerShd1iRQZ3Vohn +7MlLDC6CcyNfwsgJP+4ETujniPsDztonMS1T6HNHk3HjL6VqRuxfc4w69hoihQcU +ws3AG2Darcf4r544dzo3jj4gaBsZfvFfPhhV61E2KHKT4/8U/y5GMiHKB1SIs0xO ++t9kyzK0EitQpryVNnFihHVQLTrHiSbDxo7/TcRC4NRIUHYoleyvS3WnrsgzKbJ1 +91m6MUY6yxD578V/KiU0BlmJk8S/gMMou1sVfgKq3MTNNkUlLUHMyJgvPRatDUzN +rcj/wMzCXX6tPsoXSBDJuxp1unJPcHOMArNyUcUCcMTNOgtsnRf1TB6FKmeT+3Lz +fdxnszFjj0VzVyJI68HMSGnU7OVUmUgq0FobbR3KjkXuhSKOHoLMimBGdsv3f0/A +rFC6a2b3k1FAhYf+I5hBPsU4tm3fKzmmL/enxo5byT7MUPCSW7cwVL3zVM8MUXYs +0ZS+QpMRrBJZ8Zg9A9LFyZ7/UwSTiZRXddEzrLy7e8gFcmY2eJEWD3vkhJXD+PeT +VPp5UdQvMvkFgOANQAtXAxiJPN2hWxjv6QWXUe0ljqmJ8wH9NSQYPu6aa1c4Xjax +E+lbV/Yt5l+Fd0lyZCJh7+CAGFKba2FyuzUm/sJ8G66EfatZWmXcddcSK8yB6Hva +RP/tXChWrVmHISXzIuYUfQFVtHT7Imt7kl1oeKYM6jaJmeJcC0Kt9RWfWLWYvc69 +8O2Srx/TgLH/L0P7Ll6TY7gSDjBhfgnuE/GekMGfX6AJMnAgvm0soe7QFBRjr+sL +TFxbFiGk7XocZSxwXemYE/7Z+ir7yjgWs0eS3799gMZ/kXQBWMrI6BnExEkJvopZ +mqoT0ln2/ara4ywZ/gYLLSwcyS8PEMgbTD/XF4qM0H00+YisG2H2mIdEl9w0oGcj +d1rJNlLHPZ2/3e6UN2Yf8WmE4W0GSiVAapfKuDQtVGqMXVXkXbLTdB3X5mP5FvpN +lSFu0KJqyV/fz/ronPbA5xsKy/Ctn368/RvpcbQeqGaAL7QOQ85UVuqtUbNyUEU2 +FLONRIphp54XmXlKHZ4xYsyiNQBFo+B4vG93dbTirSYLgkF0iMWsf722cUAUEZkt +h/gSTrqJN7cPDeZHMLo3uAeW5pjmkwupGR8NfAaOtQYlx7w1rr4s21LohMohI7Un +6vCcYE8P8K9cwEPQOUvyDXJTx3kGbq9EqwOmML2VKB8VrIUuHYoKJ7vMflwc0IU4 +mFjjkk7Iog6q5EWrvmMiPrwRjfStj0z2g+1/itB2j9Yt8G7X7NpchWFFUhpqldYy +tyWIsOB5Upo+jEzusz0i/vA1SY0CFoenK7HeDXIYowuJ4Sahqc9A2eLcKj7znRpw +Pl2Fmd8Lsr6iR7j22OCSxBmIqnhMyYEgN40UETg51X3c1usb3d7EHCNj0Gwd3hUm 
+Dl2C3/yfni9e7Z4jVm/60NmRQjDKft4AAOmba9wvOad2RLBRs0uMirdrQ3mefSI/ +lsh5wB4vGaNPS9La0mP3/PYuInQeTwJmU+BQlgscZXWwUtIKuVoyBeQRRiuO1/+h +64g= +-----END ENCRYPTED PRIVATE KEY----- diff --git a/internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.pem b/internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.pem new file mode 100644 index 0000000000..c8719654ad --- /dev/null +++ b/internal/impl/snowflake/resources/ssh_keys/snowflake_rsa_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDFzX7C0Bn+k8dI +n9lqZQi0bRt0AY7zPUuZo8beI+YOCgJF0OMDF1nWc++YsYnVo8DKUxLmwAA/Pmzh +O68UOwc04vki2sZ3Ruo4NsjaDKoYIhs3/Q3YzXCogcM3DknDmQhwNn4r05s6b+hq +n+ifeEu6aLVP6BrWHD2IHHEFMzrwrjdiFk3qux4ZRAsP9cCWipQkUce19nQIPjdh +UcZYWNvx+yOVz0x5xaEaKezJkwo8S0nQfsWTKdGXkw9xVjs6hzegYCrHwvJKwN2g +61CVFLt47qkKu0k/ZBIZAXPK1auhQCK3ci1I1aMROUvurjtSAvl0cO1LYgF2Ds7b +bGkz1RfjAgMBAAECggEAabxsy4TksGKcv+S7GxXBLnm4mC2RFdOpSwrybqLwAoc1 +Kc782xUrb+jvpkcZcDul/kGkM/dk6mnbWBdIgt7+/jVqikg6mV4uLDiU64Kjllz9 +AdPjCAbh9yHOkeqwYb+3dAydK55lNzrFGeI7PqvWh2IbsghX+CaGefECNY5qLmdw +QBBpZp7Q8jZb0tEX/w2G56gXLbzLARzhJ8BiXR+exKqJX58jRzu2r8gK7wkgBDyN +ESPczUwmSzTETtkj/19wa4o/4zRQ4Hf6vMQJcPhgLqn5fCX03nhx7/M+vFrbLsB6 ++QwjAJ/pIFZlZKSllQVHw+KBEG0cwQ7+SycM74CIMQKBgQD+0Jy4v8TV/lX8zldG +RB3RKFjh9tdrYSfDlNGRNBWaIaMjsrRe95MDs2aycTOvWsBMCRsjdIrswsBiChaz +mxglOV+m0aDeWP4bfaa+SeBB1U0jF3JmtYsilTBL6+71rp1ufRLQFdUoGEQzCTiQ +MneGQzN+nCXfm6RSfAnBALa2uQKBgQDGuQDLm5FClKHiFPMVjyj8YezRzHM7Q6+g +xXAbyeCuXPUubUFMtOWAH9bI2nzPjFtB15rVchJNdL6wGGIq/29slUR+OMopexjW +hRu0/T5j4oCs57ifRy/iIdaO5o4XC0VxFXRqktEskdMGW2/wFAd+nNMli5huPlMT +4hF+Pm81ewKBgGS6rKlvzWzWjMFSBDgXpz3OWEyDGqctEd4Dz1A6KavzTh1HgHvm +HGyjF57ElyzjkA6+rsa2RFDRr+FRoaXAUqwsYP598bzTqyfM5QRmCcuceVC87RFj +BKxYE25/xsfCDiPmN3CgoNGnvhX6uCxwdsVRfWK4cVRSn4On2uc70/6pAoGAN95h +S9zjvN0+meob4U7LThFV3DHnn5zK7p8zgoyCH2NBBxluR1uAPkI1R2ituEgUi/FK +tYGJhb8xsR5Z0w7XS6a3h+j9ZSYXeJAZlwuvk7NlS7cl35nK639p6+kDv5TKpB1N 
+Cn1WU3p34oyobs2iwcTjU+XoJ+5buvZOxrhU2asCgYEA/CWGpHmwTaq5UprNLyWh +dDFCAO0oPqXCSrjFrC6YULU7HR3hZoTw5QUkgwhkwNkgNsfRpLeJOqTqAqxhXfml +lphE9P3Q/zIrmyUPLBQr9Dy9gUYAR0WmQJYrD95WPj6dcS1DzSXryMRNst5q3pcx +Ph3+re17s0r+0CGl1Mv3uPw= +-----END PRIVATE KEY----- diff --git a/public/components/all/package.go b/public/components/all/package.go index 55b9b91395..5bd50e0583 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -25,6 +25,7 @@ import ( _ "github.com/benthosdev/benthos/v4/internal/impl/prometheus" _ "github.com/benthosdev/benthos/v4/internal/impl/pulsar" _ "github.com/benthosdev/benthos/v4/internal/impl/redis" + _ "github.com/benthosdev/benthos/v4/internal/impl/snowflake" _ "github.com/benthosdev/benthos/v4/internal/impl/sql" _ "github.com/benthosdev/benthos/v4/internal/impl/statsd" "github.com/benthosdev/benthos/v4/internal/template" diff --git a/website/docs/components/outputs/snowflake_put.md b/website/docs/components/outputs/snowflake_put.md new file mode 100644 index 0000000000..53d46ae9e4 --- /dev/null +++ b/website/docs/components/outputs/snowflake_put.md @@ -0,0 +1,602 @@ +--- +title: snowflake_put +type: output +status: experimental +categories: ["Services"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: +Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables. 
+ + + + + + +```yml +# Common config fields, showing default values +output: + label: "" + snowflake_put: + account: "" + region: "" + cloud: "" + user: "" + password: "" + private_key_file: "" + private_key_pass: "" + role: "" + database: "" + warehouse: "" + schema: "" + stage: "" + path: "" + compression: AUTO + snowpipe: "" + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + max_in_flight: 1 +``` + + + + +```yml +# All config fields, showing default values +output: + label: "" + snowflake_put: + account: "" + region: "" + cloud: "" + user: "" + password: "" + private_key_file: "" + private_key_pass: "" + role: "" + database: "" + warehouse: "" + schema: "" + stage: "" + path: "" + upload_parallel_threads: 4 + compression: AUTO + snowpipe: "" + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] + max_in_flight: 1 +``` + + + + +In order to use a different stage and / or Snowpipe for each message, you can use function interpolations as described +[here](/docs/configuration/interpolation#bloblang-queries). When using batching, messages are grouped by the calculated +stage and Snowpipe and are streamed to individual files in their corresponding stage and, optionally, a Snowpipe +`insertFiles` REST API call will be made for each individual file. + +### Credentials + +Two authentication mechanisms are supported: +- User/password +- Key Pair Authentication + +#### User/password + +This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with +Snowpipe. + +#### Key Pair Authentication + +This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key +beforehand. Please consult the [documentation](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication) +for details on how to set it up and assign the Public Key to your user. 
+
+Note that the Snowflake documentation suggests using this command:
+
+```shell
+openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
+```
+
+to generate an encrypted SSH private key. However, in this case, it uses an encryption algorithm called
+`pbeWithMD5AndDES-CBC`, which is part of PKCS#5 v1.5 and is considered insecure. Due to this, Benthos does not
+support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them. One way
+of achieving this is to use the following command:
+
+```shell
+openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
+```
+
+Alternatively, you can re-encrypt an existing key using this command:
+
+```shell
+openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8
+```
+
+Please consult this [documentation](https://linux.die.net/man/1/pkcs8) for details.
+
+### Batching
+
+It's common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your
+messages at the output level and join the batch of messages with an
+[`archive`](/docs/components/processors/archive) and/or [`compress`](/docs/components/processors/compress)
+processor.
+
+For the optimal batch size, please consult the Snowflake [documentation](https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html).
+
+### Snowpipe
+
+Given a table called `BENTHOS_TBL` with one column of type `variant`:
+
+```sql
+CREATE OR REPLACE TABLE BENTHOS_DB.PUBLIC.BENTHOS_TBL(RECORD variant)
+```
+
+and the following `BENTHOS_PIPE` Snowpipe:
+
+```sql
+CREATE OR REPLACE PIPE BENTHOS_DB.PUBLIC.BENTHOS_PIPE AUTO_INGEST = FALSE AS COPY INTO BENTHOS_DB.PUBLIC.BENTHOS_TBL FROM (SELECT * FROM @%BENTHOS_TBL) FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO)
+```
+
+you can configure Benthos to use the implicit table stage `@%BENTHOS_TBL` as the `stage` and
+`BENTHOS_PIPE` as the `snowpipe`.
In this case, you must set `compression` to `AUTO` and, if +using message batching, you'll need to configure an [`archive`](/docs/components/processors/archive) processor +with the `concatenate` format. Since the `compression` is set to `AUTO`, the +[gosnowflake](https://github.com/snowflakedb/gosnowflake) client library will compress the messages automatically so you +don't need to add a [`compress`](/docs/components/processors/compress) processor for message batches. + +If you add `STRIP_OUTER_ARRAY = TRUE` in your Snowpipe `FILE_FORMAT` +definition, then you must use `json_array` instead of `concatenate` as the archive processor format. + +Note: Only Snowpipes with `FILE_FORMAT` `TYPE` `JSON` are currently supported. + +### Snowpipe Troubleshooting + +Snowpipe [provides](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html) the `insertReport` +and `loadHistoryScan` REST API endpoints which can be used to get information about recent Snowpipe calls. In +order to query them, you'll first need to generate a valid JWT token for your Snowflake account. 
There are two methods +for doing so: +- Using the `snowsql` [utility](https://docs.snowflake.com/en/user-guide/snowsql.html): + +```shell +snowsql --private-key-path rsa_key.p8 --generate-jwt -a -u +``` + +- Using the Python `jwt-generator` [utility](https://docs.snowflake.com/en/developer-guide/sql-api/guide.html#using-key-pair-authentication): + +```shell +python3 jwt-generator.py --private_key_file_path=rsa_key.p8 --account= --user= +``` + +Once you successfully generate a JWT token and store it into the `JWT_TOKEN` environment variable, then you can, +for example, query the `insertReport` endpoint using `curl`: + +```shell +curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://.snowflakecomputing.com/v1/data/pipes/../insertReport" +``` + +If you need to pass in a valid `requestId` to any of these Snowpipe REST API endpoints, you can enable debug +logging as described [here](/docs/components/logger/about) and Benthos will print the RequestIDs that it sends to +Snowpipe. They match the name of the file that is placed in the stage. + + +## Performance + +This output benefits from sending multiple messages in flight in parallel for +improved performance. You can tune the max number of in flight messages with the +field `max_in_flight`. + +This output benefits from sending messages as a batch for improved performance. +Batches can be formed at both the input and output level. You can find out more +[in this doc](/docs/configuration/batching). + +## Examples + + + + + +Upload uncompressed messages concatenated into a .json file to a table stage without calling Snowpipe. 
+ +```yaml +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: NONE + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate +``` + + + + +Upload messages concatenated into a JSON file to a stage and automatically compressed into a .gz file to a table stage without calling Snowpipe. + +```yaml +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: AUTO + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate +``` + + + + +Upload messages as a compressed .deflate archive of documents to a table stage and call Snowpipe to load it into a table. + +```yaml +output: + snowflake_put: + account: benthos + user: test@benthos.dev + private_key_file: path_to_ssh_key.pem + private_key_pass: test + role: ACCOUNTADMIN + database: BENTHOS_DB + warehouse: COMPUTE_WH + schema: PUBLIC + path: benthos + stage: "@%BENTHOS_TBL" + upload_parallel_threads: 4 + compression: DEFLATE + snowpipe: BENTHOS_PIPE + batching: + count: 10 + period: 3s + processors: + - archive: + format: concatenate + - compress: + algorithm: zlib +``` + + + + +Upload messages as a compressed .rawdeflate archive of documents to a table stage and call Snowpipe to load it into a table. 
+
+```yaml
+output:
+  snowflake_put:
+    account: benthos
+    user: test@benthos.dev
+    private_key_file: path_to_ssh_key.pem
+    private_key_pass: test
+    role: ACCOUNTADMIN
+    database: BENTHOS_DB
+    warehouse: COMPUTE_WH
+    schema: PUBLIC
+    path: benthos
+    stage: "@%BENTHOS_TBL"
+    upload_parallel_threads: 4
+    compression: RAW_DEFLATE
+    snowpipe: BENTHOS_PIPE
+    batching:
+      count: 10
+      period: 3s
+      processors:
+        - archive:
+            format: concatenate
+        - compress:
+            algorithm: flate
+```
+
+
+
+
+## Fields
+
+### `account`
+
+Account name, which is the same as the Account Identifier
+as described [here](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#where-are-account-identifiers-used).
+However, when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier),
+the Account Identifier is formatted as `<account_locator>.<region_id>.<cloud>` and this field needs to be
+populated using the `<account_locator>` part.
+
+
+Type: `string`
+
+### `region`
+
+Optional region field which needs to be populated when using
+an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier)
+and it must be set to the `<region_id>` part of the Account Identifier
+(`<account_locator>.<region_id>.<cloud>`).
+
+
+Type: `string`
+
+```yml
+# Examples
+
+region: us-west-2
+```
+
+### `cloud`
+
+Optional cloud platform field which needs to be populated
+when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier)
+and it must be set to the `<cloud>` part of the Account Identifier
+(`<account_locator>.<region_id>.<cloud>`).
+
+
+Type: `string`
+
+```yml
+# Examples
+
+cloud: aws
+
+cloud: gcp
+
+cloud: azure
+```
+
+### `user`
+
+Username.
+
+
+Type: `string`
+
+### `password`
+
+An optional password.
+
+
+Type: `string`
+
+### `private_key_file`
+
+The path to a file containing the private SSH key.
+
+
+Type: `string`
+
+### `private_key_pass`
+
+An optional private SSH key passphrase.
+
+
+Type: `string`
+
+### `role`
+
+Role.
+
+
+Type: `string`
+
+### `database`
+
+Database.
+
+
+Type: `string`
+
+### `warehouse`
+
+Warehouse.
+
+
+Type: `string`
+
+### `schema`
+
+Schema.
+
+
+Type: `string`
+
+### `stage`
+
+Stage name. Use either one of the
+[supported](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html) stage types.
+This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).
+
+
+Type: `string`
+
+### `path`
+
+Stage path.
+
+
+Type: `string`
+
+### `upload_parallel_threads`
+
+Specifies the number of threads to use for uploading files.
+
+
+Type: `int`
+Default: `4`
+
+### `compression`
+
+Compression type.
+
+
+Type: `string`
+Default: `"AUTO"`
+
+| Option | Summary |
+|---|---|
+| `AUTO` | Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON. |
+| `DEFLATE` | Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950). |
+| `GZIP` | Messages must be pre-compressed using the gzip algorithm. |
+| `NONE` | No compression is applied and messages must contain plain-text JSON. |
+| `RAW_DEFLATE` | Messages must be pre-compressed using the flate algorithm (without header, RFC1951). |
+
+
+### `snowpipe`
+
+An optional Snowpipe name. Use the `<snowpipe>` part from `<database>.<schema>.<snowpipe>`.
+This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).
+
+
+Type: `string`
+
+### `batching`
+
+Allows you to configure a [batching policy](/docs/configuration/batching).
+
+
+Type: `object`
+
+```yml
+# Examples
+
+batching:
+  byte_size: 5000
+  count: 0
+  period: 1s
+
+batching:
+  count: 10
+  period: 1s
+
+batching:
+  check: this.contains("END BATCH")
+  count: 0
+  period: 1m
+```
+
+### `batching.count`
+
+A number of messages at which the batch should be flushed. If `0` disables count based batching.
+ + +Type: `int` +Default: `0` + +### `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. + + +Type: `int` +Default: `0` + +### `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +Type: `string` +Default: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +### `batching.check` + +A [Bloblang query](/docs/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch. + + +Type: `string` +Default: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +### `batching.processors` + +A list of [processors](/docs/components/processors/about) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +Type: `array` + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + +### `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + + +Type: `int` +Default: `1` + +