Skip to content

Commit f1521b5

Browse files
committed
sync lock
1 parent 3bb9399 commit f1521b5

File tree

6 files changed

+2627
-127
lines changed

6 files changed

+2627
-127
lines changed

client/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"clean": "rm -rf ./dist; mkdir -p ./dist",
1616
"prod": "cp -r ./dist/ ../server/dist/; cd ../server; npm run prod",
1717
"server": "cd ../server && npm run dev",
18+
"test": "vitest",
1819
"watch": "concurrently --kill-others 'npm run server' 'npm run check-types -- --watch --preserveWatchOutput' 'sleep 3; npm run dev'"
1920
},
2021
"dependencies": {
@@ -56,7 +57,8 @@
5657
"typescript": "^4.7.4",
5758
"use-debounce": "^9.0.4",
5859
"vite": "^3.0.7",
59-
"vite-plugin-svgr": "^4.1.0"
60+
"vite-plugin-svgr": "^4.1.0",
61+
"vitest": "^1.0.1"
6062
},
6163
"eslintConfig": {
6264
"extends": "@rocicorp/eslint-config"

client/src/index.tsx

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {mutators} from './model/mutators';
44
import {Replicache} from 'replicache';
55
import {UndoManager} from '@rocicorp/undo';
66
import App from './app';
7+
import {lock} from './util/sync-lock';
78

89
async function init() {
910
// See https://doc.replicache.dev/licensing for how to get a license key.
@@ -12,11 +13,34 @@ async function init() {
1213
throw new Error('Missing VITE_REPLICACHE_LICENSE_KEY');
1314
}
1415

16+
const syncLock = lock('pull');
1517
const r = new Replicache({
1618
name: 'anon',
1719
licenseKey,
1820
mutators,
1921
logLevel: 'debug',
22+
puller: async requestBody => {
23+
if (!syncLock.held) {
24+
return {
25+
httpRequestInfo: {
26+
errorMessage: 'Tab does not hold the pull lock',
27+
httpStatusCode: 400,
28+
},
29+
};
30+
}
31+
const res = await fetch('/api/replicache/pull', {
32+
method: 'POST',
33+
headers: {'content-type': 'application/json'},
34+
body: JSON.stringify(requestBody),
35+
});
36+
return {
37+
response: await res.json(),
38+
httpRequestInfo: {
39+
errorMessage: !res.ok ? res.statusText : '',
40+
httpStatusCode: res.status,
41+
},
42+
};
43+
},
2044
pushURL: `/api/replicache/push`,
2145
pullURL: `/api/replicache/pull`,
2246
});
@@ -33,6 +57,14 @@ async function init() {
3357
ReactDOM.createRoot(document.getElementById('root') as HTMLElement).render(
3458
<Root />,
3559
);
60+
61+
// Issue a pull whenever we become leader.
62+
syncLock.onStatusChange(async held => {
63+
if (held) {
64+
console.log('status change?');
65+
await r.pull();
66+
}
67+
});
3668
}
3769

3870
await init();
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
import {test, expect, vi} from 'vitest';
2+
import {lock} from '../sync-lock';
3+
4+
class BroadcastChannel {
5+
#listeners = new Set<(e: {data: number}) => void>();
6+
7+
constructor(public readonly name: string) {}
8+
9+
postMessage(message: number) {
10+
for (const cb of this.#listeners) {
11+
cb({data: message});
12+
}
13+
}
14+
15+
close() {
16+
/* noop */
17+
}
18+
19+
set onmessage(cb: ((e: {data: number}) => void) | null) {
20+
if (cb) {
21+
this.#listeners.add(cb);
22+
}
23+
}
24+
}
25+
26+
// Inject a config we can control for testing.
27+
function newOptions() {
28+
let num = 0;
29+
const channel = new BroadcastChannel('sync-lock-test');
30+
return {
31+
channelFactory: (_: string) => channel,
32+
intervalMs: 100,
33+
nextNumber: () => (num += 1),
34+
get visible() {
35+
return true;
36+
},
37+
};
38+
}
39+
40+
test('acquires when there is only a single actor', async () => {
41+
vi.useFakeTimers();
42+
const options = newOptions();
43+
const l = lock('test1', options);
44+
45+
// Lock is not held on construction
46+
// Must wait for full interval before acquiring.
47+
expect(l.held).toBe(false);
48+
49+
vi.advanceTimersByTime(options.intervalMs + 1);
50+
51+
expect(l.held).toBe(true);
52+
});
53+
54+
test('drops as soon as a greater number arrives', async () => {
55+
vi.useFakeTimers();
56+
const options = newOptions();
57+
const l = lock('test2', options);
58+
59+
vi.advanceTimersByTime(options.intervalMs + 1);
60+
61+
// Lock is held by the only actor
62+
expect(l.held).toBe(true);
63+
64+
// Broadcast a greater number to simulate a new actor arriving
65+
options.channelFactory('test2').postMessage(2);
66+
67+
// Lock should be released
68+
expect(l.held).toBe(false);
69+
});
70+
71+
test('drops expired peers and takes back the lock', async () => {
72+
vi.useFakeTimers();
73+
const options = newOptions();
74+
const l = lock('test3', options);
75+
76+
vi.advanceTimersByTime(options.intervalMs);
77+
78+
// Lock is held by the only actor
79+
expect(l.held).toBe(true);
80+
81+
// Broadcast a greater number to simulate a new actor arriving
82+
options.channelFactory('test3').postMessage(2);
83+
84+
// Lock should be released
85+
expect(l.held).toBe(false);
86+
87+
// Advance time to expire the peer
88+
vi.advanceTimersByTime(options.intervalMs);
89+
90+
// Lock should be reacquired by us
91+
expect(l.held).toBe(true);
92+
});
93+
94+
test('does not take the lock too soon', async () => {
95+
vi.useFakeTimers();
96+
const options = newOptions();
97+
const l = lock('test4', options);
98+
99+
vi.advanceTimersByTime(options.intervalMs - 1);
100+
101+
// Lock is not held yet
102+
expect(l.held).toBe(false);
103+
104+
// Advance time to acquire the lock
105+
vi.advanceTimersByTime(2);
106+
107+
// Lock is held
108+
expect(l.held).toBe(true);
109+
});
110+
111+
// peer is not dropped if it heartbeats in time
112+
test('does not drop peers that heartbeat in time', async () => {
113+
vi.useFakeTimers();
114+
const options = newOptions();
115+
const l = lock('test5', options);
116+
117+
vi.advanceTimersByTime(options.intervalMs);
118+
119+
// Lock is held by the only actor
120+
expect(l.held).toBe(true);
121+
122+
// Broadcast a greater number to simulate a new actor arriving
123+
options.channelFactory('test5').postMessage(2);
124+
125+
// Lock should be released by us
126+
expect(l.held).toBe(false);
127+
128+
// Advance time to almost expire the peer
129+
vi.advanceTimersByTime(options.intervalMs - 1);
130+
131+
// Peer posts an update
132+
options.channelFactory('test5').postMessage(2);
133+
134+
// Advance time to almost expire the peer
135+
vi.advanceTimersByTime(options.intervalMs - 1);
136+
137+
// Lock should still be held by peer / released by us
138+
expect(l.held).toBe(false);
139+
});
140+
141+
// Lock never changes hands if no heartbeats
142+
test('acquires when there is only a single actor', async () => {
143+
vi.useFakeTimers();
144+
const options = newOptions();
145+
const l = lock('test6', options);
146+
147+
// Lock is not held on construction
148+
// Must wait for full interval before acquiring.
149+
expect(l.held).toBe(false);
150+
151+
for (let i = 0; i < 10; i++) {
152+
vi.advanceTimersByTime(options.intervalMs);
153+
expect(l.held).toBe(true);
154+
}
155+
});
156+
157+
// We can observe the lock state
158+
test('can observe lock state', async () => {
159+
vi.useFakeTimers();
160+
const options = newOptions();
161+
const l = lock('test7', options);
162+
163+
const states: boolean[] = [l.held];
164+
l.onStatusChange(state => {
165+
states.push(state);
166+
});
167+
168+
// advance to acquire the lock
169+
vi.advanceTimersByTime(options.intervalMs);
170+
expect(l.held).toBe(true);
171+
// Peer posts an update
172+
options.channelFactory('test7').postMessage(2);
173+
vi.runAllTicks();
174+
175+
expect(states).toEqual([false, true, false]);
176+
});
177+
178+
test('stable lock', async () => {
179+
vi.useFakeTimers();
180+
const options = newOptions();
181+
const l1 = lock('test8', options);
182+
const l2 = lock('test8', options);
183+
const l3 = lock('test8', options);
184+
185+
// advance to acquire the lock
186+
vi.advanceTimersByTime(options.intervalMs);
187+
expect(l1.held).toBe(false);
188+
expect(l2.held).toBe(false);
189+
expect(l3.held).toBe(true);
190+
191+
// ensure lock doesn't change hands
192+
for (let i = 0; i < 10; i++) {
193+
vi.advanceTimersByTime(options.intervalMs);
194+
expect(l1.held).toBe(false);
195+
expect(l2.held).toBe(false);
196+
expect(l3.held).toBe(true);
197+
}
198+
199+
// remove a peer
200+
l3.destroy();
201+
202+
// check that next highest peer takes over
203+
vi.advanceTimersByTime(options.intervalMs);
204+
expect(l1.held).toBe(false);
205+
expect(l2.held).toBe(true);
206+
207+
// repeat
208+
l2.destroy();
209+
vi.advanceTimersByTime(options.intervalMs);
210+
expect(l1.held).toBe(true);
211+
});

0 commit comments

Comments
 (0)