Skip to content

Commit 0dba8d0

Browse files
author
Jason E. Aten, Ph.D.
committed
rewrite of producer-consumer example
addresses bugs and add assertions to verify correctness: - no internal fifo buffer overflows - the ordering invariant is maintained, such that the consumer consumes, in sequential order, the complete set of products produced.
1 parent 030335c commit 0dba8d0

File tree

9 files changed

+554
-115
lines changed

9 files changed

+554
-115
lines changed

examples/producer-consumer/README.md

Lines changed: 212 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,87 @@
11
# producer-consumer
22

3-
A program showing the producer–consumer problem (also called the bounded-buffer problem) solved with actors.
3+
A program showing the producer–consumer problem
4+
(also called the bounded-buffer problem) solved with actors.
5+
6+
Since producer-consumer is actually a family of problems[1],
7+
let us define more precisely which one is addressed here.
8+
9+
Specifically, here we want a single producer
10+
that writes to a bounded-length buffer, and
11+
we want a single consumer that reads from that
12+
buffer.
13+
14+
Our task is to coordinate the reading and
15+
writing so that the sequence of
16+
reads occurs in the same order that the writes
17+
were made.
18+
19+
That is the problem.
20+
21+
* ordering invariant, the correctness property
22+
23+
In a correct solution, we want the
24+
consumer's read sequence to
25+
be invariant with respect to the presence or
26+
absense of the buffer. The consumer should consume in the
27+
same order as the producer sent output. This
28+
is the ordering invariant. It will be
29+
referred to again below.
30+
31+
The ordering invariant says that the buffer should be
32+
invisible. It says that the delivery should be the same
33+
whether or not the buffer is present. It says that
34+
the buffer is just a performance optimization, and
35+
not a logical necessity. While true, buffering
36+
is critically important for real systems, and
37+
is a natural model for a network link.
38+
39+
To provide more context and background:
40+
41+
The buffer is there to de-couple the producer
42+
and consumer, allowing them to run independently
43+
most of the time.
44+
45+
They can each go at their own speed. They can
46+
avoid sharing a clock. The buffer can be
47+
the only synchronization point between the two.
48+
49+
Having this decoupling is critical for many programs, and
50+
the ever present reality in real networks of
51+
independently acting computers.
52+
53+
The buffer prevents artificial caps on
54+
the rate of production and consumption.
55+
56+
If the consumer is not ready
57+
at this moment, the producer can still produce,
58+
up to a point (the size of the buffer).
59+
60+
Similarly, the consumer can consume even if the
61+
producer is now busy elsewhere, producing the
62+
next product.
63+
64+
Once the ordering invariant above is met, the challenges are:
65+
66+
- to maximize throughput;
67+
68+
- while avoiding overwhelming either end.
69+
70+
This is a classic mutual exclusion problem that led
71+
Dijkstra to invent the semaphore abstraction in 1965[2].
72+
73+
He borrowed the name from a type of colorful rigid flag arm
74+
used to control train crossings since the 1840s[3].
75+
76+
Semaphores are not in Pony, because they use locks.
77+
78+
The challenge: how can we solve the problem without locks?
79+
80+
[1] https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
81+
82+
[2] https://en.wikipedia.org/wiki/Semaphore_(programming)
83+
84+
[3] https://en.wikipedia.org/wiki/Railway_semaphore_signal
485

586
## How to Compile
687

@@ -23,19 +104,140 @@ Linking ./producer-consumer
23104

24105
## How to Run
25106

26-
Once `producer-consumer` has been compiled, in the same directory as this README file run `./producer-consumer`. You should a set of messages from: Main, Buffer, Producer, and Consumer actors.
107+
Once `producer-consumer` has been compiled, in the same directory as this README file run `./producer-consumer`. You should a see of a log like:
27108

