-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchannel_manager.go
More file actions
103 lines (87 loc) · 2.32 KB
/
channel_manager.go
File metadata and controls
103 lines (87 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package sockety
import (
"sync/atomic"
)
// ChannelID identifies a single channel handled by a ChannelManager.
type ChannelID uint16
// channelManager is the ChannelManager implementation in this file. It keeps
// one lock flag per channel and a Go channel on which released IDs are
// announced.
type channelManager struct {
	// Configuration

	// channelsCount is the number of managed channels; NewChannelManager
	// sizes locks to this value and the scans use it as the modulus.
	channelsCount uint16

	// State

	// current is the channel reported by Current and the starting point of
	// the scans in Reserve and GetFree.
	current ChannelID
	// locks holds one flag per channel: 0 means free, 1 means reserved.
	locks []uint32
	// released carries the IDs of channels handed back through Release.
	released chan ChannelID
}
// ChannelManager keeps track of a "current" channel and of which channels
// are reserved, and hands out free channels on request.
type ChannelManager interface {
	// Current returns the channel that is currently selected.
	// Not thread-safe.
	Current() ChannelID

	// Is reports whether id is the currently selected channel.
	// Not thread-safe.
	Is(id ChannelID) bool

	// SetCurrent selects the given channel as the current one.
	// Not thread-safe.
	SetCurrent(channel ChannelID)

	// GetFree returns a channel that is free at the moment of the call,
	// without reserving it.
	// Not thread-safe.
	GetFree() ChannelID

	// Reserve claims a free channel and returns its ID.
	Reserve() ChannelID

	// Release hands a previously reserved channel back.
	Release(channel ChannelID)
}
// NewChannelManager creates a ChannelManager that handles channelsCount
// channels, all of them initially free.
//
// It panics with the error returned by validateChannelsCount when that
// function rejects channelsCount.
func NewChannelManager(channelsCount uint16) ChannelManager {
	if err := validateChannelsCount(channelsCount); err != nil {
		panic(err)
	}

	return &channelManager{
		channelsCount: channelsCount,
		locks:         make([]uint32, channelsCount),
		// released must be allocated: on a nil channel a receive blocks
		// forever and a non-blocking send is always dropped, so a caller
		// waiting for a release could never be woken up. The buffer lets a
		// release be recorded even when nobody is receiving at that moment.
		released: make(chan ChannelID, channelsCount),
	}
}
// obtainLock tries to flip the lock flag of the given channel from free (0)
// to reserved (1) with a single atomic compare-and-swap. It reports whether
// this call performed the flip.
func (c *channelManager) obtainLock(channel uint16) bool {
	flag := &c.locks[channel]
	return atomic.CompareAndSwapUint32(flag, 0, 1)
}
// isTemporarilyFree reports whether the lock flag of the given channel is
// clear at the moment of the call. Nothing is reserved, so the answer may be
// outdated as soon as it is returned.
func (c *channelManager) isTemporarilyFree(channel uint16) bool {
	// The flag is written with atomic operations in obtainLock and Release,
	// so it has to be read atomically as well; a plain read would be a data
	// race with those writers.
	return atomic.LoadUint32(&c.locks[channel]) == 0
}
// Current returns the channel that is currently selected. The field is read
// without any synchronization.
func (c *channelManager) Current() ChannelID {
	selected := c.current
	return selected
}
// Is reports whether the given channel is the currently selected one. The
// field is read without any synchronization.
func (c *channelManager) Is(channel ChannelID) bool {
	return channel == c.current
}
// SetCurrent selects the given channel as the current one. The write is
// skipped when that channel is already selected. No synchronization is
// performed.
func (c *channelManager) SetCurrent(channel ChannelID) {
	if c.current == channel {
		return
	}
	c.current = channel
}
// Reserve claims a free channel and returns its ID.
//
// It first scans every channel once, starting at the current one and
// wrapping around, and takes the first lock it manages to obtain. If every
// channel is taken, it waits for IDs announced by Release and retries on each
// of them until a lock is obtained.
func (c *channelManager) Reserve() ChannelID {
	// The offset arithmetic is done in uint32: in uint16 the sum current+i
	// can wrap around before the modulo is applied, which makes the scan
	// skip some channels.
	start, count := uint32(c.current), uint32(c.channelsCount)
	for i := uint32(0); i < count; i++ {
		channel := uint16((start + i) % count)
		if c.obtainLock(channel) {
			return ChannelID(channel)
		}
	}

	// Otherwise, wait until some channel is released and try to reserve it.
	// The lock attempt can still fail if another caller takes the channel
	// first, hence the loop.
	for {
		channel := uint16(<-c.released)
		if c.obtainLock(channel) {
			return ChannelID(channel)
		}
	}
}
// GetFree returns a channel whose lock flag is clear at the moment of the
// check. Unlike Reserve it does not take the lock, so the channel is not
// reserved by this call.
//
// It first scans every channel once, starting at the current one and
// wrapping around. If none is free, it waits for IDs announced by Release and
// returns the first one that is still free when checked.
//
// TODO: Consider if this fast path is actually helpful
func (c *channelManager) GetFree() ChannelID {
	// The offset arithmetic is done in uint32: in uint16 the sum current+i
	// can wrap around before the modulo is applied, which makes the scan
	// skip some channels.
	start, count := uint32(c.current), uint32(c.channelsCount)
	for i := uint32(0); i < count; i++ {
		channel := uint16((start + i) % count)
		if c.isTemporarilyFree(channel) {
			return ChannelID(channel)
		}
	}

	// Otherwise, wait until some channel is released and check that it is
	// still free; it may have been reserved again in the meantime.
	for {
		channel := uint16(<-c.released)
		if c.isTemporarilyFree(channel) {
			return ChannelID(channel)
		}
	}
}
// Release clears the lock flag of the given channel with an atomic store and
// then tries to announce the channel on released. The announcement never
// blocks: if the send cannot proceed immediately, it is skipped.
func (c *channelManager) Release(channel ChannelID) {
	flag := &c.locks[channel]
	atomic.StoreUint32(flag, 0)

	// Non-blocking notification of whoever may be waiting for a channel.
	select {
	case c.released <- channel:
	default:
	}
}