Skip to content

Commit 332926e

Browse files
committed
Improve batching command processor
1 parent 59298d5 commit 332926e

File tree

3 files changed

+56
-46
lines changed

3 files changed

+56
-46
lines changed

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/BatchingCommandProcessor.cs

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
// Created by: Denis Krjuchkov
55
// Created: 2009.08.20
66

7-
using System;
87
using System.Collections.Generic;
98
using System.Linq;
109
using System.Threading;
@@ -17,7 +16,7 @@ namespace Xtensive.Orm.Providers
1716
internal sealed class BatchingCommandProcessor : CommandProcessor, ISqlTaskProcessor
1817
{
1918
private readonly int batchSize;
20-
private readonly Queue<SqlTask> tasks;
19+
private Queue<SqlTask> tasks;
2120

2221
void ISqlTaskProcessor.ProcessTask(SqlLoadTask task, CommandProcessorContext context)
2322
{
@@ -61,8 +60,17 @@ public override void ExecuteTasks(CommandProcessorContext context)
6160
_ = ExecuteBatch(batchSize, null, context);
6261
}
6362

64-
while (!context.AllowPartialExecution && context.ProcessingTasks.Count > 0) {
65-
_ = ExecuteBatch(context.ProcessingTasks.Count, null, context);
63+
var allowPartialExecution = context.AllowPartialExecution;
64+
while (context.ProcessingTasks.Count > 0) {
65+
if (allowPartialExecution) {
66+
//re-register task
67+
RegisterTask(context.ProcessingTasks.Dequeue());
68+
}
69+
else {
70+
_ = context.ProcessingTasks.Count > batchSize
71+
? ExecuteBatch(batchSize, null, context)
72+
: ExecuteBatch(context.ProcessingTasks.Count, null, context);
73+
}
6674
}
6775
}
6876

@@ -74,8 +82,17 @@ public override async Task ExecuteTasksAsync(CommandProcessorContext context, Ca
7482
_ = await ExecuteBatchAsync(batchSize, null, context, token).ConfigureAwait(false);
7583
}
7684

77-
while (!context.AllowPartialExecution && context.ProcessingTasks.Count > 0) {
78-
_ = await ExecuteBatchAsync(context.ProcessingTasks.Count, null, context, token).ConfigureAwait(false);
85+
var allowPartialExecution = context.AllowPartialExecution;
86+
while (context.ProcessingTasks.Count > 0) {
87+
if (allowPartialExecution) {
88+
//re-register task
89+
RegisterTask(context.ProcessingTasks.Dequeue());
90+
}
91+
else {
92+
_ = await ((context.ProcessingTasks.Count > batchSize)
93+
? ExecuteBatchAsync(batchSize, null, context, token)
94+
: ExecuteBatchAsync(context.ProcessingTasks.Count, null, context, token));
95+
}
7996
}
8097
}
8198

@@ -88,8 +105,9 @@ public override IEnumerator<Tuple> ExecuteTasksWithReader(QueryRequest request,
88105
_ = ExecuteBatch(batchSize, null, context);
89106
}
90107

91-
for (;;) {
92-
var result = ExecuteBatch(context.ProcessingTasks.Count, request, context);
108+
for (; ; ) {
109+
var currentBatchSize = (context.ProcessingTasks.Count > batchSize) ? batchSize : context.ProcessingTasks.Count;
110+
var result = ExecuteBatch(currentBatchSize, request, context);
93111
if (result != null && context.ProcessingTasks.Count == 0) {
94112
return result.AsReaderOf(request);
95113
}
@@ -106,7 +124,8 @@ public override async Task<IEnumerator<Tuple>> ExecuteTasksWithReaderAsync(Query
106124
}
107125

108126
for (; ; ) {
109-
var result = await ExecuteBatchAsync(context.ProcessingTasks.Count, request, context, token).ConfigureAwait(false);
127+
var currentBatchSize = (context.ProcessingTasks.Count > batchSize) ? batchSize : context.ProcessingTasks.Count;
128+
var result = await ExecuteBatchAsync(currentBatchSize, request, context, token).ConfigureAwait(false);
110129
if (result != null && context.ProcessingTasks.Count == 0) {
111130
return result.AsReaderOf(request);
112131
}
@@ -117,21 +136,20 @@ public override async Task<IEnumerator<Tuple>> ExecuteTasksWithReaderAsync(Query
117136

118137
private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, CommandProcessorContext context)
119138
{
120-
if (numberOfTasks==0 && lastRequest==null) {
139+
if (numberOfTasks == 0 && lastRequest == null) {
121140
return null;
122141
}
123142

124-
var tasksToProcess = context.ProcessingTasks;
125-
126143
AllocateCommand(context);
127144

128145
var shouldReturnReader = false;
146+
var tasksToProcess = context.ProcessingTasks;
129147
try {
130148
while (numberOfTasks > 0 && tasksToProcess.Count > 0) {
131149
var task = tasksToProcess.Peek();
132150
context.CurrentTask = task;
133151
task.ProcessWith(this, context);
134-
if(context.CurrentTask==null) {
152+
if (context.CurrentTask == null) {
135153
numberOfTasks--;
136154
_ = tasksToProcess.Dequeue();
137155
}
@@ -148,15 +166,18 @@ private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, Comman
148166
}
149167
}
150168

151-
if (context.ActiveCommand.Count==0) {
169+
if (context.ActiveCommand.Count == 0) {
152170
return null;
153171
}
172+
154173
var hasQueryTasks = context.ActiveTasks.Count > 0;
174+
155175
if (!hasQueryTasks && !shouldReturnReader) {
156176
_ = context.ActiveCommand.ExecuteNonQuery();
157177
return null;
158178
}
159179
context.ActiveCommand.ExecuteReader();
180+
160181
if (hasQueryTasks) {
161182
var currentQueryTask = 0;
162183
while (currentQueryTask < context.ActiveTasks.Count) {
@@ -182,15 +203,14 @@ private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, Comman
182203

183204
private async Task<Command> ExecuteBatchAsync(int numberOfTasks, QueryRequest lastRequest, CommandProcessorContext context, CancellationToken token)
184205
{
185-
if (numberOfTasks==0 && lastRequest==null) {
206+
if (numberOfTasks == 0 && lastRequest == null) {
186207
return null;
187208
}
188209

189-
var tasksToProcess = context.ProcessingTasks;
190-
191210
AllocateCommand(context);
192211

193212
var shouldReturnReader = false;
213+
var tasksToProcess = context.ProcessingTasks;
194214
try {
195215
while (numberOfTasks > 0 && tasksToProcess.Count > 0) {
196216
var task = tasksToProcess.Peek();
@@ -212,9 +232,10 @@ private async Task<Command> ExecuteBatchAsync(int numberOfTasks, QueryRequest la
212232
}
213233
}
214234

215-
if (context.ActiveCommand.Count==0) {
235+
if (context.ActiveCommand.Count == 0) {
216236
return null;
217237
}
238+
218239
var hasQueryTasks = context.ActiveTasks.Count > 0;
219240
if (!hasQueryTasks && !shouldReturnReader) {
220241
_ = await context.ActiveCommand.ExecuteNonQueryAsync(token).ConfigureAwait(false);
@@ -259,7 +280,7 @@ private void ExecuteUnbatchedTask(SqlPersistTask task)
259280
var sequence = Factory.CreatePersistParts(task);
260281
foreach (var part in sequence) {
261282
using (var command = Factory.CreateCommand()) {
262-
ValidateCommandParameters(part);
283+
ValidateCommandPartParameters(part);
263284
command.AddPart(part);
264285
var affectedRowsCount = command.ExecuteNonQuery();
265286
if (affectedRowsCount == 0) {
@@ -273,19 +294,15 @@ private void ExecuteUnbatchedTask(SqlPersistTask task)
273294
private void PutTasksForExecution(CommandProcessorContext context)
274295
{
275296
if (context.AllowPartialExecution) {
276-
context.ProcessingTasks = new Queue<SqlTask>();
277-
var batchesCount = (int)tasks.Count / batchSize;
278-
if (batchesCount==0) {
279-
return;
280-
}
281-
context.ProcessingTasks = new Queue<SqlTask>();
282-
while (context.ProcessingTasks.Count < batchesCount * batchSize) {
297+
var processingTasksCount = tasks.Count / batchSize * batchSize;
298+
context.ProcessingTasks = new Queue<SqlTask>(processingTasksCount);
299+
while (context.ProcessingTasks.Count < processingTasksCount) {
283300
context.ProcessingTasks.Enqueue(tasks.Dequeue());
284301
}
285302
}
286303
else {
287-
context.ProcessingTasks = new Queue<SqlTask>(tasks);
288-
tasks.Clear();
304+
context.ProcessingTasks = tasks;
305+
tasks = new Queue<SqlTask>(batchSize);
289306
}
290307
}
291308

@@ -304,13 +321,6 @@ private bool PendCommandParts(Command currentCommand, ICollection<CommandPart> p
304321
: true;
305322
}
306323

307-
private void ValidateCommandParameters(CommandPart commandPart)
308-
{
309-
if (GetCommandExecutionBehavior(new[] { commandPart }, 0) == ExecutionBehavior.TooLargeForAnyCommand) {
310-
throw new ParametersLimitExceededException(commandPart.Parameters.Count, MaxQueryParameterCount);
311-
}
312-
}
313-
314324
private static string GetParameterPrefix(CommandProcessorContext context) =>
315325
string.Format("p{0}_", context.ActiveCommand.Count + 1);
316326

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/CommandProcessor.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ protected void ReleaseCommand(CommandProcessorContext context)
117117
}
118118
}
119119

120+
protected void ValidateCommandPartParameters(CommandPart commandPart)
121+
{
122+
if (commandPart.Parameters.Count > MaxQueryParameterCount) {
123+
throw new ParametersLimitExceededException(commandPart.Parameters.Count, MaxQueryParameterCount);
124+
}
125+
}
126+
120127
protected ExecutionBehavior GetCommandExecutionBehavior(ICollection<CommandPart> commandParts, int currentParametersCount)
121128
{
122129
if (MaxQueryParameterCount == int.MaxValue) {

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/SimpleCommandProcessor.cs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ internal sealed class SimpleCommandProcessor : CommandProcessor, ISqlTaskProcess
2020
void ISqlTaskProcessor.ProcessTask(SqlLoadTask task, CommandProcessorContext context)
2121
{
2222
var part = Factory.CreateQueryPart(task);
23-
ValidateCommandParameters(part);
23+
ValidateCommandPartParameters(part);
2424
context.ActiveCommand.AddPart(part);
2525
context.ActiveTasks.Add(task);
2626
}
@@ -30,7 +30,7 @@ void ISqlTaskProcessor.ProcessTask(SqlPersistTask task, CommandProcessorContext
3030
var sequence = Factory.CreatePersistParts(task);
3131
foreach (var part in sequence) {
3232
try {
33-
ValidateCommandParameters(part);
33+
ValidateCommandPartParameters(part);
3434
context.ActiveCommand.AddPart(part);
3535
var affectedRowsCount = context.ActiveCommand.ExecuteNonQuery();
3636
if (task.ValidateRowCount && affectedRowsCount == 0) {
@@ -116,7 +116,7 @@ public override IEnumerator<Tuple> ExecuteTasksWithReader(QueryRequest lastReque
116116

117117
var lastRequestCommand = Factory.CreateCommand();
118118
var commandPart = Factory.CreateQueryPart(lastRequest);
119-
ValidateCommandParameters(commandPart);
119+
ValidateCommandPartParameters(commandPart);
120120
lastRequestCommand.AddPart(commandPart);
121121
lastRequestCommand.ExecuteReader();
122122
return lastRequestCommand.AsReaderOf(lastRequest);
@@ -134,20 +134,13 @@ public override async Task<IEnumerator<Tuple>> ExecuteTasksWithReaderAsync(Query
134134

135135
var lastRequestCommand = Factory.CreateCommand();
136136
var commandPart = Factory.CreateQueryPart(lastRequest);
137-
ValidateCommandParameters(commandPart);
137+
ValidateCommandPartParameters(commandPart);
138138
lastRequestCommand.AddPart(commandPart);
139139
token.ThrowIfCancellationRequested();
140140
await lastRequestCommand.ExecuteReaderAsync(token).ConfigureAwait(false);
141141
return lastRequestCommand.AsReaderOf(lastRequest);
142142
}
143143

144-
private void ValidateCommandParameters(CommandPart commandPart)
145-
{
146-
if (GetCommandExecutionBehavior(new[] { commandPart }, 0) == ExecutionBehavior.TooLargeForAnyCommand) {
147-
throw new ParametersLimitExceededException(commandPart.Parameters.Count, MaxQueryParameterCount);
148-
}
149-
}
150-
151144
// Constructors
152145

153146
public SimpleCommandProcessor(CommandFactory factory, int maxQueryParameterCount)

0 commit comments

Comments
 (0)