Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 36632b4

Browse files
committed
Define ChainedChannel{Sender, Receiver} wrappers
1 parent 3abe938 commit 36632b4

File tree

1 file changed

+136
-62
lines changed
  • unified-scheduler-pool/src

1 file changed

+136
-62
lines changed

unified-scheduler-pool/src/lib.rs

Lines changed: 136 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
1111
use {
1212
assert_matches::assert_matches,
13-
crossbeam_channel::{select, unbounded, Receiver, Sender},
13+
crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
1414
log::*,
1515
solana_ledger::blockstore_processor::{
1616
execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
@@ -247,36 +247,132 @@ type ExecutedTaskPayload = SubchanneledPayload<Box<ExecutedTask>, ()>;
247247
// minimum at the cost of a single heap allocation per switching for the sake of Box-ing the Self
248248
// type to avoid infinite mem::size_of() due to the recursive type structure. Needless to say, such
249249
// an allocation can be amortized to be negligible.
250-
enum ChainedChannel<P1, P2> {
251-
Payload(P1),
252-
PayloadAndChannel(Box<dyn WithChannelAndPayload<P1, P2>>),
253-
}
250+
mod chained_channel {
251+
use super::*;
254252

255-
trait WithChannelAndPayload<P1, P2>: Send + Sync {
256-
fn payload_and_channel(self: Box<Self>) -> PayloadAndChannelInner<P1, P2>;
257-
}
253+
pub(super) enum ChainedChannel<P, C> {
254+
Payload(P),
255+
ContextAndChannel(Box<dyn WithContextAndPayload<P, C>>),
256+
}
258257

259-
type PayloadAndChannelInner<P1, P2> = (P2, Receiver<ChainedChannel<P1, P2>>);
258+
pub(super) trait WithContextAndPayload<P, C>: Send + Sync {
259+
fn context_and_channel(self: Box<Self>) -> ContextAndChannelInner<P, C>;
260+
}
260261

261-
struct PayloadAndChannelWrapper<P1, P2>(PayloadAndChannelInner<P1, P2>);
262+
type ContextAndChannelInner<P, C> = (C, Receiver<ChainedChannel<P, C>>);
262263

263-
impl<P1, P2> WithChannelAndPayload<P1, P2> for PayloadAndChannelWrapper<P1, P2>
264-
where
265-
P1: Send + Sync,
266-
P2: Send + Sync,
267-
{
268-
fn payload_and_channel(self: Box<Self>) -> PayloadAndChannelInner<P1, P2> {
269-
self.0
264+
struct ContextAndChannelWrapper<P, C>(ContextAndChannelInner<P, C>);
265+
266+
impl<P, C> WithContextAndPayload<P, C> for ContextAndChannelWrapper<P, C>
267+
where
268+
P: Send + Sync,
269+
C: Send + Sync,
270+
{
271+
fn context_and_channel(self: Box<Self>) -> ContextAndChannelInner<P, C> {
272+
self.0
273+
}
270274
}
271-
}
272275

273-
impl<P1, P2> ChainedChannel<P1, P2>
274-
where
275-
P1: Send + Sync + 'static,
276-
P2: Send + Sync + 'static,
277-
{
278-
fn chain_to_new_channel(payload: P2, receiver: Receiver<Self>) -> Self {
279-
Self::PayloadAndChannel(Box::new(PayloadAndChannelWrapper((payload, receiver))))
276+
impl<P, C> ChainedChannel<P, C>
277+
where
278+
P: Send + Sync + 'static,
279+
C: Send + Sync + 'static,
280+
{
281+
fn chain_to_new_channel(context: C, receiver: Receiver<Self>) -> Self {
282+
Self::ContextAndChannel(Box::new(ContextAndChannelWrapper((context, receiver))))
283+
}
284+
}
285+
286+
pub(super) struct ChainedChannelSender<P, C> {
287+
sender: Sender<ChainedChannel<P, C>>,
288+
}
289+
290+
impl<P, C> ChainedChannelSender<P, C>
291+
where
292+
P: Send + Sync + 'static,
293+
C: Send + Sync + 'static + Clone,
294+
{
295+
fn new(sender: Sender<ChainedChannel<P, C>>) -> Self {
296+
Self { sender }
297+
}
298+
299+
pub(super) fn send_payload(
300+
&self,
301+
payload: P,
302+
) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
303+
self.sender.send(ChainedChannel::Payload(payload))
304+
}
305+
306+
pub(super) fn send_chained_channel(
307+
&mut self,
308+
context: C,
309+
count: usize,
310+
) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
311+
let (chained_sender, chained_receiver) = crossbeam_channel::unbounded();
312+
for _ in 0..count {
313+
self.sender.send(ChainedChannel::chain_to_new_channel(
314+
context.clone(),
315+
chained_receiver.clone(),
316+
))?
317+
}
318+
self.sender = chained_sender;
319+
Ok(())
320+
}
321+
}
322+
323+
pub(super) struct ChainedChannelReceiver<P, C: Clone> {
324+
receiver: Receiver<ChainedChannel<P, C>>,
325+
context: C,
326+
}
327+
328+
impl<P, C: Clone> Clone for ChainedChannelReceiver<P, C> {
329+
fn clone(&self) -> Self {
330+
Self {
331+
receiver: self.receiver.clone(),
332+
context: self.context.clone(),
333+
}
334+
}
335+
}
336+
337+
impl<P, C: Clone> ChainedChannelReceiver<P, C> {
338+
fn new(receiver: Receiver<ChainedChannel<P, C>>, initial_context: C) -> Self {
339+
Self {
340+
receiver,
341+
context: initial_context,
342+
}
343+
}
344+
345+
pub(super) fn context(&self) -> &C {
346+
&self.context
347+
}
348+
349+
pub(super) fn for_select(&self) -> &Receiver<ChainedChannel<P, C>> {
350+
&self.receiver
351+
}
352+
353+
pub(super) fn after_select(&mut self, message: ChainedChannel<P, C>) -> Option<P> {
354+
match message {
355+
ChainedChannel::Payload(payload) => Some(payload),
356+
ChainedChannel::ContextAndChannel(new_context_and_channel) => {
357+
(self.context, self.receiver) = new_context_and_channel.context_and_channel();
358+
None
359+
}
360+
}
361+
}
362+
}
363+
364+
pub(super) fn unbounded<P, C>(
365+
initial_context: C,
366+
) -> (ChainedChannelSender<P, C>, ChainedChannelReceiver<P, C>)
367+
where
368+
P: Send + Sync + 'static,
369+
C: Send + Sync + 'static + Clone,
370+
{
371+
let (sender, receiver) = crossbeam_channel::unbounded();
372+
(
373+
ChainedChannelSender::new(sender),
374+
ChainedChannelReceiver::new(receiver, initial_context),
375+
)
280376
}
281377
}
282378

@@ -369,23 +465,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
369465
);
370466
}
371467

372-
fn propagate_context_to_handler_threads(
373-
runnable_task_sender: &mut Sender<ChainedChannel<Task, SchedulingContext>>,
374-
context: SchedulingContext,
375-
handler_count: usize,
376-
) {
377-
let (next_sessioned_task_sender, runnable_task_receiver) = unbounded();
378-
for _ in 0..handler_count {
379-
runnable_task_sender
380-
.send(ChainedChannel::chain_to_new_channel(
381-
context.clone(),
382-
runnable_task_receiver.clone(),
383-
))
384-
.unwrap();
385-
}
386-
*runnable_task_sender = next_sessioned_task_sender;
387-
}
388-
389468
fn take_session_result_with_timings(&mut self) -> ResultWithTimings {
390469
self.session_result_with_timings.take().unwrap()
391470
}
@@ -399,8 +478,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
399478
}
400479

