Skip to content

Commit 98e83f1

Browse files
committed
refactor: replace mutex with RWMutex for NodeMap access and implement snapshot functionality #1444
1 parent aee2352 commit 98e83f1

File tree

4 files changed

+205
-24
lines changed

4 files changed

+205
-24
lines changed

api/analytic/nodes.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ func GetNodesAnalytic(c *gin.Context) {
107107
defer ws.Close()
108108

109109
for {
110-
// Send NodeMap data to client
111-
err = ws.WriteJSON(analytic.NodeMap)
110+
// Send snapshot of NodeMap data to client to avoid concurrent access
111+
nodeSnapshot := analytic.SnapshotNodeMap()
112+
err = ws.WriteJSON(nodeSnapshot)
112113
if err != nil {
113114
if helper.IsUnexpectedWebsocketError(err) {
114115
logger.Error(err)

internal/analytic/node.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type Node struct {
4343
NodeInfo
4444
}
4545

46-
var mutex sync.Mutex
46+
var nodeMapMu sync.RWMutex
4747

4848
type TNodeMap map[uint64]*Node
4949

@@ -53,6 +53,46 @@ func init() {
5353
NodeMap = make(TNodeMap)
5454
}
5555

56+
func cloneNode(n *Node) *Node {
57+
if n == nil {
58+
return nil
59+
}
60+
61+
cloned := *n
62+
63+
if n.Node != nil {
64+
nodeCopy := *n.Node
65+
cloned.Node = &nodeCopy
66+
}
67+
68+
if n.UpstreamStatusMap != nil {
69+
upstreams := make(map[string]*upstream.Status, len(n.UpstreamStatusMap))
70+
for key, status := range n.UpstreamStatusMap {
71+
if status == nil {
72+
upstreams[key] = nil
73+
continue
74+
}
75+
statusCopy := *status
76+
upstreams[key] = &statusCopy
77+
}
78+
cloned.UpstreamStatusMap = upstreams
79+
}
80+
81+
return &cloned
82+
}
83+
84+
func SnapshotNodeMap() TNodeMap {
85+
nodeMapMu.RLock()
86+
defer nodeMapMu.RUnlock()
87+
88+
snapshot := make(TNodeMap, len(NodeMap))
89+
for id, node := range NodeMap {
90+
snapshot[id] = cloneNode(node)
91+
}
92+
93+
return snapshot
94+
}
95+
5696
func GetNode(node *model.Node) (n *Node) {
5797
if node == nil {
5898
// this should never happen
@@ -64,12 +104,23 @@ func GetNode(node *model.Node) (n *Node) {
64104
Node: node,
65105
}
66106
}
67-
n, ok := NodeMap[node.ID]
68-
if !ok {
69-
n = &Node{}
107+
nodeMapMu.RLock()
108+
cached, ok := NodeMap[node.ID]
109+
nodeMapMu.RUnlock()
110+
if !ok || cached == nil {
111+
return &Node{
112+
Node: node,
113+
}
114+
}
115+
116+
cloned := cloneNode(cached)
117+
if cloned == nil {
118+
return &Node{
119+
Node: node,
120+
}
70121
}
71-
n.Node = node
72-
return n
122+
cloned.Node = node
123+
return cloned
73124
}
74125

75126
func InitNode(node *model.Node) (n *Node, err error) {

internal/analytic/node_record.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ func getRetryState(nodeID uint64) *NodeRetryState {
6969

7070
// updateNodeStatus directly updates node status without condition checks
7171
func updateNodeStatus(nodeID uint64, status bool, reason string) {
72-
mutex.Lock()
73-
defer mutex.Unlock()
72+
nodeMapMu.Lock()
73+
defer nodeMapMu.Unlock()
7474

7575
now := time.Now()
7676
if NodeMap[nodeID] == nil {
@@ -131,8 +131,8 @@ func markConnectionSuccess(nodeID uint64) {
131131
}
132132

133133
func logCurrentNodeStatus(prefix string) {
134-
mutex.Lock()
135-
defer mutex.Unlock()
134+
nodeMapMu.Lock()
135+
defer nodeMapMu.Unlock()
136136
if NodeMap != nil {
137137
logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
138138
}
@@ -219,13 +219,13 @@ func cleanupDisabledNodes(enabledEnvIDs []uint64) {
219219
}
220220
retryMutex.Unlock()
221221

222-
mutex.Lock()
222+
nodeMapMu.Lock()
223223
for envID := range NodeMap {
224224
if !enabledMap[envID] {
225225
delete(NodeMap, envID)
226226
}
227227
}
228-
mutex.Unlock()
228+
nodeMapMu.Unlock()
229229
}
230230

231231
// getEnabledNodes retrieves enabled nodes from cache or database
@@ -287,11 +287,11 @@ func RetrieveNodesStatus(ctx context.Context) {
287287
logger.Info("RetrieveNodesStatus start")
288288
defer logger.Info("RetrieveNodesStatus exited")
289289

290-
mutex.Lock()
290+
nodeMapMu.Lock()
291291
if NodeMap == nil {
292292
NodeMap = make(TNodeMap)
293293
}
294-
mutex.Unlock()
294+
nodeMapMu.Unlock()
295295

296296
envCheckTicker := time.NewTicker(30 * time.Second)
297297
defer envCheckTicker.Stop()
@@ -396,8 +396,8 @@ func RetrieveNodesStatus(ctx context.Context) {
396396
}
397397

398398
func checkNodeTimeouts(timeout time.Duration) {
399-
mutex.Lock()
400-
defer mutex.Unlock()
399+
nodeMapMu.Lock()
400+
defer nodeMapMu.Unlock()
401401
now := time.Now()
402402
for _, node := range NodeMap {
403403
if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
@@ -445,7 +445,7 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
445445

446446
node, err := InitNode(nodeModel)
447447
if err != nil {
448-
mutex.Lock()
448+
nodeMapMu.Lock()
449449
if NodeMap[nodeModel.ID] == nil {
450450
NodeMap[nodeModel.ID] = &Node{
451451
Node: nodeModel,
@@ -455,13 +455,13 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
455455
NodeMap[nodeModel.ID].Status = false
456456
NodeMap[nodeModel.ID].ResponseAt = time.Now()
457457
}
458-
mutex.Unlock()
458+
nodeMapMu.Unlock()
459459
return err
460460
}
461461

462-
mutex.Lock()
462+
nodeMapMu.Lock()
463463
NodeMap[nodeModel.ID] = node
464-
mutex.Unlock()
464+
nodeMapMu.Unlock()
465465

466466
u, err := nodeModel.GetWebSocketURL("/api/analytic/intro")
467467
if err != nil {
@@ -515,7 +515,7 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
515515
return nil
516516
}
517517

518-
mutex.Lock()
518+
nodeMapMu.Lock()
519519
if NodeMap[nodeModel.ID] == nil {
520520
NodeMap[nodeModel.ID] = &Node{
521521
Node: nodeModel,
@@ -535,6 +535,6 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
535535
NodeMap[nodeModel.ID].Status = true
536536
NodeMap[nodeModel.ID].ResponseAt = time.Now()
537537
}
538-
mutex.Unlock()
538+
nodeMapMu.Unlock()
539539
}
540540
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package analytic
2+
3+
import (
4+
"testing"
5+
6+
"github.com/0xJacky/Nginx-UI/internal/upstream"
7+
"github.com/0xJacky/Nginx-UI/model"
8+
)
9+
10+
func TestSnapshotNodeMapIsolation(t *testing.T) {
11+
nodeMapMu.Lock()
12+
original := NodeMap
13+
NodeMap = make(TNodeMap)
14+
NodeMap[1] = &Node{
15+
Node: &model.Node{
16+
Model: model.Model{ID: 1},
17+
Name: "node-1",
18+
URL: "https://example.com",
19+
},
20+
NodeStat: NodeStat{
21+
Status: true,
22+
UpstreamStatusMap: map[string]*upstream.Status{
23+
"default": {
24+
Online: true,
25+
Latency: 5,
26+
},
27+
},
28+
},
29+
NodeInfo: NodeInfo{
30+
Version: "1.0.0",
31+
},
32+
}
33+
nodeMapMu.Unlock()
34+
35+
t.Cleanup(func() {
36+
nodeMapMu.Lock()
37+
NodeMap = original
38+
nodeMapMu.Unlock()
39+
})
40+
41+
snapshot := SnapshotNodeMap()
42+
43+
nodeMapMu.Lock()
44+
NodeMap[1].Status = false
45+
NodeMap[1].UpstreamStatusMap["default"].Online = false
46+
NodeMap[1].Node.Name = "mutated"
47+
nodeMapMu.Unlock()
48+
49+
cloned := snapshot[1]
50+
if cloned == nil {
51+
t.Fatalf("expected snapshot entry for node 1")
52+
}
53+
54+
if !cloned.Status {
55+
t.Fatalf("expected snapshot status to remain true, got false")
56+
}
57+
58+
upstreamStatus, ok := cloned.UpstreamStatusMap["default"]
59+
if !ok || upstreamStatus == nil {
60+
t.Fatalf("expected upstream status in snapshot")
61+
}
62+
if !upstreamStatus.Online {
63+
t.Fatalf("expected upstream online in snapshot")
64+
}
65+
66+
if cloned.Node == nil {
67+
t.Fatalf("expected cloned node metadata")
68+
}
69+
if cloned.Node.Name != "node-1" {
70+
t.Fatalf("expected cloned node name to remain 'node-1', got %s", cloned.Node.Name)
71+
}
72+
}
73+
74+
func TestGetNodeReturnsClonedData(t *testing.T) {
75+
originalDBNode := &model.Node{
76+
Model: model.Model{ID: 2},
77+
Name: "db-node",
78+
URL: "https://cluster.local",
79+
Token: "secret",
80+
}
81+
82+
nodeMapMu.Lock()
83+
original := NodeMap
84+
NodeMap = make(TNodeMap)
85+
NodeMap[2] = &Node{
86+
Node: &model.Node{
87+
Model: model.Model{ID: 2},
88+
Name: "cached-node",
89+
},
90+
NodeStat: NodeStat{
91+
Status: true,
92+
},
93+
}
94+
nodeMapMu.Unlock()
95+
96+
t.Cleanup(func() {
97+
nodeMapMu.Lock()
98+
NodeMap = original
99+
nodeMapMu.Unlock()
100+
})
101+
102+
result := GetNode(originalDBNode)
103+
if result == nil {
104+
t.Fatalf("expected GetNode result")
105+
}
106+
if result.Node == nil {
107+
t.Fatalf("expected result node metadata")
108+
}
109+
110+
if result.Node.Name != "db-node" {
111+
t.Fatalf("expected node name from DB copy, got %s", result.Node.Name)
112+
}
113+
114+
nodeMapMu.Lock()
115+
NodeMap[2].Node.Name = "mutated-cache"
116+
nodeMapMu.Unlock()
117+
118+
if result.Node.Name != "db-node" {
119+
t.Fatalf("expected result node name to remain 'db-node', got %s", result.Node.Name)
120+
}
121+
122+
result.Node.Name = "updated-result"
123+
124+
nodeMapMu.RLock()
125+
if NodeMap[2].Node.Name == "updated-result" {
126+
t.Fatalf("expected NodeMap to remain isolated from result mutation")
127+
}
128+
nodeMapMu.RUnlock()
129+
}

0 commit comments

Comments
 (0)