@@ -267,6 +267,17 @@ pub struct Downloads<'a, 'cfg: 'a> {
267
267
largest : ( u64 , String ) ,
268
268
start : Instant ,
269
269
success : bool ,
270
+
271
+ /// Timeout management, both of timeout thresholds as well as whether or not
272
+ /// our connection has timed out (and accompanying message if it has).
273
+ ///
274
+ /// Note that timeout management is done manually here instead of in libcurl
275
+ /// because we want to apply timeouts to an entire batch of operations, not
276
+ /// any one particular single operatino
277
+ timeout : ops:: HttpTimeout , // timeout configuration
278
+ updated_at : Cell < Instant > , // last time we received bytes
279
+ next_speed_check : Cell < Instant > , // if threshold isn't 0 by this time, error
280
+ next_speed_check_bytes_threshold : Cell < u64 > , // decremented when we receive bytes
270
281
}
271
282
272
283
struct Download < ' cfg > {
@@ -293,24 +304,7 @@ struct Download<'cfg> {
293
304
294
305
/// The moment we started this transfer at
295
306
start : Instant ,
296
-
297
- /// Last time we noticed that we got some more data from libcurl
298
- updated_at : Cell < Instant > ,
299
-
300
- /// Timeout management, both of timeout thresholds as well as whether or not
301
- /// our connection has timed out (and accompanying message if it has).
302
- ///
303
- /// Note that timeout management is done manually here because we have a
304
- /// `Multi` with a lot of active transfers but between transfers finishing
305
- /// we perform some possibly slow synchronous work (like grabbing file
306
- /// locks, extracting tarballs, etc). The default timers on our `Multi` keep
307
- /// running during this work, but we don't want them to count towards timing
308
- /// everythig out. As a result, we manage this manually and take the time
309
- /// for synchronous work into account manually.
310
- timeout : ops:: HttpTimeout ,
311
307
timed_out : Cell < Option < String > > ,
312
- next_speed_check : Cell < Instant > ,
313
- next_speed_check_bytes_threshold : Cell < u64 > ,
314
308
315
309
/// Logic used to track retrying this download if it's a spurious failure.
316
310
retry : Retry < ' cfg > ,
@@ -359,6 +353,7 @@ impl<'cfg> PackageSet<'cfg> {
359
353
360
354
pub fn enable_download < ' a > ( & ' a self ) -> CargoResult < Downloads < ' a , ' cfg > > {
361
355
assert ! ( !self . downloading. replace( true ) ) ;
356
+ let timeout = ops:: HttpTimeout :: new ( self . config ) ?;
362
357
Ok ( Downloads {
363
358
start : Instant :: now ( ) ,
364
359
set : self ,
@@ -375,6 +370,10 @@ impl<'cfg> PackageSet<'cfg> {
375
370
downloaded_bytes : 0 ,
376
371
largest : ( 0 , String :: new ( ) ) ,
377
372
success : false ,
373
+ updated_at : Cell :: new ( Instant :: now ( ) ) ,
374
+ timeout,
375
+ next_speed_check : Cell :: new ( Instant :: now ( ) ) ,
376
+ next_speed_check_bytes_threshold : Cell :: new ( 0 ) ,
378
377
} )
379
378
}
380
379
@@ -446,7 +445,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
446
445
debug ! ( "downloading {} as {}" , id, token) ;
447
446
assert ! ( self . pending_ids. insert( id. clone( ) ) ) ;
448
447
449
- let ( mut handle, timeout ) = ops:: http_handle_and_timeout ( self . set . config ) ?;
448
+ let ( mut handle, _timeout ) = ops:: http_handle_and_timeout ( self . set . config ) ?;
450
449
handle. get ( true ) ?;
451
450
handle. url ( & url) ?;
452
451
handle. follow_location ( true ) ?; // follow redirects
@@ -501,7 +500,6 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
501
500
self . set . config . shell ( ) . status ( "Downloading" , "crates ..." ) ?;
502
501
}
503
502
504
- let now = Instant :: now ( ) ;
505
503
let dl = Download {
506
504
token,
507
505
data : RefCell :: new ( Vec :: new ( ) ) ,
@@ -511,11 +509,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
511
509
total : Cell :: new ( 0 ) ,
512
510
current : Cell :: new ( 0 ) ,
513
511
start : Instant :: now ( ) ,
514
- updated_at : Cell :: new ( now) ,
515
- timeout,
516
512
timed_out : Cell :: new ( None ) ,
517
- next_speed_check : Cell :: new ( now) ,
518
- next_speed_check_bytes_threshold : Cell :: new ( 0 ) ,
519
513
retry : Retry :: new ( self . set . config ) ?,
520
514
} ;
521
515
self . enqueue ( dl, handle) ?;
@@ -638,10 +632,8 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
638
632
// active downloads to make sure they don't fire because of a slowly
639
633
// extracted tarball.
640
634
let finish_dur = start. elapsed ( ) ;
641
- for ( dl, _) in self . pending . values_mut ( ) {
642
- dl. updated_at . set ( dl. updated_at . get ( ) + finish_dur) ;
643
- dl. next_speed_check . set ( dl. next_speed_check . get ( ) + finish_dur) ;
644
- }
635
+ self . updated_at . set ( self . updated_at . get ( ) + finish_dur) ;
636
+ self . next_speed_check . set ( self . next_speed_check . get ( ) + finish_dur) ;
645
637
646
638
let slot = & self . set . packages [ & dl. id ] ;
647
639
assert ! ( slot. fill( pkg) . is_ok( ) ) ;
@@ -652,12 +644,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
652
644
let mut handle = self . set . multi . add ( handle) ?;
653
645
let now = Instant :: now ( ) ;
654
646
handle. set_token ( dl. token ) ?;
647
+ self . updated_at . set ( now) ;
648
+ self . next_speed_check . set ( now + self . timeout . dur ) ;
649
+ self . next_speed_check_bytes_threshold . set ( self . timeout . low_speed_limit as u64 ) ;
655
650
dl. timed_out . set ( None ) ;
656
- dl. updated_at . set ( now) ;
657
651
dl. current . set ( 0 ) ;
658
652
dl. total . set ( 0 ) ;
659
- dl. next_speed_check . set ( now + dl. timeout . dur ) ;
660
- dl. next_speed_check_bytes_threshold . set ( dl. timeout . low_speed_limit as u64 ) ;
661
653
self . pending . insert ( dl. token , ( dl, handle) ) ;
662
654
Ok ( ( ) )
663
655
}
@@ -712,25 +704,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
712
704
dl. total . set ( total) ;
713
705
let now = Instant :: now ( ) ;
714
706
if cur != dl. current . get ( ) {
707
+ let delta = cur - dl. current . get ( ) ;
708
+ let threshold = self . next_speed_check_bytes_threshold . get ( ) ;
709
+
715
710
dl. current . set ( cur) ;
716
- dl . updated_at . set ( now) ;
711
+ self . updated_at . set ( now) ;
717
712
718
- if dl . current . get ( ) >= dl . next_speed_check_bytes_threshold . get ( ) {
719
- dl . next_speed_check . set ( now + dl . timeout . dur ) ;
720
- dl . next_speed_check_bytes_threshold . set (
721
- dl . current . get ( ) + dl . timeout . low_speed_limit as u64 ,
713
+ if delta >= threshold {
714
+ self . next_speed_check . set ( now + self . timeout . dur ) ;
715
+ self . next_speed_check_bytes_threshold . set (
716
+ self . timeout . low_speed_limit as u64 ,
722
717
) ;
718
+ } else {
719
+ self . next_speed_check_bytes_threshold . set ( threshold - delta) ;
723
720
}
724
721
}
725
722
if !self . tick ( WhyTick :: DownloadUpdate ) . is_ok ( ) {
726
723
return false
727
724
}
728
725
729
726
// If we've spent too long not actually receiving any data we time out.
730
- if now - dl. updated_at . get ( ) > dl. timeout . dur {
727
+ if now - self . updated_at . get ( ) > self . timeout . dur {
728
+ self . updated_at . set ( now) ;
731
729
let msg = format ! ( "failed to download any data for `{}` within {}s" ,
732
730
dl. id,
733
- dl . timeout. dur. as_secs( ) ) ;
731
+ self . timeout. dur. as_secs( ) ) ;
734
732
dl. timed_out . set ( Some ( msg) ) ;
735
733
return false
736
734
}
@@ -739,13 +737,14 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
739
737
// limit, see if we've transferred enough data during this threshold. If
740
738
// it fails this check then we fail because the download is going too
741
739
// slowly.
742
- if now >= dl. next_speed_check . get ( ) {
743
- assert ! ( dl. current. get( ) < dl. next_speed_check_bytes_threshold. get( ) ) ;
740
+ if now >= self . next_speed_check . get ( ) {
741
+ self . next_speed_check . set ( now + self . timeout . dur ) ;
742
+ assert ! ( self . next_speed_check_bytes_threshold. get( ) > 0 ) ;
744
743
let msg = format ! ( "download of `{}` failed to transfer more \
745
744
than {} bytes in {}s",
746
745
dl. id,
747
- dl . timeout. low_speed_limit,
748
- dl . timeout. dur. as_secs( ) ) ;
746
+ self . timeout. low_speed_limit,
747
+ self . timeout. dur. as_secs( ) ) ;
749
748
dl. timed_out . set ( Some ( msg) ) ;
750
749
return false
751
750
}
0 commit comments