-
Notifications
You must be signed in to change notification settings - Fork 2
/
MessageBatchDrainTest3.tla
163 lines (125 loc) · 6.69 KB
/
MessageBatchDrainTest3.tla
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
------------------------------- MODULE MessageBatchDrainTest3 -------------------------------
EXTENDS Naturals, TLC
CONSTANT N,              \* number of processes (pick at least 3)
         MAX_QUEUE_SIZE, \* max size of (bounded) queue (e.g. 10)
         ADD             \* number of elements each process tries to add (e.g. 6): make it such that
                         \* N * ADD > MAX_QUEUE_SIZE to get dropped elements on addition

\* All three constants take part in natural-number arithmetic (1..N, size+ADD and
\* Min(size+ADD, MAX_QUEUE_SIZE)), so reject ill-typed or negative model values up front
\* instead of failing, or silently adding a negative amount, in the middle of a run.
ASSUME /\ N \in Nat \ {0}
       /\ MAX_QUEUE_SIZE \in Nat
       /\ ADD \in Nat

\* The set of process identifiers.
Procs == 1..N
\* Larger of the two arguments (y when they are equal).
Max(x,y) == IF y < x THEN x ELSE y
\* Smaller of the two arguments (y when they are equal).
Min(x,y) == IF y > x THEN x ELSE y
(*
Algorithm for having an MPSC queue where producers (after adding to the queue) either become the single consumer, or terminate.
The queue is bounded and is represented by 'size', and 'counter' decides which producer gets to act as consumer
next (after adding an element to the queue).
Note that if size is 7, MAX_QUEUE_SIZE 10 and we want to add 6 messages, then we can only add 3, dropping 3 elements, making size 10.
The Java algorithm is in MessageBatchDrainTest2 (JGroups):
https://github.com/belaban/JGroups/blob/master/tests/junit-functional/org/jgroups/tests/MessageBatchDrainTest2.java
Contrary to DrainTest, producers can add _multiple_ elements and the consumer removes _all_ of the elements in the queue.
Note that if 'counter' tracked the number of messages added and removed (as in MessageBatchDrainTest2) rather than
the number of additions, it could temporarily become negative, e.g. for threads T1, T2, T3:
- T1 adds 10 -> size: 10, counter: 10
- T2 adds 5 -> size: 15
- T3 adds 5 -> size: 20
- T1 removes all elements: removed: 20, size: 0, counter: -10
- T2 increments counter by 5: counter is -5
- T3 increments counter by 5: counter is 0
Contrary to MessageBatchDrainTest2, this should be correct as counter is only incremented and decremented by 1:
this represents 1 addition (no matter how many elements were added), and not the number of messages added or removed.
Therefore, counter should never become negative.
*)
(*
--algorithm Add {
    \* Shared state. 'size' is the number of elements currently in the bounded queue.
    \* 'counter' is incremented once per successful addition (incr_counter) and decremented
    \* once per drain pass (decr_counter).
    variables size=0, counter=0;
    process (p \in Procs)
    \* Per-process locals. 'old_size' is only ever assigned; its sole reader is the
    \* commented-out print statement below.
    variables tmp=0, added=0, removed=0, new_size=0, old_size=0;
    {
    \* One atomic step: snapshot the size, compute the capped new size and grow the queue.
    add:
        old_size := size;
        new_size := Min(size+ADD, MAX_QUEUE_SIZE);
        added := new_size - size || size := new_size; \* Add up to ADD elements to the queue (capped at MAX_QUEUE_SIZE)
        \* print <<"old_size= ", old_size, "new_size= ",new_size, "size= ", size, "added= ", added>>;
        \* A process that could not add anything (queue already full) terminates right away.
        if(added > 0) {
    \* Atomically read the old counter value into tmp and bump the counter by one.
    incr_counter:
            tmp := counter || counter := counter+1;
    \* Only the process that saw counter = 0 before its increment becomes the consumer.
    tmp_check:
            if(tmp = 0) { \* start draining the queue
    drain:
                removed := size || size := 0; \* Remove _all_ elements from the queue
                \* Deliver removed messages to the application
    \* Decrement the counter by one; keep draining until it is back to zero.
    decr_counter:
                counter := counter-1;
                assert ~counter < 0;
                if(counter # 0)
                    goto drain;
            }
        };
    }
}
*)
\* BEGIN TRANSLATION
\* State variables: the two globals (size, counter), the per-process control state (pc)
\* and the per-process locals of process p.
VARIABLES size, counter, pc, tmp, added, removed, new_size, old_size
\* Tuple of all variables; used for stuttering steps in Spec and for UNCHANGED vars in Next.
vars == << size, counter, pc, tmp, added, removed, new_size, old_size >>
\* Set of all process identifiers.
ProcSet == (Procs)
\* Initial state: the queue is empty, the counter is zero, every per-process local is 0
\* and every process is about to execute the "add" label.
Init == (* Global variables *)
        /\ size = 0
        /\ counter = 0
        (* Process p *)
        /\ tmp = [self \in Procs |-> 0]
        /\ added = [self \in Procs |-> 0]
        /\ removed = [self \in Procs |-> 0]
        /\ new_size = [self \in Procs |-> 0]
        /\ old_size = [self \in Procs |-> 0]
        /\ pc = [self \in ProcSet |-> "add"]
\* Action for label "add": in one atomic step record the current size, compute the new size
\* (capped at MAX_QUEUE_SIZE) and grow the queue by as many of the ADD elements as fit.
\* A process that added nothing finishes ("Done"); otherwise it continues at "incr_counter".
\* NOTE: the alignment of the /\ bullets is significant. The closing UNCHANGED conjunct must
\* line up with the outer list; written further to the left it is parsed as part of the ELSE
\* branch, and the THEN branch then leaves counter, tmp and removed unconstrained.
add(self) == /\ pc[self] = "add"
             /\ old_size' = [old_size EXCEPT ![self] = size]
             /\ new_size' = [new_size EXCEPT ![self] = Min(size+ADD, MAX_QUEUE_SIZE)]
             /\ /\ added' = [added EXCEPT ![self] = new_size'[self] - size]
                /\ size' = new_size'[self]
             /\ IF added'[self] > 0
                   THEN /\ pc' = [pc EXCEPT ![self] = "incr_counter"]
                   ELSE /\ pc' = [pc EXCEPT ![self] = "Done"]
             /\ UNCHANGED << counter, tmp, removed >>