401480
fn start_threads(&mut self, context: &SchedulingContext) {
402-
let (runnable_task_sender, runnable_task_receiver) =
403-
unbounded::<ChainedChannel<Task, SchedulingContext>>();
481+
let (mut runnable_task_sender, runnable_task_receiver) =
482+
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
404483
let (executed_task_sender, executed_task_receiver) = unbounded::<ExecutedTaskPayload>();
405484
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
406485
let (accumulated_result_sender, accumulated_result_receiver) =
@@ -422,7 +501,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
422501
let handler_count = self.handler_count;
423502
let session_result_sender = self.session_result_sender.clone();
424503
let new_task_receiver = self.new_task_receiver.clone();
425-
let mut runnable_task_sender = runnable_task_sender.clone();
426504

427505
let mut session_ending = false;
428506
let mut active_task_count: usize = 0;
@@ -481,17 +559,16 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
481559
// be resolved in the case of single-threaded FIFO like this.
482560
active_task_count = active_task_count.checked_add(1).unwrap();
483561
runnable_task_sender
484-
.send(ChainedChannel::Payload(task))
562+
.send_payload(task)
485563
.unwrap();
486564
}
487565
NewTaskPayload::OpenSubchannel(context) => {
488566
// signal about new SchedulingContext to both handler and
489567
// accumulator threads
490-
Self::propagate_context_to_handler_threads(
491-
&mut runnable_task_sender,
568+
runnable_task_sender.send_chained_channel(
492569
context,
493570
handler_count
494-
);
571+
).unwrap();
495572
executed_task_sender
496573
.send(ExecutedTaskPayload::OpenSubchannel(()))
497574
.unwrap();
@@ -528,28 +605,25 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
528605

529606
let handler_main_loop = || {
530607
let pool = self.pool.clone();
531-
let mut bank = context.bank().clone();
532608
let mut runnable_task_receiver = runnable_task_receiver.clone();
533609
let finished_task_sender = finished_task_sender.clone();
534610

535611
move || loop {
536612
let (task, sender) = select! {
537-
recv(runnable_task_receiver) -> message => {
538-
match message.unwrap() {
539-
ChainedChannel::Payload(task) => {
540-
(task, &finished_task_sender)
541-
}
542-
ChainedChannel::PayloadAndChannel(new_channel) => {
543-
let new_context;
544-
(new_context, runnable_task_receiver) = new_channel.payload_and_channel();
545-
bank = new_context.bank().clone();
546-
continue;
547-
}
613+
recv(runnable_task_receiver.for_select()) -> message => {
614+
if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
615+
(task, &finished_task_sender)
616+
} else {
617+
continue;
548618
}
549619
},
550620
};
551621
let mut task = ExecutedTask::new_boxed(task);
552-
Self::execute_task_with_handler(&bank, &mut task, &pool.handler_context);
622+
Self::execute_task_with_handler(
623+
runnable_task_receiver.context().bank(),
624+
&mut task,
625+
&pool.handler_context,
626+
);
553627
sender.send(task).unwrap();
554628
}
555629
};

0 commit comments

Comments
 (0)