------------------------------- MODULE MessageBatchDrainTest2 -------------------------------
EXTENDS Naturals, TLC
CONSTANT N, \* number of processes (e.g. 2)
MAX_QUEUE_SIZE, \* max size of (bounded) queue (e.g. 10)
ADD \* number of elements to add (e.g. 6): make it such that N * ADD > MAX_QUEUE_SIZE to get dropped elements on addition
ASSUME N \in Nat \ {0}
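\* Sketch only, not part of the original spec: the example values above (10 and 6) are positive
\* naturals, so a similar assumption could be stated for the other two constants.
ASSUME MAX_QUEUE_SIZE \in Nat \ {0} /\ ADD \in Nat \ {0}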
Procs == 1..N
Max(x,y) == IF x > y THEN x ELSE y
Min(x,y) == IF x < y THEN x ELSE y
(*
Algorithm for having an MPSC queue where producers (after adding to the queue) either become the single consumer, or terminate.
The queue is bounded and is represented by 'size', and 'counter' decides which producer gets to act as consumer
next (after adding an element to the queue).
Note that if size is 7, MAX_QUEUE_SIZE 10 and we want to add 6 messages, then we can only add 3, dropping 3 elements, making size 10.
The Java algorithm is in MessageBatchDrainTest2 (JGroups):
https://github.com/belaban/JGroups/blob/master/tests/junit-functional/org/jgroups/tests/MessageBatchDrainTest2.java
Contrary to DrainTest, producers can add _multiple_ elements and the consumer removes _all_ of the elements in the queue.
Note that 'counter' can temporarily become negative, e.g. for threads T1, T2, T3:
- T1 adds 10 -> size: 10, counter: 10
- T2 adds 5 -> size: 15
- T3 adds 5 -> size: 20
- T1 removes all elements: removed: 20, size: 0, counter: -10
- T2 increments counter by 5: counter is -5
- T3 increments counter by 5: counter is 0
This algorithm is INCORRECT: when counter goes from (say) -4 back to 0, the next process to increment it reads tmp = 0
and enters the 'drain' label while the existing process is still in 'drain'. This violates the OnlyOneDrain invariant (at the bottom) !!!
MessageBatchDrainTest3.tla contains the correct version.
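A concrete interleaving that reaches the violation (a hand-derived sketch, not TLC output, assuming the model values
N = 3, MAX_QUEUE_SIZE = 10 and ADD = 6, with processes 1, 2 and 3):
- 1 runs add, incr_counter, tmp_check -> size: 6, counter: 6, tmp[1]: 0, pc[1]: "drain"
- 2 runs add -> added[2]: 4 (2 elements dropped), size: 10
- 1 runs drain, decr_counter -> removed[1]: 10, size: 0, counter: -4, pc[1]: "drain" (because counter # 0)
- 2 runs incr_counter -> tmp[2]: -4, counter: 0
- 3 runs add, incr_counter, tmp_check -> size: 6, tmp[3]: 0, counter: 6, pc[3]: "drain"
Processes 1 and 3 are now both at 'drain'.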
*)
(*
--algorithm Add {
    variables size=0, counter=0;

    process (p \in Procs)
      variables tmp=0, added=0, removed=0, new_size=0, old_size=0;
    {
      add:
        old_size := size;
        new_size := Min(size+ADD, MAX_QUEUE_SIZE);
        added := new_size - size || size := new_size; \* Add ADD elements to the queue
        print <<"old_size= ", old_size, "new_size= ",new_size, "size= ", size, "added= ", added>>;
        if(added > 0) {
           incr_counter:
              tmp := counter || counter := counter+added;
           tmp_check:
              if(tmp = 0) { \* start draining the queue
                 drain:
                    removed := size || size := 0; \* Remove _all_ elements from the queue
                 decr_counter:
                    counter := counter-removed;
                    if(counter # 0)
                       goto drain;
              }
        };
    }
}
*)
\* BEGIN TRANSLATION
VARIABLES size, counter, pc, tmp, added, removed, new_size, old_size

vars == << size, counter, pc, tmp, added, removed, new_size, old_size >>

ProcSet == (Procs)

Init == (* Global variables *)
        /\ size = 0
        /\ counter = 0
        (* Process p *)
        /\ tmp = [self \in Procs |-> 0]
        /\ added = [self \in Procs |-> 0]
        /\ removed = [self \in Procs |-> 0]
        /\ new_size = [self \in Procs |-> 0]
        /\ old_size = [self \in Procs |-> 0]
        /\ pc = [self \in ProcSet |-> "add"]

add(self) == /\ pc[self] = "add"
             /\ old_size' = [old_size EXCEPT ![self] = size]
             /\ new_size' = [new_size EXCEPT ![self] = Min(size+ADD, MAX_QUEUE_SIZE)]
             /\ /\ added' = [added EXCEPT ![self] = new_size'[self] - size]
                /\ size' = new_size'[self]
             /\ PrintT(<<"old_size= ", old_size'[self], "new_size= ",new_size'[self], "size= ", size', "added= ", added'[self]>>)
             /\ IF added'[self] > 0
                   THEN /\ pc' = [pc EXCEPT ![self] = "incr_counter"]
                   ELSE /\ pc' = [pc EXCEPT ![self] = "Done"]
             /\ UNCHANGED << counter, tmp, removed >>

incr_counter(self) == /\ pc[self] = "incr_counter"
                      /\ /\ counter' = counter+added[self]
                         /\ tmp' = [tmp EXCEPT ![self] = counter]
                      /\ pc' = [pc EXCEPT ![self] = "tmp_check"]
                      /\ UNCHANGED << size, added, removed, new_size, old_size >>

tmp_check(self) == /\ pc[self] = "tmp_check"
                   /\ IF tmp[self] = 0
                         THEN /\ pc' = [pc EXCEPT ![self] = "drain"]
                         ELSE /\ pc' = [pc EXCEPT ![self] = "Done"]
                   /\ UNCHANGED << size, counter, tmp, added, removed,
                                   new_size, old_size >>

drain(self) == /\ pc[self] = "drain"
               /\ /\ removed' = [removed EXCEPT ![self] = size]
                  /\ size' = 0
               /\ pc' = [pc EXCEPT ![self] = "decr_counter"]
               /\ UNCHANGED << counter, tmp, added, new_size, old_size >>

decr_counter(self) == /\ pc[self] = "decr_counter"
                      /\ counter' = counter-removed[self]
                      /\ IF counter' # 0
                            THEN /\ pc' = [pc EXCEPT ![self] = "drain"]
                            ELSE /\ pc' = [pc EXCEPT ![self] = "Done"]
                      /\ UNCHANGED << size, tmp, added, removed, new_size,
                                      old_size >>

p(self) == add(self) \/ incr_counter(self) \/ tmp_check(self)
              \/ drain(self) \/ decr_counter(self)

Next == (\E self \in Procs: p(self))
           \/ (* Disjunct to prevent deadlock on termination *)
              ((\A self \in ProcSet: pc[self] = "Done") /\ UNCHANGED vars)

Spec == Init /\ [][Next]_vars

Termination == <>(\A self \in ProcSet: pc[self] = "Done")
\* END TRANSLATION
\* Only one process can be at label 'drain' at any time (add to Model -> Model Overview -> Invariants)
OnlyOneDrain == \A i,k \in Procs: (i # k) => ~(/\ pc[i] = "drain" /\ pc[k] = "drain")
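\* Sketch of a stricter check (InDrainLoop and OnlyOneDrainer are hypothetical names, not part of
\* the original spec): a process that has entered the drain loop alternates between "drain" and
\* "decr_counter", so this variant treats both labels as the critical section. Any state that
\* violates OnlyOneDrain also violates this invariant.
InDrainLoop(i) == pc[i] \in {"drain", "decr_counter"}
OnlyOneDrainer == \A i,k \in Procs: (i # k) => ~(InDrainLoop(i) /\ InDrainLoop(k))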
\* All the processes are done (state = "Done")
AllDone == \A self \in Procs: pc[self] = "Done"
\* We cannot have elements left in the queue but no threads to process them
\* Add Correctness to the model's properties (Model -> Model Overview -> Properties)
Correctness == [](AllDone => size = 0 /\ counter = 0)
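\* Sketch of an additional sanity invariant (QueueBounded is a hypothetical name, not part of the
\* original spec). Assuming ADD and MAX_QUEUE_SIZE are natural numbers, 'add' caps the size at
\* MAX_QUEUE_SIZE and 'drain' resets it to 0, so the queue size should never leave this range.
QueueBounded == size \in 0..MAX_QUEUE_SIZE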
=============================================================================
\* Modification History
\* Last modified Wed Jan 11 15:05:54 CET 2017 by bela
\* Last modified Fri Feb 13 10:00:32 EST 2015 by nrla
\* Created Wed Feb 11 18:05:23 EST 2015 by nrla