Skip to content
Open
276 changes: 247 additions & 29 deletions io/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package io

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"slices"
"strconv"
"strings"
"time"

"github.com/apache/iceberg-go/utils"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -47,54 +52,89 @@ const (
S3ProxyURI = "s3.proxy-uri"
S3ConnectTimeout = "s3.connect-timeout"
S3SignerUri = "s3.signer.uri"
S3SignerAuthToken = "token"
S3RemoteSigningEnabled = "s3.remote-signing-enabled"
S3ForceVirtualAddressing = "s3.force-virtual-addressing"
)

var unsupportedS3Props = []string{
S3ConnectTimeout,
S3SignerUri,
}

// ParseAWSConfig parses S3 properties and returns a configuration.
func ParseAWSConfig(ctx context.Context, props map[string]string) (*aws.Config, error) {
// If any unsupported properties are set, return an error.
for k := range props {
if slices.Contains(unsupportedS3Props, k) {
return nil, fmt.Errorf("unsupported S3 property %q", k)
}
}

opts := []func(*config.LoadOptions) error{}

if tok, ok := props["token"]; ok {
opts = append(opts, config.WithBearerAuthTokenProvider(
&bearer.StaticTokenProvider{Token: bearer.Token{Value: tok}}))
}

if region, ok := props[S3Region]; ok {
region := ""
if r, ok := props[S3Region]; ok {
region = r
opts = append(opts, config.WithRegion(region))
} else if region, ok := props["client.region"]; ok {
} else if r, ok := props["client.region"]; ok {
region = r
opts = append(opts, config.WithRegion(region))
}

accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey]
token := props[S3SessionToken]
if accessKey != "" || secretAccessKey != "" || token != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
// Check if remote signing is configured and enabled
signerURI, hasSignerURI := props[S3SignerUri]
remoteSigningEnabled := true // Default to true for backward compatibility
if enabledStr, ok := props[S3RemoteSigningEnabled]; ok {
if enabled, err := strconv.ParseBool(enabledStr); err == nil {
remoteSigningEnabled = enabled
}
}

if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
if hasSignerURI && signerURI != "" && remoteSigningEnabled {
// For remote signing, we still need valid (but potentially dummy) credentials
// The actual signing will be handled by the transport layer
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
"remote-signer", "remote-signer", "")))

// Create a custom HTTP client with remote signing transport
baseTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
// Apply proxy if configured
if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}
baseTransport.Proxy = http.ProxyURL(proxyURL)
}

// Get auth token if configured
authToken := props[S3SignerAuthToken]
timeoutStr := props[S3ConnectTimeout]

remoteSigningTransport := newRemoteSigningTransport(baseTransport, signerURI, region, authToken, timeoutStr)
httpClient := &http.Client{
Transport: remoteSigningTransport,
}

opts = append(opts, config.WithHTTPClient(httpClient))
} else {
// Use regular credentials if no remote signer
accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey]
token := props[S3SessionToken]
if accessKey != "" || secretAccessKey != "" || token != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
}

if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
}
}

awscfg := new(aws.Config)
Expand Down Expand Up @@ -149,3 +189,181 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, props map[string]strin

return bucket, nil
}

// RemoteSigningRequest represents the request sent to the remote signer
type RemoteSigningRequest struct {
Method string `json:"method"`
URI string `json:"uri"`
Headers map[string]string `json:"headers,omitempty"`
Region string `json:"region"`
}

// RemoteSigningResponse represents the response from the remote signer
type RemoteSigningResponse struct {
Headers map[string]string `json:"headers"`
}

// remoteSigningTransport wraps an HTTP transport to handle remote signing
type remoteSigningTransport struct {
base http.RoundTripper
signerURI string
region string
authToken string
client *http.Client
}

// newRemoteSigningTransport creates a new remote signing transport
func newRemoteSigningTransport(base http.RoundTripper, signerURI, region, authToken, timeoutStr string) *remoteSigningTransport {

timeout := 30 // default timeout in seconds
if t, err := strconv.Atoi(timeoutStr); timeoutStr != "" && err == nil {
timeout = t
}

return &remoteSigningTransport{
base: base,
signerURI: signerURI,
region: region,
authToken: authToken,
client: &http.Client{
Timeout: time.Duration(timeout) * time.Second,
},
}
}

// RoundTrip implements http.RoundTripper
func (r *remoteSigningTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// Only handle S3 requests
if !r.isS3Request(req) {
return r.base.RoundTrip(req)
}

// Get signed headers from remote signer
signedHeaders, err := r.getRemoteSignature(req.Context(), req.Method, req.URL.String(), r.extractHeaders(req))
if err != nil {
log.Printf("ERROR: Failed to get remote signature: %v", err) // fails silently
return nil, fmt.Errorf("failed to get remote signature: %w", err)
}

// Clone the request and apply signed headers
newReq := req.Clone(req.Context())
for key, value := range signedHeaders {
newReq.Header.Set(key, value)
}

return r.base.RoundTrip(newReq)
}

