Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 139 additions & 72 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,8 @@ func (c *Client) Do(req *Request, resp *Response) error {
c.mLock.Unlock()

if startCleaner {
go c.mCleaner(m)
// Register client to cleaner
cCleaner.register(c)
}

return hc.Do(req, resp)
Expand All @@ -539,37 +540,34 @@ func (c *Client) CloseIdleConnections() {
c.mLock.Unlock()
}

func (c *Client) mCleaner(m map[string]*HostClient) {
mustStop := false

sleep := c.MaxIdleConnDuration
if sleep < time.Second {
sleep = time.Second
} else if sleep > 10*time.Second {
sleep = 10 * time.Second
// Clean HostClient in a map when the HostClient has no more connections.
func (c *Client) cleanHostClients(m map[string]*HostClient) {
c.mLock.Lock()
for k, v := range m {
// Lock HostClient, and delete it from map m where it has no connections.
v.connsLock.Lock()
if v.connsCount == 0 {
delete(m, k)
}
v.connsLock.Unlock()
}
c.mLock.Unlock()
}

for {
c.mLock.Lock()
for k, v := range m {
v.connsLock.Lock()
shouldRemove := v.connsCount == 0
v.connsLock.Unlock()

if shouldRemove {
delete(m, k)
}
}
if len(m) == 0 {
mustStop = true
}
c.mLock.Unlock()
// Clean HostClient for Client.
func (c *Client) cleanResource() {
c.cleanHostClients(c.m)
c.cleanHostClients(c.ms)
}

if mustStop {
break
}
time.Sleep(sleep)
// Whether a Client has any HostClient.
func (c *Client) hasResource() bool {
c.mLock.Lock()
defer c.mLock.Unlock()
if len(c.m) > 0 || len(c.ms) > 0 {
return true
}
return false
}

// DefaultMaxConnsPerHost is the maximum number of concurrent connections
Expand Down Expand Up @@ -1522,9 +1520,9 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool)
if c.connsCount < maxConns {
c.connsCount++
createConn = true
if !c.connsCleanerRun && !connectionClose {
// first time to create connection, need to start cleaner
if c.connsCount == 1 {
startCleaner = true
c.connsCleanerRun = true
}
}
} else {
Expand Down Expand Up @@ -1583,7 +1581,8 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool)
}

if startCleaner {
go c.connsCleaner()
// Register to cleaner.
cCleaner.register(c)
}

conn, err := c.dialHostHard()
Expand Down Expand Up @@ -1648,52 +1647,48 @@ func (c *HostClient) connsCleaner() {
if maxIdleConnDuration <= 0 {
maxIdleConnDuration = DefaultMaxIdleConnDuration
}
for {
currentTime := time.Now()

// Determine idle connections to be closed.
c.connsLock.Lock()
conns := c.conns
n := len(conns)
i := 0
for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
i++
}
sleepFor := maxIdleConnDuration
if i < n {
// + 1 so we actually sleep past the expiration time and not up to it.
// Otherwise the > check above would still fail.
sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
}
scratch = append(scratch[:0], conns[:i]...)
if i > 0 {
m := copy(conns, conns[i:])
for i = m; i < n; i++ {
conns[i] = nil
}
c.conns = conns[:m]
}
c.connsLock.Unlock()

// Close idle connections.
for i, cc := range scratch {
c.closeConn(cc)
scratch[i] = nil
}
currentTime := time.Now()

// Determine whether to stop the connsCleaner.
c.connsLock.Lock()
mustStop := c.connsCount == 0
if mustStop {
c.connsCleanerRun = false
}
// Determine idle connections to be closed.
c.connsLock.Lock()
conns := c.conns
n := len(conns)
i := 0
for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
i++
}
// If no connections need to be closed, unlock and finish.
if i <= 0 {
c.connsLock.Unlock()
if mustStop {
break
}
return
}

time.Sleep(sleepFor)
// Connections to be closed.
scratch = append(scratch[:0], conns[:i]...)
m := copy(conns, conns[i:])
for i = m; i < n; i++ {
conns[i] = nil
}
// Connections to be used.
c.conns = conns[:m]
c.connsLock.Unlock()

// Close idle connections.
for i, cc := range scratch {
c.closeConn(cc)
scratch[i] = nil
}
}

// clean connection resources for HostClient.
func (c *HostClient) cleanResource() {
c.connsCleaner()
}

func (c *HostClient) hasResource() bool {
cnt := c.ConnsCount()
return cnt > 0
}

func (c *HostClient) closeConn(cc *clientConn) {
Expand Down Expand Up @@ -2907,3 +2902,75 @@ func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
w.err = nil
pool.Put(w)
}

// Resource Clean interface.
// resourceClean can register into clientCleaner for cleaning resources.
type resourceClean interface {
cleanResource()
hasResource() bool
}

// Cleaner to clean resources.
// Client, HostClient register into cleaner. The Cleaner will check and delete client
// where it has no resources.
type clientCleaner struct {
initOnce sync.Once
// clients to clean
clients sync.Map
}

// Global cleaner.
var cCleaner = &clientCleaner{}

// Initialize cleaner.
func (c *clientCleaner) init() {
c.initOnce.Do(func() {
go c.cleaner()
})
}

// Clean clients
func (c *clientCleaner) cleaner() {
for {
c.cleanClient()
time.Sleep(10 * time.Second)
}
}

// Register Client to cleaner.
func (c *clientCleaner) register(client resourceClean) {
if client == nil {
return
}
// Check if init
c.init()

// Store client
c.clients.Store(client, struct{}{})
}

// Clean Client when the client has no resource.
func (c *clientCleaner) cleanClient() {
// Find all clients
allClients := make([]resourceClean, 0)
c.clients.Range(func (key, value interface{}) bool {
client := key.(resourceClean)
allClients = append(allClients, client)
return true
})

// Clean connections for each client.
for _, client := range allClients {
//Clean resource.
client.cleanResource()
// If a client has no resource, delete it from c.clients.
// But client may create resource before deleted,
// so check again. If it has any resource, store it into c.clients.
if !client.hasResource() {
c.clients.Delete(client)
if client.hasResource() {
c.clients.Store(client, struct{}{})
}
}
}
}
65 changes: 65 additions & 0 deletions client_cleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package fasthttp

import (
"testing"
)

type cleanItem struct {
used int
free int
}

func (ci *cleanItem) cleanResource() {
ci.free = 0
}

func (ci *cleanItem) hasResource() bool {
total := ci.used + ci.free
return total > 0
}

func TestClientCleaner(t *testing.T) {
c1 := &cleanItem{used: 1, free: 0}
c2 := &cleanItem{used: 2, free: 0}

exists := func(ci *cleanItem) bool {
var item resourceClean = ci
_, ok := cCleaner.clients.Load(item)
return ok
}

// test register
cCleaner.register(c1)
cCleaner.register(c2)
if !exists(c1) {
t.Errorf("clientCleaner error, item register but not exist.")
}
if !exists(c2) {
t.Errorf("clientCleaner error, item register but not exist.")
}

// test clean
c1.used = 0
cCleaner.cleanClient()
if exists(c1) {
t.Errorf("clientCleaner error, item has no resource but not deleted")
}

// test duplicate register
c1.used, c1.free = 0, 3
cCleaner.register(c1)
cCleaner.register(c1)
if !exists(c1) {
t.Errorf("clientCleaner error, item register but not exist.")
}

// test clean and delete
cCleaner.cleanClient()
if exists(c1) {
t.Errorf("clientCleaner error, item has no resource but not deleted")
}

if !exists(c2) {
t.Errorf("clientCleaner error, item register but not exist.")
}
}
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2815,4 +2815,4 @@ func TestHostClientMaxConnWaitTimeoutWithEarlierDeadline(t *testing.T) {
if emptyBodyCount > 0 {
t.Fatalf("at least one request body was empty")
}
}
}