33const { Worker, isMainThread, parentPort } = require ( 'worker_threads' ) ;
44
55const threads = new Set ( ) ;
6+ const resources = new Map ( ) ;
67
78const LOCKED = 0 ;
89const UNLOCKED = 1 ;
@@ -17,62 +18,95 @@ const sendMessage = message => {
1718 }
1819} ;
1920
21+ class Lock {
22+ constructor ( name , options , callback ) {
23+ if ( typeof options === 'function' ) {
24+ callback = options ;
25+ options = { } ;
26+ }
27+ const { mode, ifAvailable, steal } = options ;
28+ this . name = name ;
29+ this . mode = mode || 'exclusive' ;
30+ this . ifAvailable = ifAvailable || false ;
31+ this . steal = steal || false ;
32+ this . callback = callback ;
33+ }
34+ }
35+
2036class Mutex {
21- constructor ( resourceName , shared , initial = false ) {
22- this . resourceName = resourceName ;
23- this . lock = new Int32Array ( shared , 0 , 1 ) ;
24- if ( initial ) Atomics . store ( this . lock , 0 , UNLOCKED ) ;
37+ constructor ( resourceName , buffer , initial = false ) {
38+ this . name = resourceName ;
39+ this . flag = new Int32Array ( buffer , 0 , 1 ) ;
40+ if ( initial ) Atomics . store ( this . flag , 0 , UNLOCKED ) ;
2541 this . owner = false ;
2642 this . trying = false ;
27- this . callback = null ;
43+ this . queue = [ ] ;
44+ this . current = null ;
2845 }
2946
30- enter ( callback ) {
31- this . callback = callback ;
47+ enter ( lock ) {
48+ this . queue . push ( lock ) ;
3249 this . trying = true ;
33- this . tryEnter ( ) ;
50+ return this . tryEnter ( ) ;
3451 }
3552
3653 tryEnter ( ) {
37- if ( ! this . callback ) return ;
38- const prev = Atomics . exchange ( this . lock , 0 , LOCKED ) ;
39- if ( prev === UNLOCKED ) {
40- this . owner = true ;
41- this . trying = false ;
42- this . callback ( this ) . then ( ( ) => {
43- this . leave ( ) ;
44- } ) ;
45- this . callback = null ;
46- }
54+ if ( this . queue . length === 0 ) return ;
55+ const prev = Atomics . exchange ( this . flag , 0 , LOCKED ) ;
56+ if ( prev === LOCKED ) return ;
57+ this . owner = true ;
58+ this . trying = false ;
59+ const lock = this . queue . shift ( ) ;
60+ this . current = lock ;
61+ return lock . callback ( lock ) . then ( ( ) => {
62+ this . leave ( ) ;
63+ } ) ;
64+ }
65+
66+ enterIfAvailable ( lock ) {
67+ if ( this . owner ) return lock . callback ( ) ;
68+ const prev = Atomics . exchange ( this . flag , 0 , LOCKED ) ;
69+ if ( prev === LOCKED ) return lock . callback ( ) ;
70+ this . owner = true ;
71+ this . trying = false ;
72+ this . current = lock ;
73+ return lock . callback ( lock ) . then ( ( ) => {
74+ this . leave ( ) ;
75+ } ) ;
4776 }
4877
4978 leave ( ) {
5079 if ( ! this . owner ) return ;
51- Atomics . store ( this . lock , 0 , UNLOCKED ) ;
80+ Atomics . store ( this . flag , 0 , UNLOCKED ) ;
5281 this . owner = false ;
53- sendMessage ( { kind : 'leave' , resourceName : this . resourceName } ) ;
82+ this . current = null ;
83+ sendMessage ( { kind : 'leave' , resourceName : this . name } ) ;
84+ this . tryEnter ( ) ;
5485 }
5586}
5687
57- const resources = new Map ( ) ;
58-
59- const request = ( resourceName , callback ) => {
60- let lock = resources . get ( resourceName ) ;
61- if ( ! lock ) {
88+ const request = ( resourceName , options , callback ) => {
89+ const lock = new Lock ( resourceName , options , callback ) ;
90+ let mutex = resources . get ( resourceName ) ;
91+ if ( ! mutex ) {
6292 const buffer = new SharedArrayBuffer ( 4 ) ;
63- lock = new Mutex ( resourceName , buffer , true ) ;
64- resources . set ( resourceName , lock ) ;
93+ mutex = new Mutex ( resourceName , buffer , true ) ;
94+ resources . set ( resourceName , mutex ) ;
6595 sendMessage ( { kind : 'create' , resourceName, buffer } ) ;
6696 }
67- lock . enter ( callback ) ;
68- return lock ;
97+ if ( lock . ifAvailable ) return mutex . enterIfAvailable ( lock ) ;
98+ return mutex . enter ( lock ) ;
6999} ;
70100
71101const receiveMessage = message => {
72102 const { kind, resourceName, buffer } = message ;
73103 if ( kind === 'create' ) {
74- const lock = new Mutex ( resourceName , buffer ) ;
75- resources . set ( resourceName , lock ) ;
104+ const mutex = new Mutex ( resourceName , buffer ) ;
105+ resources . set ( resourceName , mutex ) ;
106+ } else if ( kind === 'leave' ) {
107+ for ( const mutex of resources ) {
108+ if ( mutex . trying ) mutex . tryEnter ( ) ;
109+ }
76110 }
77111} ;
78112
@@ -96,6 +130,33 @@ class Thread {
96130 }
97131}
98132
99- const locks = { resources, request, sendMessage, receiveMessage, Thread } ;
133+ class LockManagerSnapshot {
134+ constructor ( ) {
135+ const held = [ ] ;
136+ const pending = [ ] ;
137+ this . held = held ;
138+ this . pending = pending ;
139+
140+ for ( const mutex of resources ) {
141+ if ( mutex . queue . length > 0 ) {
142+ pending . push ( ...mutex . queue ) ;
143+ }
144+ if ( mutex . current ) {
145+ held . push ( mutex . current ) ;
146+ }
147+ }
148+ }
149+ }
150+
151+ class LockManager {
152+ constructor ( ) {
153+ this . request = request ;
154+ this . Thread = Thread ;
155+ }
156+ query ( ) {
157+ const snapshot = new LockManagerSnapshot ( ) ;
158+ return Promise . resolve ( snapshot ) ;
159+ }
160+ }
100161
101- module . exports = { locks } ;
162+ module . exports = { locks : new LockManager ( ) } ;
0 commit comments