Rewrite of producer-consumer example #4740
base: main
Changes from 6 commits
fb0e50f
55df323
b6e96bf
9b5bc0e
0cbc47f
11cad46
7c0105d
5301d02
02315a8
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,87 @@ | ||
| # producer-consumer | ||
|
|
||
| A program showing the producer–consumer problem (also called the bounded-buffer problem) solved with actors. | ||
| A program showing the producer–consumer problem | ||
| (also called the bounded-buffer problem) solved with actors. | ||
|
|
||
| Since producer-consumer is actually a family of problems[1], | ||
| let us define more precisely which one is addressed here. | ||
|
|
||
| Specifically, here we want a single producer | ||
| that writes to a bounded-length buffer, and | ||
| we want a single consumer that reads from that | ||
| buffer. | ||
|
|
||
| Our task is to coordinate the reading and | ||
| writing so that the sequence of | ||
| reads occurs in the same order that the writes | ||
| were made. | ||
|
|
||
| That is the problem. | ||
|
|
||
| * ordering invariant, the correctness property | ||
|
|
||
| In a correct solution, we want the | ||
| consumer's read sequence to | ||
| be invariant with respect to the presence or | ||
| absence of the buffer. The consumer should consume in the | ||
| same order as the producer sent output. This | ||
| is the ordering invariant. It will be | ||
| referred to again below. | ||
|
|
||
| The ordering invariant says that the buffer should be | ||
| invisible. It says that the delivery should be the same | ||
| whether or not the buffer is present. It says that | ||
| the buffer is just a performance optimization, and | ||
| not a logical necessity. Even so, buffering | ||
| is critically important for real systems, and | ||
| is a natural model for a network link. | ||
|
|
||
| To provide more context and background: | ||
|
|
||
| The buffer is there to de-couple the producer | ||
| and consumer, allowing them to run independently | ||
| most of the time. | ||
|
|
||
| They can each go at their own speed. They can | ||
| avoid sharing a clock. The buffer can be | ||
| the only synchronization point between the two. | ||
|
|
||
| This decoupling is critical for many programs, and | ||
| it is the ever-present reality in real networks of | ||
| independently acting computers. | ||
|
|
||
| The buffer prevents artificial caps on | ||
| the rate of production and consumption. | ||
|
|
||
| If the consumer is not ready | ||
| at this moment, the producer can still produce, | ||
| up to a point (the size of the buffer). | ||
|
|
||
| Similarly, the consumer can consume even if the | ||
| producer is now busy elsewhere, producing the | ||
| next product. | ||
|
|
||
| Once the ordering invariant above is met, the challenges are: | ||
|
|
||
| - to maximize throughput; | ||
|
|
||
| - while avoiding overwhelming either end. | ||
|
|
||
| This is a classic mutual exclusion problem that led | ||
| Dijkstra to invent the semaphore abstraction in 1965[2]. | ||
|
|
||
| He borrowed the name from a type of colorful rigid flag arm | ||
| used to control train crossings since the 1840s[3]. | ||
|
|
||
| Semaphores are not in Pony, because they use locks. | ||
|
|
||
| The challenge: how can we solve the problem without locks? | ||
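One lock-free shape for the single-producer, single-consumer case described above can be sketched with three actors. This is a minimal sketch under stated assumptions, not the implementation in this PR: the names `Buffer`, `Producer`, `Consumer`, `request_to_produce`, `store`, `request_next`, `permission_granted`, and `deliver` are all hypothetical. The buffer grants a produce permission only while stored plus promised items are below capacity, and it hands items out oldest-first, which is what preserves the ordering invariant.

```pony
// Hypothetical names throughout; a sketch, not the PR's implementation.
actor Main
  new create(env: Env) =>
    let buffer = Buffer(2)
    let consumer = Consumer(env.out, buffer, 3)
    let producer = Producer(buffer, 3)
    consumer.start()
    producer.start()

actor Buffer
  let _capacity: USize
  let _items: Array[U64] = Array[U64]
  var _promised: USize = 0  // slots granted but not yet filled
  var _producer_waiting: (Producer | None) = None
  var _consumer_waiting: (Consumer | None) = None

  new create(capacity: USize) =>
    _capacity = capacity

  be request_to_produce(p: Producer) =>
    // Park the producer, then grant at once if a slot is free.
    _producer_waiting = p
    _grant_if_room()

  be store(item: U64) =>
    _promised = _promised - 1
    _items.push(item)
    _serve_consumer()

  be request_next(c: Consumer) =>
    // Park the consumer, then serve at once if an item is stored.
    _consumer_waiting = c
    _serve_consumer()

  fun ref _grant_if_room() =>
    if (_items.size() + _promised) < _capacity then
      match _producer_waiting
      | let p: Producer =>
        _producer_waiting = None
        _promised = _promised + 1
        p.permission_granted()
      end
    end

  fun ref _serve_consumer() =>
    match _consumer_waiting
    | let c: Consumer =>
      try
        c.deliver(_items.shift()?)  // oldest item first: FIFO order
        _consumer_waiting = None
        _grant_if_room()            // a slot was just freed
      end
    end

actor Producer
  let _buffer: Buffer
  let _total: U64
  var _next: U64 = 0

  new create(buffer: Buffer, total: U64) =>
    _buffer = buffer
    _total = total

  be start() =>
    if _next < _total then _buffer.request_to_produce(this) end

  be permission_granted() =>
    _buffer.store(_next)
    _next = _next + 1
    if _next < _total then _buffer.request_to_produce(this) end

actor Consumer
  let _out: OutStream
  let _buffer: Buffer
  let _total: U64
  var _expected: U64 = 0

  new create(out: OutStream, buffer: Buffer, total: U64) =>
    _out = out
    _buffer = buffer
    _total = total

  be start() =>
    _buffer.request_next(this)

  be deliver(item: U64) =>
    // Ordering invariant: items should arrive as 0, 1, 2, ...
    if item == _expected then
      _out.print("consumed " + item.string())
    else
      _out.print("out of order: " + item.string())
    end
    _expected = _expected + 1
    if _expected < _total then _buffer.request_next(this) end
```

No actor ever blocks here: a request that cannot be satisfied is parked in a field and answered later from another behavior. The single parked-producer and parked-consumer slots are enough only because there is exactly one of each.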
|
|
||
| [1] https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem | ||
|
|
||
| [2] https://en.wikipedia.org/wiki/Semaphore_(programming) | ||
|
|
||
| [3] https://en.wikipedia.org/wiki/Railway_semaphore_signal | ||
|
|
||
| ## How to Compile | ||
|
|
||
|
|
@@ -23,19 +104,140 @@ Linking ./producer-consumer | |
|
|
||
| ## How to Run | ||
|
|
||
| Once `producer-consumer` has been compiled, in the same directory as this README file run `./producer-consumer`. You should a set of messages from: Main, Buffer, Producer, and Consumer actors. | ||
| Once `producer-consumer` has been compiled, in the same directory as this README file run `./producer-consumer`. You should a see of a log like: | ||
|
Member
appears to be a typo here.
Member
can you keep the basic wording from before but update to include the new names of things.
||
|
|
||
| ```console | ||
| $ ./producer-consumer | ||
| **Main** Finished. | ||
| **Buffer** Permission_to_produce | ||
| **Buffer** Permission_to_produce: Calling producer to produce | ||
| ... | ||
| **Buffer** Store_product | ||
| **Buffer** Store_product: Calling consumer to consume | ||
| **Consumer** Consuming product 2 | ||
| test: fifo/basic apply() called. | ||
| 1 test started, 0 complete: fifo/basic started | ||
| fifo: created with capacity: 2 and size: 2 | ||
| consumer: started. _next = 0 | ||
| producer: created. will produce 3 | ||
| fifo: consumerRequestsNext: nothing for consumer, add them to _waitQcons | ||
| producer: started. _next = 0 | ||
| fifo: requestToProduce(next=0) allows producer to produce id = 0 since _buf.size = 2 and _promised = 0 together are < _cap == 2 | ||
| producer: has produced 0 | ||
| fifo: dispatch() is taking consumer off _waitQcons to provide them next = 0 | ||
| fifo: requestToProduce(next=1) allows producer to produce id = 1 since _buf.size = 2 and _promised = 0 together are < _cap == 2 | ||
| consumer: has consumed 0 | ||
| producer: has produced 1 | ||
| consumer: about to ask for _next = 1 | ||
| fifo: requestToProduce(next=2) allows producer to produce id = 2 since _buf.size = 2 and _promised = 0 together are < _cap == 2 | ||
| fifo: consumerRequestsNext() has _buf.size = 2 | ||
| fifo: consumerRequestsNext about to provide = 1 ; now _buf.size = 2 | ||
| producer: has produced 2 | ||
| consumer: has consumed 1 | ||
| consumer: about to ask for _next = 2 | ||
| fifo: consumerRequestsNext() has _buf.size = 2 | ||
| fifo: consumerRequestsNext about to provide = 2 ; now _buf.size = 2 | ||
|
Comment on lines
+127
to
+132
Member
we discussed during sync and we had a 2 to 1 "i find the Thing" easier to read. Please change the output from "thing:" to "Thing".
||
| consumer: has consumed 2 | ||
| elapsed nanosec = 186000 | ||
| 1 test started, 1 complete: fifo/basic complete | ||
| ---- Passed: fifo/basic | ||
| ---- | ||
| ---- 1 test ran. | ||
| ---- Passed: 1 | ||
| ``` | ||
|
|
||
| ## Program Modifications | ||
|
|
||
| Rather than modifying this program, pay attention to how the use of behaviors means this program is non-blocking and does not use locks/semaphores. | ||
| ### Basic | ||
|
|
||
| 1. Read the assertions. Do they assert that the | ||
| consumer never consumes the same product twice? | ||
| If not, add an assertion to this effect. | ||
|
|
||
| 2. Verify that the ordering invariant is maintained | ||
| in this code. Can we assert this in other places? | ||
|
|
||
| In other words, assert that the reader consumes products | ||
| in the same sequential order that the producer produced them. | ||
|
Comment on lines
+150
to
+154
Member
markdown parsing on this will break the "this is a list". most parsers dont recognize multi-line list items. please turn this into a single line item.
||
|
|
||
| (Formally, the consumption and production sequences should map one-to-one | ||
| to the natural numbers.) | ||
|
|
||
| 3. Try adding more than one consumer. How do the invariants change? | ||
|
|
||
| 4. Try adding more than one producer. How do the invariants change? | ||
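For exercise 2 above, one possible way to assert the ordering invariant is through the standard `pony_test` package's `TestHelper`. The sketch below is hedged: `OrderChecker`, its `received` behavior, and the test name `fifo/ordering` are hypothetical and not part of this PR. The checker stands in for the consumer and asserts that each item equals the next expected sequence number.

```pony
use "pony_test"

actor Main is TestList
  new create(env: Env) =>
    PonyTest(env, this)

  fun tag tests(test: PonyTest) =>
    test(_TestOrdering)

class iso _TestOrdering is UnitTest
  fun name(): String => "fifo/ordering"

  fun apply(h: TestHelper) =>
    // The check finishes asynchronously, so mark this as a long test
    // with a timeout in nanoseconds.
    h.long_test(2_000_000_000)
    let checker = OrderChecker(h, 3)
    // Stand-in for a real producer and buffer: deliver 0, 1, 2 in order.
    checker.received(0)
    checker.received(1)
    checker.received(2)

// Hypothetical stand-in for the consumer.
actor OrderChecker
  let _h: TestHelper
  let _total: U64
  var _expected: U64 = 0

  new create(h: TestHelper, total: U64) =>
    _h = h
    _total = total

  be received(item: U64) =>
    // Ordering invariant: the n-th item consumed is the n-th produced.
    _h.assert_eq[U64](_expected, item, "ordering invariant")
    _expected = _expected + 1
    if _expected == _total then
      _h.complete(true)
    end
```

Because `TestHelper` is a `val`, it can be passed to the actor directly, which keeps the assertion inside the consuming actor without any FFI calls.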
|
Comment on lines
+146
to
+161
Member
for proper markdown editor handling, there should be no spaces between the list items.
||
|
|
||
| ### Medium | ||
|
|
||
| 5. Try sending and reading in batches, rather than one by one. | ||
|
|
||
| 6. Try having an unbounded buffer rather than a fixed size one. | ||
|
|
||
| 7. Cancellation | ||
|
|
||
| How do we avoid leaking memory or queue resources | ||
| if the producer or consumer wants to depart and | ||
| abandon its half-started consume or produce? | ||
|
|
||
| Implement a mechanism for clean departure, including | ||
| a means for both the producer and the consumer | ||
| to get acknowledgement that the buffer has | ||
| actually cancelled all outstanding requests. | ||
|
|
||
| What is the hazard in a real system if this | ||
| feature is missing? Hint: why does TCP | ||
| have the TIME_WAIT state? | ||
|
|
||
| ### Advanced | ||
SeanTAllen marked this conversation as resolved.
|
||
|
|
||
| 8. Optimistic batching | ||
|
Member
each section has its own list. so this should be starting at 1.
||
|
|
||
| Rather than asking for permission, what if the | ||
| system was optimistic, and then asked for forgiveness? | ||
|
|
||
| That is, could the system be optimized by having the producer | ||
| send a batch of everything it has on hand and letting the buffer discard | ||
| what it cannot take, since its information about | ||
| how many slots are free might be stale by the time | ||
| the product batch arrives? | ||
|
|
||
| This is based on the observation that there might be more | ||
| free slots now, since the consumer might have done a lot of | ||
| consuming since the original produce permission was granted. | ||
|
|
||
| This tends to happen especially in networks with large | ||
| bandwidth-delay products, like satellite links. | ||
| There, WAN protocols like UDT[4] deploy this | ||
| optimization. But it can happen in any parallel setting. | ||
|
|
||
| How would the buffer communicate back how many it | ||
| could not store? This would require the producer | ||
| to have its own in-actor buffer. Would this extra memory | ||
| be worth it? What would you measure to tell? | ||
| Try it and find out. | ||
|
|
||
| 9. Cutting down on communication | ||
|
|
||
| Pony's iso references mean that the buffer might actually | ||
| be owned by the consumer or producer, rather than | ||
| in a third actor (the buffer). | ||
|
|
||
| Would there be benefit to trying to "cut out the middle man", and | ||
| directly communicate between only two actors instead of three? | ||
|
|
||
| Implement this, and measure if it yields higher throughput. | ||
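As a starting point for the optimistic batching exercise above, the "ask for forgiveness" reply could look roughly like the following. This is a sketch under assumptions: `OptimisticBuffer`, `BatchProducer`, `offer`, `send`, and `accepted` are hypothetical names, and a real producer would keep the rejected tail in its own in-actor buffer and resend it rather than just printing the counts.

```pony
use "collections"

actor Main
  new create(env: Env) =>
    let buffer = OptimisticBuffer(2)
    let producer = BatchProducer(env.out, buffer)
    // Three items offered to a two-slot buffer: one should be rejected.
    producer.send(recover val [as U64: 0; 1; 2] end)

// Hypothetical buffer that takes what fits and reports the rest.
actor OptimisticBuffer
  let _capacity: USize
  let _items: Array[U64] = Array[U64]

  new create(capacity: USize) =>
    _capacity = capacity

  be offer(batch: Array[U64] val, sender: BatchProducer) =>
    let free = _capacity - _items.size()
    let taken = batch.size().min(free)
    // Keep the front of the batch so production order is preserved.
    for i in Range(0, taken) do
      try _items.push(batch(i)?) end
    end
    sender.accepted(taken, batch.size() - taken)

actor BatchProducer
  let _out: OutStream
  let _buffer: OptimisticBuffer

  new create(out: OutStream, buffer: OptimisticBuffer) =>
    _out = out
    _buffer = buffer

  be send(batch: Array[U64] val) =>
    _buffer.offer(batch, this)

  be accepted(taken: USize, rejected: USize) =>
    _out.print("stored " + taken.string() + ", rejected " + rejected.string())
```

Accepting only a prefix of the batch matters: if the buffer kept arbitrary items, the resent tail could overtake later products and break the ordering invariant.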
|
|
||
|
|
||
SeanTAllen marked this conversation as resolved.
|
||
| ## Questions to think about | ||
|
|
||
| What is the optimal buffer size? | ||
|
|
||
| Does it depend on which is faster, the consumer or the producer? | ||
|
|
||
| If the buffer size is unbounded, under what circumstances | ||
| should we expect the kernel to OOM kill our program for | ||
| running out of memory? | ||
|
|
||
| If we provided an unbounded buffer, how else | ||
| can we prevent a fast producer from using all | ||
| available memory? | ||
|
|
||
| After thinking about it, you might look into | ||
| how Pony implements back-pressure | ||
| for its runtime system of actors with unbounded | ||
| message queues. | ||
|
|
||
| [4] https://en.wikipedia.org/wiki/UDP-based_Data_Transfer_Protocol | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| // assert.pony provides a simple assertion | ||
| // facility that does not require | ||
| // threading pony_check's TestHelper | ||
| // through production code. | ||
| // | ||
| // We endeavor to exit(1) the program immediately | ||
| // upon a failed assertion. | ||
| // | ||
| // Note that even when using the C FFI, your | ||
| // assert error message may still | ||
| // not be the last thing printed in the output. | ||
| // Other threads may get to print before the | ||
| // process is terminated. Scroll up in your | ||
| // output, searching for "error:" if you | ||
| // don't see your assertion message at the | ||
| // end of the output. | ||
|
|
||
| use @printf[I32](fmt: Pointer[U8] tag, ...) | ||
| use @fprintf[I32](stream:Pointer[U8], fmt: Pointer[U8] tag, ...) | ||
| use @exit[None](status:I32) | ||
| use @pony_os_stdout[Pointer[U8]]() | ||
| use @pony_os_stderr[Pointer[U8]]() | ||
| use @fflush[I32](stream:Pointer[U8]) | ||
| use @write[USize](fd:I32, buf:Pointer[U8] tag, sz:USize) | ||
|
|
||
| primitive Assert | ||
|
Member
I get why you created this Assert primitive and I've done something similar before in my applications. And I also know that this kind of facility is missing from the Pony standard library, because it violates capability security. But I think it's really distracting to have this kind of code as part of an example, so I'd like to see this use more idiomatic Pony test helper uses if possible. If that's not feasible, let's talk about it.
||
| fun crash(msg:String, loc:SourceLoc = __loc) => | ||
| // try to get our own line for better visibility. | ||
| @fflush[I32](@pony_os_stdout()) | ||
| @fflush[I32](@pony_os_stderr()) | ||
| let msg3 = "\n"+loc.file() + ":" + loc.line().string() + "\n" + loc.type_name() + "." + loc.method_name() + ": " + msg + "\n" | ||
| @write(I32(2), msg3.cstring(), msg3.size()) // 2 = stderr | ||
| @fflush[I32](@pony_os_stderr()) | ||
| @exit(1) | ||
|
|
||
| fun apply(mustHold:Bool, invariantText:String, loc: SourceLoc = __loc) => | ||
| """ | ||
| Assert.apply asserts an invariant by allowing the caller to supply | ||
| an expression that evaluates to a Bool, mustHold. | ||
| Assert.apply crashes and reports a violated invariant if mustHold is false. | ||
| Assert.apply() is morally equivalent to Assert.invar(). | ||
| """ | ||
| if mustHold then | ||
| return | ||
| end | ||
| crash("error: Assert.apply invariant '" + invariantText + "' violated!", loc) | ||
|
|
||
| fun invar(mustHold:Bool, invariantText:String, loc: SourceLoc = __loc) => | ||
| """ | ||
| Assert.invar crashes if mustHold is false. equivalent to Assert.apply(). | ||
| """ | ||
| if mustHold then | ||
| return | ||
| end | ||
| crash("error: Assert.invar '" + invariantText + "' violated!", loc) | ||
|
|
||
| fun equal[T: (Equatable[T] #read & Stringable #read)](got:T, want:T, inv:String = "", loc: SourceLoc = __loc) => | ||
| if got != want then | ||
| crash("error: Assert.equal violated! want: " + want.string() + ", but got: " + got.string() + "; "+inv, loc) | ||
| end | ||
|
|
||
| fun equalbox[T: (Equatable[T] box & Stringable box)](got:T, want:T, inv:String = "", loc: SourceLoc = __loc) => | ||
| if got != want then | ||
| crash("error: Assert.equalbox violated! want: " + want.string() + ", but got: " + got.string() + "; "+inv, loc) | ||
| end | ||
|
|
||
| fun lte[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) => | ||
| if a <= b then | ||
| return | ||
| end | ||
| crash("error: Assert.lte violated! want: " + a.string() + " <= " + b.string() + " since '" + inv + "'", loc) | ||
|
|
||
| fun gte[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) => | ||
| if a >= b then | ||
| return | ||
| end | ||
| crash("error: Assert.gte violated! want: " + a.string() + " >= " + b.string() + " since '" + inv + "'", loc) | ||
|
|
||
| fun lt[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) => | ||
| if a < b then | ||
| return | ||
| end | ||
| crash("error: Assert.lt violated! want: " + a.string() + " < " + b.string() + " since '" + inv + "'", loc) | ||
|
|
||
| fun gt[T: (Comparable[T] #read & Stringable #read)](a:T, b:T, inv:String, loc: SourceLoc = __loc) => | ||
| if a > b then | ||
| return | ||
| end | ||
| crash("error: Assert.gt violated! want: " + a.string() + " > " + b.string() + " since '" + inv + "'", loc) | ||
|
|
||
This file was deleted.