// isS3Request checks if the request is destined for S3
func (r *remoteSigningTransport) isS3Request(req *http.Request) bool {
// Check if the host contains typical S3 patterns
host := req.URL.Host

// Don't sign requests to the remote signer itself to avoid circular dependency
if r.signerURI != "" {
signerHost := ""
if signerURL, err := url.Parse(r.signerURI); err == nil {
signerHost = signerURL.Host
}
if host == signerHost {
return false
}
}

result := host != "" && (
// Standard S3 endpoints
host == "s3.amazonaws.com" ||
// Regional S3 endpoints
(len(host) > 12 && host[len(host)-12:] == ".amazonaws.com" && (host[:3] == "s3." || host[len(host)-17:len(host)-12] == ".s3")) ||
// Virtual hosted-style bucket access
(len(host) > 17 && host[len(host)-17:] == ".s3.amazonaws.com") ||
// Path-style access to S3
(len(host) > 3 && host[:3] == "s3.") ||
// Cloudflare R2 endpoints
(len(host) > 20 && host[len(host)-20:] == ".r2.cloudflarestorage.com") ||
// MinIO or other custom S3-compatible endpoints (be more conservative)
(len(host) > 0 && (host == "localhost:9000" || host == "127.0.0.1:9000" ||
// Only sign if it looks like an S3 request pattern (has bucket-like structure)
// and is NOT a catalog service (which typically has /catalog/ in the path)
(req.URL.Path != "" && !strings.Contains(req.URL.Path, "/catalog/") &&
!strings.Contains(host, "catalog") &&
// Exclude common non-S3 service patterns
!strings.Contains(host, "glue.") &&
!strings.Contains(host, "api.") &&
!strings.Contains(host, "catalog.")))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would put this under a separate helper function. wdyt?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it could be put in the utils folder? Although I don't think this logic will be used elsewhere, so not sure that there is much benefit breaking the function into pieces.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we put it in io/utils.go. Wdyt @zeroshade ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

utils folder are generally bad practice, i would even keep this function in this file, or as @dttung2905 proposed in io/utils.go file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote to keep it in this file since there isn't any reusability. But I'm fine moving it as well. Who makes the final call ? 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flarco it's up to you, same file - it's good enough.


return result
}

// extractHeaders extracts relevant headers from the request
func (r *remoteSigningTransport) extractHeaders(req *http.Request) map[string]string {
headers := make(map[string]string)
for key, values := range req.Header {
if len(values) > 0 {
headers[key] = values[0]
}
}
return headers
}

// getRemoteSignature sends a request to the remote signer and returns signed headers
func (r *remoteSigningTransport) getRemoteSignature(ctx context.Context, method, uri string, headers map[string]string) (map[string]string, error) {
reqBody := RemoteSigningRequest{
Method: method,
URI: uri,
Headers: headers,
Region: r.region,
}

payload, err := json.Marshal(reqBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal signing request: %w", err)
}

req, err := http.NewRequestWithContext(ctx, "POST", r.signerURI, bytes.NewReader(payload))
if err != nil {
return nil, fmt.Errorf("failed to create signer request to %s: %w", r.signerURI, err)
}

req.Header.Set("Content-Type", "application/json")

// Add authentication token if configured
if r.authToken != "" {
req.Header.Set("Authorization", "Bearer "+r.authToken)
}

resp, err := r.client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to contact remote signer at %s: %w", r.signerURI, err)
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
// Read the response body for better error diagnostics
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return nil, fmt.Errorf("remote signer at %s returned status %d (failed to read response body: %v)", r.signerURI, resp.StatusCode, readErr)
}

// Provide detailed error information based on status code
switch resp.StatusCode {
case 401:
return nil, fmt.Errorf("remote signer authentication failed (401) at %s: %s", r.signerURI, string(body))
case 403:
return nil, fmt.Errorf("remote signer authorization denied (403) at %s: %s. Check that the signer service has proper AWS credentials and permissions for the target resource. Request was: %s", r.signerURI, string(body), string(payload))
case 404:
return nil, fmt.Errorf("remote signer endpoint not found (404) at %s: %s. Check the signer URI configuration", r.signerURI, string(body))
case 500:
return nil, fmt.Errorf("remote signer internal error (500) at %s: %s", r.signerURI, string(body))
default:
return nil, fmt.Errorf("remote signer at %s returned status %d: %s", r.signerURI, resp.StatusCode, string(body))
}
}

var signingResponse RemoteSigningResponse
if err := json.NewDecoder(resp.Body).Decode(&signingResponse); err != nil {
return nil, fmt.Errorf("failed to decode signer response from %s: %w", r.signerURI, err)
}

return signingResponse.Headers, nil
}
Loading