\* Action for label "incr_counter": atomically remember the current counter value in
\* tmp[self] and increase the counter by one, then continue at "tmp_check".
incr_counter(self) == /\ pc[self] = "incr_counter"
                      /\ /\ counter' = counter+1
                         /\ tmp' = [tmp EXCEPT ![self] = counter]
                      /\ pc' = [pc EXCEPT ![self] = "tmp_check"]
                      /\ UNCHANGED << size, added, removed, new_size, old_size >>
\* Action for label "tmp_check": the process whose increment found the counter at 0
\* (tmp[self] = 0) becomes the consumer and moves to "drain"; every other process is done.
\* NOTE: the alignment of the /\ bullets is significant. The closing UNCHANGED conjunct must
\* line up with the outer list; written further to the left it is parsed as part of the ELSE
\* branch, and the THEN branch then leaves all the listed variables unconstrained.
tmp_check(self) == /\ pc[self] = "tmp_check"
                   /\ IF tmp[self] = 0
                         THEN /\ pc' = [pc EXCEPT ![self] = "drain"]
                         ELSE /\ pc' = [pc EXCEPT ![self] = "Done"]
                   /\ UNCHANGED << size, counter, tmp, added, removed,
                                   new_size, old_size >>
\* Action for label "drain": atomically record the current queue size in removed[self]
\* and empty the queue, then continue at "decr_counter".
drain(self) == /\ pc[self] = "drain"
               /\ /\ removed' = [removed EXCEPT ![self] = size]
                  /\ size' = 0
               /\ pc' = [pc EXCEPT ![self] = "decr_counter"]
               /\ UNCHANGED << counter, tmp, added, new_size, old_size >>
\* Action for label "decr_counter": decrease the counter by one and assert that the new
\* value is not negative. If the counter is still non-zero, go back to "drain" for another
\* pass; otherwise the consumer is done.
\* NOTE: the alignment of the /\ bullets is significant. The closing UNCHANGED conjunct must
\* line up with the outer list; written further to the left it is parsed as part of the ELSE
\* branch, and the THEN branch (the goto drain case) then leaves size, tmp, added, removed,
\* new_size and old_size unconstrained.
decr_counter(self) == /\ pc[self] = "decr_counter"
                      /\ counter' = counter-1
                      /\ Assert(~counter' < 0,
                                "Failure of assertion at line 68, column 28.")
                      /\ IF counter' # 0
                            THEN /\ pc' = [pc EXCEPT ![self] = "drain"]
                            ELSE /\ pc' = [pc EXCEPT ![self] = "Done"]
                      /\ UNCHANGED << size, tmp, added, removed, new_size,
                                      old_size >>
\* A step of process 'self': exactly one of its labelled actions.
p(self) == \/ add(self)
           \/ incr_counter(self)
           \/ tmp_check(self)
           \/ drain(self)
           \/ decr_counter(self)
\* Next-state relation: either some process takes a step, or all processes have
\* terminated and the system stutters.
Next == \/ (\E self \in Procs: p(self))
        \/ (* Disjunct to prevent deadlock on termination *)
           ((\A self \in ProcSet: pc[self] = "Done") /\ UNCHANGED vars)
\* Complete specification: start in Init, then take Next steps or stutter.
\* No fairness condition is conjoined here, so Spec by itself permits infinite stuttering.
Spec == Init /\ [][Next]_vars
\* Liveness property: eventually every process has reached "Done".
Termination == <>(\A self \in ProcSet: pc[self] = "Done")
\* END TRANSLATION
\* Mutual exclusion of the consumer role: at most one process may be inside the drain loop
\* (labels "drain" and "decr_counter") at any time. Checking pc = "drain" alone would miss
\* two consumers that are at different steps of the loop.
\* (add to Model -> Model Overview -> Invariants)
OnlyOneDrain == \A i,k \in Procs: (i # k) => ~(/\ pc[i] \in {"drain", "decr_counter"}
                                               /\ pc[k] \in {"drain", "decr_counter"})
\* TRUE exactly when every process has reached the final "Done" label.
AllDone == \A proc \in Procs : pc[proc] = "Done"
\* Whenever all processes have terminated, nothing may be left behind: the queue must be
\* empty and the counter back at zero (no elements stranded without a thread to drain them).
\* Add Correctness to the model's properties (Model -> Model Overview -> Properties)
Correctness == [](AllDone => (size = 0 /\ counter = 0))
=============================================================================
\* Modification History
\* Last modified Thu Jan 12 11:03:39 CET 2017 by bela
\* Last modified Fri Feb 13 10:00:32 EST 2015 by nrla
\* Created Wed Feb 11 18:05:23 EST 2015 by nrla