@@ -71,6 +71,9 @@ pub(crate) struct SchedulerController<T: LikeClusterInfo> {
71
71
worker_metrics : Vec < Arc < ConsumeWorkerMetrics > > ,
72
72
/// State for forwarding packets to the leader, if enabled.
73
73
forwarder : Option < Forwarder < T > > ,
74
+ batch : Vec < ImmutableDeserializedPacket > ,
75
+ batch_start : Instant ,
76
+ batch_interval : Duration ,
74
77
}
75
78
76
79
impl < T : LikeClusterInfo > SchedulerController < T > {
@@ -81,6 +84,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
81
84
scheduler : GreedyScheduler ,
82
85
worker_metrics : Vec < Arc < ConsumeWorkerMetrics > > ,
83
86
forwarder : Option < Forwarder < T > > ,
87
+ batch_interval : Duration ,
84
88
) -> Self {
85
89
Self {
86
90
decision_maker,
@@ -94,6 +98,10 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
94
98
timing_metrics : SchedulerTimingMetrics :: default ( ) ,
95
99
worker_metrics,
96
100
forwarder,
101
+
102
+ batch : Vec :: default ( ) ,
103
+ batch_start : Instant :: now ( ) ,
104
+ batch_interval,
97
105
}
98
106
}
99
107
@@ -123,6 +131,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
123
131
. maybe_report_and_reset_slot ( new_leader_slot) ;
124
132
125
133
self . receive_completed ( ) ?;
134
+ self . maybe_queue_batch ( ) ;
126
135
self . process_transactions ( & decision) ?;
127
136
if !self . receive_and_buffer_packets ( & decision) {
128
137
break ;
@@ -146,6 +155,23 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
146
155
Ok ( ( ) )
147
156
}
148
157
158
+ fn maybe_queue_batch ( & mut self ) {
159
+ // Early return if the current batch is empty.
160
+ if self . batch . is_empty ( ) {
161
+ return ;
162
+ }
163
+
164
+ if self . batch_start . elapsed ( ) > self . batch_interval {
165
+ Self :: buffer_packets (
166
+ & self . bank_forks ,
167
+ & mut self . container ,
168
+ & mut self . transaction_id_generator ,
169
+ & mut self . count_metrics ,
170
+ self . batch . drain ( ..) ,
171
+ ) ;
172
+ }
173
+ }
174
+
149
175
/// Process packets based on decision.
150
176
fn process_transactions (
151
177
& mut self ,
@@ -474,9 +500,11 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
474
500
} ) ;
475
501
476
502
if should_buffer {
477
- let ( _, buffer_time_us) = measure_us ! (
478
- self . buffer_packets( receive_packet_results. deserialized_packets)
479
- ) ;
503
+ // Packets are not immediately schedulable but are instead
504
+ // grouped into 50ms batches (measured from the recv time of
505
+ // the first packet).
506
+ let ( _, buffer_time_us) =
507
+ measure_us ! ( self . extend_batch( receive_packet_results. deserialized_packets) ) ;
480
508
self . timing_metrics . update ( |timing_metrics| {
481
509
saturating_add_assign ! ( timing_metrics. buffer_time_us, buffer_time_us) ;
482
510
} ) ;
@@ -496,12 +524,27 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
496
524
true
497
525
}
498
526
499
- fn buffer_packets ( & mut self , packets : Vec < ImmutableDeserializedPacket > ) {
527
+ fn extend_batch ( & mut self , packets : Vec < ImmutableDeserializedPacket > ) {
528
+ // If this is the first packet in the batch, set the
529
+ // start timestamp for the batch.
530
+ if self . batch . is_empty ( ) {
531
+ self . batch_start = Instant :: now ( ) ;
532
+ }
533
+ self . batch . extend ( packets) ;
534
+ }
535
+
536
+ fn buffer_packets (
537
+ bank_forks : & Arc < RwLock < BankForks > > ,
538
+ container : & mut TransactionStateContainer ,
539
+ transaction_id_generator : & mut TransactionIdGenerator ,
540
+ count_metrics : & mut SchedulerCountMetrics ,
541
+ packets : impl Iterator < Item = ImmutableDeserializedPacket > ,
542
+ ) {
500
543
// Convert to Arcs
501
544
let packets: Vec < _ > = packets. into_iter ( ) . map ( Arc :: new) . collect ( ) ;
502
545
// Sanitize packets, generate IDs, and insert into the container.
503
546
let ( root_bank, working_bank) = {
504
- let bank_forks = self . bank_forks . read ( ) . unwrap ( ) ;
547
+ let bank_forks = bank_forks. read ( ) . unwrap ( ) ;
505
548
let root_bank = bank_forks. root_bank ( ) ;
506
549
let working_bank = bank_forks. working_bank ( ) ;
507
550
( root_bank, working_bank)
@@ -586,7 +629,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
586
629
. filter ( |( _, check_result) | check_result. is_ok ( ) )
587
630
{
588
631
saturating_add_assign ! ( post_transaction_check_count, 1 ) ;
589
- let transaction_id = self . transaction_id_generator . next ( ) ;
632
+ let transaction_id = transaction_id_generator. next ( ) ;
590
633
591
634
let ( priority, cost) = Self :: calculate_priority_and_cost (
592
635
& transaction,
@@ -598,7 +641,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
598
641
max_age,
599
642
} ;
600
643
601
- if self . container . insert_new_transaction (
644
+ if container. insert_new_transaction (
602
645
transaction_id,
603
646
transaction_ttl,
604
647
packet,
@@ -617,7 +660,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
617
660
let num_dropped_on_transaction_checks =
618
661
post_lock_validation_count. saturating_sub ( post_transaction_check_count) ;
619
662
620
- self . count_metrics . update ( |count_metrics| {
663
+ count_metrics. update ( |count_metrics| {
621
664
saturating_add_assign ! (
622
665
count_metrics. num_dropped_on_capacity,
623
666
num_dropped_on_capacity
@@ -809,6 +852,7 @@ mod tests {
809
852
GreedyScheduler :: new ( consume_work_senders, finished_consume_work_receiver) ,
810
853
vec ! [ ] , // no actual workers with metrics to report, this can be empty
811
854
None ,
855
+ Duration :: from_millis ( 50 ) ,
812
856
) ;
813
857
814
858
( test_frame, scheduler_controller)
0 commit comments