Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
13a5c7a
init
souravgupta-msft Dec 12, 2025
b918ea4
changes
souravgupta-msft Dec 12, 2025
3dec1f4
update bandwidth limiter
souravgupta-msft Dec 15, 2025
b49d955
move to utils
souravgupta-msft Dec 15, 2025
2392e77
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Dec 16, 2025
1def7ac
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Dec 17, 2025
9de38fe
Update burst size logic
souravgupta-msft Dec 17, 2025
19718c7
Update burst size logic
souravgupta-msft Dec 17, 2025
547a113
changes
souravgupta-msft Dec 26, 2025
c831401
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Dec 26, 2025
b05cc1e
adding unit tests
souravgupta-msft Dec 26, 2025
47f4166
unit tests
souravgupta-msft Dec 26, 2025
069f485
changes
souravgupta-msft Jan 6, 2026
a0cf537
changes
souravgupta-msft Jan 6, 2026
218ba45
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Jan 7, 2026
0254550
adding changelog
souravgupta-msft Jan 7, 2026
12e2565
changes
souravgupta-msft Jan 8, 2026
e29d531
Adding ut for non-get methods
souravgupta-msft Jan 8, 2026
7b36e1e
changes
souravgupta-msft Jan 8, 2026
44706de
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Jan 9, 2026
ac24614
review comment
souravgupta-msft Jan 9, 2026
9173168
sync with main
souravgupta-msft Jan 12, 2026
2c25d95
sync with main
souravgupta-msft Jan 14, 2026
ac578e6
sync with main
souravgupta-msft Jan 21, 2026
29c0209
Merge branch 'main' into sourav/rateLimit
vibhansa-msft Jan 27, 2026
384d11b
update flags description
souravgupta-msft Jan 28, 2026
a4c5ad6
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Feb 3, 2026
25ee312
cap-mbps-read
souravgupta-msft Feb 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 2.5.3 (Unreleased)
**Features**
- Add rate limit functionality for ingress bandwidth (bytes downloaded per second) and operations per second ([PR #2093](https://github.com/Azure/azure-storage-fuse/pull/2093))

**Bug Fixes**

Expand Down
41 changes: 41 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -4559,4 +4559,45 @@ Apache License







****************************************************************************

============================================================================
>>> golang.org/x/time
==============================================================================

Copyright 2009 The Go Authors.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google LLC nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.




--------------------- END OF THIRD PARTY NOTICE --------------------------------
6 changes: 6 additions & 0 deletions component/azstorage/azstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,12 @@ func init() {
blobFilter := config.AddStringFlag("filter", "", "Filter string to match blobs. For details refer [https://github.com/Azure/azure-storage-fuse?tab=readme-ov-file#blob-filter]")
config.BindPFlag(compName+".filter", blobFilter)

capMbpsRead := config.AddInt64Flag("cap-mbps-read", -1, "Limit the throughput of downloads from your storage account. Value measured in megabits per second. Default is -1 (no limit)")
config.BindPFlag(compName+".cap-mbps-read", capMbpsRead)

capIOps := config.AddInt64Flag("cap-iops", -1, "Limit the total storage operations per second. Default is -1 (no limit)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is cap-iops also related only to read ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it applies to all REST calls

config.BindPFlag(compName+".cap-iops", capIOps)

config.RegisterFlagCompletionFunc("container-name", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return nil, cobra.ShellCompDirectiveNoFileComp
})
Expand Down
17 changes: 16 additions & 1 deletion component/azstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ type AzStorageOptions struct {
PreserveACL bool `config:"preserve-acl" yaml:"preserve-acl"`
Filter string `config:"filter" yaml:"filter"`
UserAssertion string `config:"user-assertion" yaml:"user-assertions"`
CapMbpsRead int64 `config:"cap-mbps-read" yaml:"cap-mbps-read"`
CapIOps int64 `config:"cap-iops" yaml:"cap-iops"`

// v1 support
UseAdls bool `config:"use-adls" yaml:"-"`
Expand Down Expand Up @@ -537,7 +539,8 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error {
log.Crit("ParseAndValidateConfig : Retry Config: retry-count %d, max-timeout %d, backoff-time %d, max-delay %d, preserve-acl: %v",
az.stConfig.maxRetries, az.stConfig.maxTimeout, az.stConfig.backoffTime, az.stConfig.maxRetryDelay, az.stConfig.preserveACL)

log.Crit("ParseAndValidateConfig : Telemetry : %s, honour-ACL %v", az.stConfig.telemetry, az.stConfig.honourACL)
log.Crit("ParseAndValidateConfig : Telemetry : %s, honour-ACL %v, cap-mbps-read %d, cap-iops %d",
az.stConfig.telemetry, az.stConfig.honourACL, az.stConfig.capMbpsRead, az.stConfig.capIOps)

return nil
}
Expand Down Expand Up @@ -630,5 +633,17 @@ func ParseAndReadDynamicConfig(az *AzStorage, opt AzStorageOptions, reload bool)
}
}

// Rate limiting, default is no limit
az.stConfig.capMbpsRead = -1
az.stConfig.capIOps = -1

if opt.CapMbpsRead > 0 {
az.stConfig.capMbpsRead = opt.CapMbpsRead
}

if opt.CapIOps > 0 {
az.stConfig.capIOps = opt.CapIOps
}

return nil
}
34 changes: 34 additions & 0 deletions component/azstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,40 @@ func (s *configTestSuite) TestSASRefresh() {
assert.NoError(err)
}

func (s *configTestSuite) TestRateLimitConfig() {
defer config.ResetConfig()
assert := assert.New(s.T())
az := &AzStorage{}
opt := AzStorageOptions{}
opt.AccountName = "abcd"
opt.Container = "abcd"
opt.AuthMode = "key"
opt.AccountKey = "abcd"

// Test default values (no limit)
err := ParseAndReadDynamicConfig(az, opt, false)
assert.NoError(err)
assert.Equal(int64(-1), az.stConfig.capMbpsRead)
assert.Equal(int64(-1), az.stConfig.capIOps)

// Test setting limits
opt.CapMbpsRead = 100
opt.CapIOps = 10
err = ParseAndReadDynamicConfig(az, opt, false)
assert.NoError(err)
assert.Equal(int64(100), az.stConfig.capMbpsRead)
assert.Equal(int64(10), az.stConfig.capIOps)

// Test setting only one limit
opt.CapMbpsRead = 200
opt.CapIOps = 0 // reset to no limit

err = ParseAndReadDynamicConfig(az, opt, false)
assert.NoError(err)
assert.Equal(int64(200), az.stConfig.capMbpsRead)
assert.Equal(int64(-1), az.stConfig.capIOps)
}

func TestConfigTestSuite(t *testing.T) {
suite.Run(t, new(configTestSuite))
}
4 changes: 4 additions & 0 deletions component/azstorage/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type AzStorageConfig struct {

// Blob filters
filter *blobfilter.BlobFilter

// Rate limiting
capMbpsRead int64
capIOps int64
}

type AzStorageConnection struct {
Expand Down
103 changes: 103 additions & 0 deletions component/azstorage/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ package azstorage

import (
"fmt"
"math"
"net/http"
"strings"

"golang.org/x/time/rate"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
)

// blobfuseTelemetryPolicy is a custom pipeline policy to prepend the blobfuse user agent string to the one coming from SDK.
Expand Down Expand Up @@ -79,3 +84,101 @@ func (r *serviceVersionPolicy) Do(req *policy.Request) (*http.Response, error) {
req.Raw().Header["x-ms-version"] = []string{r.serviceApiVersion}
return req.Next()
}

// ---------------------------------------------------------------------------------------------------------------------------------------------------
// Policy to limit the rate of requests
type rateLimitingPolicy struct {
ingressBandwidthLimiter *rate.Limiter
opsLimiter *rate.Limiter
}

func newRateLimitingPolicy(readBytesPerSec int64, opsPerSec int64) policy.Policy {
p := &rateLimitingPolicy{}

// Use 10 second window for burst size calculation for rate limiter.
// This allows for short bursts while still enforcing the average rate over a reasonable time period.
// A larger window size would allow larger bursts but would be less effective at limiting short term spikes.
// A smaller window size would limit bursts more but could lead to underutilization of available bandwidth/ops.
// 10 seconds is a reasonable compromise between these factors.
// Note: The burst size is the maximum number of tokens that can be accumulated in the bucket.
// Therefore, a larger burst size allows for larger bursts of traffic,
// but does not affect the average rate over time.
// Users can adjust the bytesPerSec and opsPerSec values to fine-tune the rate limiting behavior as needed.
// For example, setting a higher bytesPerSec value will allow for higher average bandwidth,
// while setting a lower opsPerSec value will limit the number of operations per second.
const windowSize = 10

if readBytesPerSec > 0 {
ingressBandwidthBurstSize := readBytesPerSec * int64(windowSize)
burst := int(ingressBandwidthBurstSize)
// On 32-bit systems, int is 32-bit. If bandwidthBurstSize > MaxInt, we need to clamp it.
// math.MaxInt is platform dependent.
if ingressBandwidthBurstSize > int64(math.MaxInt) {
burst = math.MaxInt
}

p.ingressBandwidthLimiter = rate.NewLimiter(rate.Limit(readBytesPerSec), burst)
log.Info("RateLimitingPolicy : Bandwidth limit set to %d bytes/sec with burst size of %d bytes",
readBytesPerSec, burst)
}

if opsPerSec > 0 {
opsBurstSize := opsPerSec * int64(windowSize)
burst := int(opsBurstSize)
if opsBurstSize > int64(math.MaxInt) {
burst = math.MaxInt
}

p.opsLimiter = rate.NewLimiter(rate.Limit(opsPerSec), burst)
log.Info("RateLimitingPolicy : Ops limit set to %d ops/sec with burst size of %d ops",
opsPerSec, burst)
}

return p
}

func (p *rateLimitingPolicy) Do(req *policy.Request) (*http.Response, error) {
ctx := req.Raw().Context()

// Limit operations per second
if p.opsLimiter != nil {
// Wait for 1 token
err := p.opsLimiter.Wait(ctx)
if err != nil {
log.Err("RateLimitingPolicy : Ops limit wait failed [%s]", err.Error())
return nil, err
}
}

// Limit ingress bandwidth for blob downloads (Azure egress: data leaving Azure Storage).
// This policy intentionally applies only to GET requests, which represent download operations.
if p.ingressBandwidthLimiter != nil && req.Raw().Method == http.MethodGet {
// Check for x-ms-range header
// We are not using req.Raw().Header.Get() as it canonicalizes the header name.
// Whereas SDK stores the header in the request is stored in lower case.
// So we directly access the header map with lower case key.
// NOTE: using strings.ToLower to ignore the lint error regarding canonicalized headers.
rangeHeader := req.Raw().Header[strings.ToLower(X_Ms_Range)]
if len(rangeHeader) == 0 {
rangeHeader = req.Raw().Header[RangeHeader]
}

if len(rangeHeader) > 0 {
size, err := parseRangeHeader(rangeHeader[0])
if err == nil && size > 0 {
// Wait for tokens equal to size.
// NOTE: range size is guaranteed to be within int range by parseRangeHeader.
err := p.ingressBandwidthLimiter.WaitN(ctx, int(size))
if err != nil {
log.Err("RateLimitingPolicy : Bandwidth limit wait failed [%s]", err.Error())
return nil, err
}
} else if err != nil {
log.Err("RateLimitingPolicy : Failed to parse Range header %s: [%s]", rangeHeader[0], err.Error())
return nil, err
}
}
}

return req.Next()
}
Loading
Loading