28109
```console
29110
$ ./producer-consumer
30-
**Main** Finished.
31-
**Buffer** Permission_to_produce
32-
**Buffer** Permission_to_produce: Calling producer to produce
33-
...
34-
**Buffer** Store_product
35-
**Buffer** Store_product: Calling consumer to consume
36-
**Consumer** Consuming product 2
111+
test: fifo/basic apply() called.
112+
1 test started, 0 complete: fifo/basic started
113+
fifo: created with capacity: 2 and size: 2
114+
consumer: started. _next = 0
115+
producer: created. will produce 3
116+
fifo: consumerRequestsNext: nothing for consumer, add them to _waitQcons
117+
producer: started. _next = 0
118+
fifo: requestToProduce(next=0) allows producer to produce id = 0 since _buf.size = 2 and _promised = 0 together are < _cap == 2
119+
producer: has produced 0
120+
fifo: dispatch() is taking consumer off _waitQcons to provide them next = 0
121+
fifo: requestToProduce(next=1) allows producer to produce id = 1 since _buf.size = 2 and _promised = 0 together are < _cap == 2
122+
consumer: has consumed 0
123+
producer: has produced 1
124+
consumer: about to ask for _next = 1
125+
fifo: requestToProduce(next=2) allows producer to produce id = 2 since _buf.size = 2 and _promised = 0 together are < _cap == 2
126+
fifo: consumerRequestsNext() has _buf.size = 2
127+
fifo: consumerRequestsNext about to provide = 1 ; now _buf.size = 2
128+
producer: has produced 2
129+
consumer: has consumed 1
130+
consumer: about to ask for _next = 2
131+
fifo: consumerRequestsNext() has _buf.size = 2
132+
fifo: consumerRequestsNext about to provide = 2 ; now _buf.size = 2
133+
consumer: has consumed 2
134+
elapsed nanosec = 186000
135+
1 test started, 1 complete: fifo/basic complete
136+
---- Passed: fifo/basic
137+
----
138+
---- 1 test ran.
139+
---- Passed: 1
37140
```
38141

39142
## Program Modifications
40143

