Skip to content

Commit 7705986

Browse files
committed
Add in support for a TryLock to attempt to get a lock within a timeout
1 parent 6d30b8b commit 7705986

File tree

3 files changed

+98
-3
lines changed

3 files changed

+98
-3
lines changed

constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ var (
124124
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
125125
ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
126126
ErrBadArguments = errors.New("invalid arguments")
127+
ErrTimeout = errors.New("timeout exceeded")
127128
// ErrInvalidCallback = errors.New("zk: invalid callback specified")
128129

129130
errCodeToError = map[ErrCode]error{
@@ -144,6 +145,7 @@ var (
144145
errSessionMoved: ErrSessionMoved,
145146
errZReconfigDisabled: ErrReconfigDisabled,
146147
errBadArguments: ErrBadArguments,
148+
errTimeout: ErrTimeout,
147149
}
148150
)
149151

@@ -166,6 +168,7 @@ const (
166168
errOperationTimeout = -7
167169
errBadArguments = -8
168170
errInvalidState = -9
171+
errTimeout = -10
169172
// API errors
170173
errAPIError ErrCode = -100
171174
errNoNode ErrCode = -101 // *

lock.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strconv"
77
"strings"
8+
"time"
89
)
910

1011
var (
@@ -49,10 +50,24 @@ func (l *Lock) Lock() error {
4950
return l.LockWithData([]byte{})
5051
}
5152

53+
// TryLock attempts to acquire the lock before a timeout. It works like LockWithData, but it doesn't
54+
// write any data to the lock node.
55+
func (l *Lock) TryLock(timeout time.Duration) error {
56+
return l.TryLockWithData([]byte{}, timeout)
57+
}
58+
5259
// LockWithData attempts to acquire the lock, writing data into the lock node.
5360
// It will wait to return until the lock is acquired or an error occurs. If
5461
// this instance already has the lock then ErrDeadlock is returned.
5562
func (l *Lock) LockWithData(data []byte) error {
63+
return l.TryLockWithData(data, time.Duration(-1))
64+
}
65+
66+
// TryLockWithData attempts to acquire the lock, writing data into the lock node.
67+
// It will wait to return until the lock is acquired, an error occurs, or the timeout. If
68+
// this instance already has the lock then ErrDeadlock is returned. A negative
69+
// timeout is waiting forever
70+
func (l *Lock) TryLockWithData(data []byte, timeout time.Duration) error {
5671
if l.lockPath != "" {
5772
return ErrDeadlock
5873
}
@@ -97,6 +112,8 @@ func (l *Lock) LockWithData(data []byte) error {
97112
return err
98113
}
99114

115+
start := time.Now()
116+
100117
for {
101118
children, _, err := l.c.Children(l.path)
102119
if err != nil {
@@ -134,9 +151,30 @@ func (l *Lock) LockWithData(data []byte) error {
134151
continue
135152
}
136153

137-
ev := <-ch
138-
if ev.Err != nil {
139-
return ev.Err
154+
if timeout >= 0 {
155+
wait := start.Add(timeout).Sub(time.Now())
156+
if wait < 0 {
157+
wait = time.Duration(1)
158+
}
159+
delay := time.NewTimer(wait)
160+
select {
161+
case ev := <-ch:
162+
if ev.Err != nil {
163+
return ev.Err
164+
}
165+
case <-delay.C:
166+
// remove the pending lock path on timeout
167+
delErr := l.c.Delete(path, -1)
168+
if delErr != nil {
169+
return delErr
170+
}
171+
return ErrTimeout
172+
}
173+
} else {
174+
ev := <-ch
175+
if ev.Err != nil {
176+
return ev.Err
177+
}
140178
}
141179
}
142180

lock_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package zk
22

33
import (
4+
"errors"
45
"testing"
56
"time"
67
)
@@ -61,6 +62,59 @@ func TestIntegration_Lock(t *testing.T) {
6162
}
6263
}
6364

65+
func TestIntegration_TryLock(t *testing.T) {
66+
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
defer ts.Stop()
71+
zk, _, err := ts.ConnectAll()
72+
if err != nil {
73+
t.Fatalf("Connect returned error: %+v", err)
74+
}
75+
defer zk.Close()
76+
77+
acls := WorldACL(PermAll)
78+
79+
l := NewLock(zk, "/test", acls)
80+
if err := l.TryLock(time.Second); err != nil {
81+
t.Fatal(err)
82+
}
83+
84+
l2 := NewLock(zk, "/test", acls)
85+
if err := l2.TryLock(time.Millisecond * 100); !errors.Is(err, ErrTimeout) {
86+
t.Fatalf("Expected ErrTimeout instead of %s", err)
87+
}
88+
89+
val := make(chan int, 3)
90+
91+
go func() {
92+
if err := l2.TryLock(time.Second); err != nil {
93+
t.Fatal(err)
94+
}
95+
val <- 2
96+
if err := l2.Unlock(); err != nil {
97+
t.Fatal(err)
98+
}
99+
val <- 3
100+
}()
101+
time.Sleep(time.Millisecond * 100)
102+
103+
val <- 1
104+
if err := l.Unlock(); err != nil {
105+
t.Fatal(err)
106+
}
107+
if x := <-val; x != 1 {
108+
t.Fatalf("Expected 1 instead of %d", x)
109+
}
110+
if x := <-val; x != 2 {
111+
t.Fatalf("Expected 2 instead of %d", x)
112+
}
113+
if x := <-val; x != 3 {
114+
t.Fatalf("Expected 3 instead of %d", x)
115+
}
116+
}
117+
64118
// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"),
65119
// when a part of that path already exists (i.e. "/test-multi-level" node already exists).
66120
func TestIntegration_MultiLevelLock(t *testing.T) {

0 commit comments

Comments
 (0)