@@ -56,20 +56,22 @@ public override void ExecuteTasks(CommandProcessorContext context)
56
56
{
57
57
PutTasksForExecution ( context ) ;
58
58
59
- while ( context . ProcessingTasks . Count >= batchSize ) {
59
+ var processingTasks = context . ProcessingTasks ;
60
+ while ( processingTasks . Count >= batchSize ) {
60
61
_ = ExecuteBatch ( batchSize , null , context ) ;
61
62
}
62
63
63
- var allowPartialExecution = context . AllowPartialExecution ;
64
- while ( context . ProcessingTasks . Count > 0 ) {
65
- if ( allowPartialExecution ) {
66
- //re-register task
67
- RegisterTask ( context . ProcessingTasks . Dequeue ( ) ) ;
68
- }
69
- else {
70
- _ = context . ProcessingTasks . Count > batchSize
64
+ if ( ! context . AllowPartialExecution ) {
65
+ while ( processingTasks . Count > 0 ) {
66
+ _ = processingTasks . Count > batchSize
71
67
? ExecuteBatch ( batchSize , null , context )
72
- : ExecuteBatch ( context . ProcessingTasks . Count , null , context ) ;
68
+ : ExecuteBatch ( processingTasks . Count , null , context ) ;
69
+ }
70
+ }
71
+ else {
72
+ //re-register task
73
+ for ( int i = 0 , count = processingTasks . Count ; i < count ; i ++ ) {
74
+ tasks . Enqueue ( processingTasks . Dequeue ( ) ) ;
73
75
}
74
76
}
75
77
}
@@ -78,22 +80,23 @@ public override async Task ExecuteTasksAsync(CommandProcessorContext context, Ca
78
80
{
79
81
PutTasksForExecution ( context ) ;
80
82
81
- while ( context . ProcessingTasks . Count >= batchSize ) {
83
+ var processingTasks = context . ProcessingTasks ;
84
+ while ( processingTasks . Count >= batchSize ) {
82
85
_ = await ExecuteBatchAsync ( batchSize , null , context , token ) . ConfigureAwait ( false ) ;
83
86
}
84
87
85
- var allowPartialExecution = context . AllowPartialExecution ;
86
- while ( context . ProcessingTasks . Count > 0 ) {
87
- if ( allowPartialExecution ) {
88
- //re-register task
89
- RegisterTask ( context . ProcessingTasks . Dequeue ( ) ) ;
90
- }
91
- else {
88
+ if ( ! context . AllowPartialExecution ) {
89
+ while ( processingTasks . Count > 0 ) {
92
90
_ = await ( ( context . ProcessingTasks . Count > batchSize )
93
91
? ExecuteBatchAsync ( batchSize , null , context , token )
94
92
: ExecuteBatchAsync ( context . ProcessingTasks . Count , null , context , token ) ) ;
95
93
}
96
94
}
95
+ else {
96
+ for ( int i = 0 , count = processingTasks . Count ; i < count ; i ++ ) {
97
+ tasks . Enqueue ( processingTasks . Dequeue ( ) ) ;
98
+ }
99
+ }
97
100
}
98
101
99
102
public override IEnumerator < Tuple > ExecuteTasksWithReader ( QueryRequest request , CommandProcessorContext context )
0 commit comments