41-
Rather than modifying this program, pay attention to how the use of behaviors means this program is non-blocking and does not use locks/semaphores.
144+
# basic
145+
146+
1. Read the assertions. Do they assert that the
147+
consumer never consumes the same product twice?
148+
If not, add an assertion to this effect.
149+
150+
2. Verify that the ordering invariant is maintained
151+
in this code. Can we assert this in other places?
152+
153+
In other words, assert that the reader consumes products
154+
in the same sequential order that the producer produced them.
155+
156+
(Formally, the consumption and production sequences should map one-to-one
157+
to the natural numbers.)
158+
159+
3. Try adding more than one consumer. How do the invariants change?
160+
161+
4. Try adding more than one producer. How do the invariants change?
162+
163+
# medium
164+
165+
5. Try sending and reading in batches, rather than one by one.
166+
167+
6. Try having an unbounded buffer rather than a fixed size one.
168+
169+
7. Cancellation
170+
171+
How do we avoid leaking memory or queue resources
172+
if the producer or consumer wants to depart and
173+
abandon its half-started consume or produce?
174+
175+
Implement a mechanism for clean departure, including
176+
a means for both the producer and the consumer
177+
to get acknowledgement that the buffer has
178+
actually cancelled all outstanding requests.
179+
180+
What is the hazard in a real system if this
181+
feature is missing? Hint: why does TCP
182+
have the TIME_WAIT state?
183+
184+
# advanced
185+
186+
8. Optimistic batching
187+
188+
Rather than asking for permission, what if the
189+
system was optimistic, and then asked for forgiveness?
190+
191+
That is, could the system be optimized by having the producer
192+
send a batch of everything it has on hand and letting the buffer discard
193+
what it cannot take, since its information about
194+
how many slots are free might be stale by the time
195+
the product batch arives?
196+
197+
This is based on the observation that there might be more
198+
free slots now, since the consumer might done alot of
199+
consuming since the original produce permission was granted.
200+
201+
This tends to happen especially in networks with large
202+
bandwidth-delay products, like satellite links.
203+
There, WAN network protocols like UDT[4], deploy this
204+
optimization. But it can happen in any parallel setting.
205+
206+
How would the buffer communicate back how many it
207+
could not store? This would require the producer
208+
to have its own in-actor buffer. Would this extra memory
209+
be worth it? What would you measure to tell?
210+
Try it and find out.
211+
212+
9. Cutting down on communication
213+
214+
Pony's iso references means that the buffer might actually
215+
be owned by the consumer or producer, rather than
216+
in a third actor (the buffer).
217+
218+
Would there be benefit to trying to "cut out the middle man", and
219+
directly communicate between only two actors instead of three?
220+
221+
Implement this, and measure if it yields higher throughtput.
222+
223+
224+
## Questions to think about
225+
226+
What is the optimal buffer size?
227+
228+
Does it depend on which is faster, the consumer or the producer?
229+
230+
If the buffer size is unbounded, under what circumstances
231+
should we expect the kernel to OOM kill our program for
232+
running out of memory?
233+
234+
If we provided an unbounded buffer, how else
235+
can we prevent a fast producer from using all
236+
available memory?
237+
238+
After thinking about it, you might look into
239+
how Pony implements back-pressure
240+
for its runtime system of actors with unbounded
241+
message queues.
242+
243+
[4] https://en.wikipedia.org/wiki/UDP-based_Data_Transfer_Protocol
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// assert.pony provides a simple assertion
2+
// facility that does not require
3+
// threading pony_check's TestHelper
4+
// through production code.
5+
//
6+
// We endeavor to exit(1) the program immediately
7+
// upon a failed assertion.
8+
//
9+
// Note that even when using the C FFI, your
10+
// assert error message may still
11+
// not be the last thing printed in the output.
12+
// Other threads may get to print before the
13+
// process is terminated. Scroll up in your
14+
// output, searching for "error:" if you
15+
// don't see your assertion message at the
16+
// end of the output.
17+
18+
use @printf[I32](fmt: Pointer[U8] tag, ...)
19+
use @fprintf[I32](stream:Pointer[U8], fmt: Pointer[U8] tag, ...)
20+
use @exit[None](status:I32)
21+
use @pony_os_stdout[Pointer[U8]]()
22+
use @pony_os_stderr[Pointer[U8]]()
23+
use @fflush[I32](stream:Pointer[U8])
24+
use @write[USize](fd:I32, buf:Pointer[U8] tag, sz:USize)
25+
26+
primitive Assert
27+
fun crash(msg:String, loc:SourceLoc = __loc) =>
28+
// try to get our own line for better visibility.
29+
@fflush[I32](@pony_os_stdout())
30+
@fflush[I32](@pony_os_stderr())
31+
let msg3 = "\n"+loc.file() + ":" + loc.line().string() + "\n" + loc.type_name() + "." + loc.method_name() + ": " + msg + "\n"
32+
@write(I32(2), msg3.cstring(), msg3.size()) // 2 = stderr
33+
@fflush[I32](@pony_os_stderr())
34+
@exit(1)
35+
36+
fun apply(mustHold:Bool, invariantText:String, loc: SourceLoc = __loc) =>
37+
if mustHold then
38+
return
39+
end
40+
crash("error: Assert.apply invariant '" + invariantText + "' violated!", loc)
41+
42+
fun invar(mustHold:Bool, invariantText:String, loc: SourceLoc = __loc) =>
43+
if mustHold then
44+
return
45+
end
46+
crash("error: Assert.invar '" + invariantText + "' violated!", loc)
47+
48+
fun equal[T: (Equatable[T] #read & Stringable #read)](got:T, want:T, inv:String = "", loc: SourceLoc = __loc) =>
49+
if got != want then
50+
crash("error: Assert.equal violated! want: " + want.string() + ", but got: " + got.string() + "; "+inv, loc)
51+
end
52+
53+
fun equalbox[T: (Equatable[T] box & Stringable box)](got:T, want:T, inv:String = "", loc: SourceLoc = __loc) =>
54+
if got != want then
55+
crash("error: Assert.equalbox violated! want: " + want.string() + ", but got: " + got.string() + "; "+inv, loc)
56+
end
57+
58+
fun lte[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) =>
59+
if a <= b then
60+
return
61+
end
62+
crash("error: Assert.lte violated! want: " + a.string() + " <= " + b.string() + " since '" + inv + "'", loc)
63+
64+
fun gte[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) =>
65+
if a >= b then
66+
return
67+
end
68+
crash("error: Assert.gte violated! want: " + a.string() + " <= " + b.string() + " since '" + inv + "'", loc)
69+
70+
fun lt[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) =>
71+
if a < b then
72+
return
73+
end
74+
crash("error: Assert.lt violated! want: " + a.string() + " < " + b.string() + " since '" + inv + "'")
75+
76+
fun gt[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) =>
77+
if a > b then
78+
return
79+
end
80+
crash("error: Assert.gt violated! want: " + a.string() + " > " + b.string() + " since '" + inv + "'", loc)
81+

examples/producer-consumer/buffer.pony

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)