Skip to content

Commit

Permalink
robust fix for connection and worker (#921)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingluoHuang authored Apr 14, 2017
1 parent f492dce commit d85cfab
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 14 deletions.
49 changes: 46 additions & 3 deletions src/Agent.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ private async Task RunAsync(AgentJobRequestMessage message, WorkerDispatcher pre
}

Task<int> workerProcessTask = null;
object _outputLock = new object();
List<string> workerOutput = new List<string>();
using (var processChannel = HostContext.CreateService<IProcessChannel>())
using (var processInvoker = HostContext.CreateService<IProcessInvoker>())
{
Expand All @@ -318,6 +320,30 @@ private async Task RunAsync(AgentJobRequestMessage message, WorkerDispatcher pre
ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut));
ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn));

// Save STDOUT from the worker; the worker uses STDOUT to report unhandled exceptions.
processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout)
{
if (!string.IsNullOrEmpty(stdout.Data))
{
lock (_outputLock)
{
workerOutput.Add(stdout.Data);
}
}
};

// Save STDERR from the worker; the worker writes to STDERR on crash.
processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr)
{
if (!string.IsNullOrEmpty(stderr.Data))
{
lock (_outputLock)
{
workerOutput.Add(stderr.Data);
}
}
};

// Start the child process.
var assemblyDirectory = IOUtil.GetBinPath();
string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName);
Expand Down Expand Up @@ -382,10 +408,17 @@ await processChannel.SendAsync(
var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken));
if (completedTask == workerProcessTask)
{
// worker finished successfully, complete job request with result, stop renew lock, job has finished.
// worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished.
int returnCode = await workerProcessTask;
Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode);

string detailInfo = null;
if (!TaskResultUtil.IsValidReturnCode(returnCode))
{
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandle exception or app crash, attach worker stdout/stderr to JobRequest result.");
}

TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Trace.Info($"finish job request for job {message.JobId} with result: {result}");
term.WriteLine(StringUtil.Loc("JobCompleted", DateTime.UtcNow, message.JobName, result));
Expand All @@ -397,7 +430,16 @@ await processChannel.SendAsync(
await renewJobRequest;

// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result);
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

// Print the unhandled exception that happened in the worker only after we complete the job request.
// When we run out of disk space, reporting back to the server has higher priority.
if (!string.IsNullOrEmpty(detailInfo))
{
Trace.Error("Unhandle exception happened in worker:");
Trace.Error(detailInfo);
}

return;
}
else if (completedTask == renewJobRequest)
Expand Down Expand Up @@ -565,7 +607,8 @@ private async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockTok
}
}

private async Task CompleteJobRequestAsync(int poolId, AgentJobRequestMessage message, Guid lockToken, TaskResult result)
// TODO: We need to send detailInfo back to DT in order to add an issue for the job.
private async Task CompleteJobRequestAsync(int poolId, AgentJobRequestMessage message, Guid lockToken, TaskResult result, string detailInfo = null)
{
Trace.Entering();
if (ApiUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent))
Expand Down
13 changes: 12 additions & 1 deletion src/Agent.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,18 @@ public static async Task<int> MainAsync(
}
catch (Exception ex)
{
trace.Error(ex);
// Propagate any exception that causes worker failure back to the agent.
Console.WriteLine(ex.ToString());
try
{
trace.Error(ex);
}
catch (Exception e)
{
// Make sure we don't crash the app on a trace error,
// since an IOException will be thrown when we run out of disk space.
Console.WriteLine(e.ToString());
}
}
finally
{
Expand Down
16 changes: 14 additions & 2 deletions src/Microsoft.VisualStudio.Services.Agent/AgentServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,21 @@ public sealed class AgentServer : AgentService, IAgentServer
public async Task ConnectAsync(VssConnection agentConnection)
{
_connection = agentConnection;
if (!_connection.HasAuthenticated)
int attemptCount = 5;
while (!_connection.HasAuthenticated && attemptCount-- > 0)
{
await _connection.ConnectAsync();
try
{
await _connection.ConnectAsync();
break;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attemp left.");
Trace.Error(ex);
}

await Task.Delay(100);
}

_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
Expand Down
9 changes: 7 additions & 2 deletions src/Microsoft.VisualStudio.Services.Agent/JobNotification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,13 @@ private void Dispose(bool disposing)
if (disposing)
{
_outClient?.Dispose();
_socket.Send(Encoding.UTF8.GetBytes("<EOF>"));
_socket.Shutdown(SocketShutdown.Both);

if (_socket != null)
{
_socket.Send(Encoding.UTF8.GetBytes("<EOF>"));
_socket.Shutdown(SocketShutdown.Both);
_socket = null;
}
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions src/Microsoft.VisualStudio.Services.Agent/JobServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ public sealed class JobServer : AgentService, IJobServer
public async Task ConnectAsync(VssConnection jobConnection)
{
_connection = jobConnection;

if (!_connection.HasAuthenticated)
int attemptCount = 5;
while (!_connection.HasAuthenticated && attemptCount-- > 0)
{
await _connection.ConnectAsync();
try
{
await _connection.ConnectAsync();
break;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attemp left.");
Trace.Error(ex);
}

await Task.Delay(100);
}

_taskClient = _connection.GetClient<TaskHttpClient>();
Expand Down
17 changes: 14 additions & 3 deletions src/Microsoft.VisualStudio.Services.Agent/TaskServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,21 @@ public sealed class TaskServer : AgentService, ITaskServer
public async Task ConnectAsync(VssConnection jobConnection)
{
_connection = jobConnection;

if (!_connection.HasAuthenticated)
int attemptCount = 5;
while (!_connection.HasAuthenticated && attemptCount-- > 0)
{
await _connection.ConnectAsync();
try
{
await _connection.ConnectAsync();
break;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attemp left.");
Trace.Error(ex);
}

await Task.Delay(100);
}

_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ public static class TaskResultUtil
{
private static readonly int _returnCodeOffset = 100;

/// <summary>
/// Tells whether a process return code maps onto a defined <see cref="TaskResult"/> member.
/// </summary>
/// <param name="returnCode">The return code to check.</param>
/// <returns>
/// <c>true</c> when <paramref name="returnCode"/> minus the return-code offset is a value
/// defined by the <see cref="TaskResult"/> enum; otherwise <c>false</c>.
/// </returns>
public static bool IsValidReturnCode(int returnCode)
    => Enum.IsDefined(typeof(TaskResult), returnCode - _returnCodeOffset);

public static int TranslateToReturnCode(TaskResult result)
{
return _returnCodeOffset + (int)result;
Expand Down

0 comments on commit d85cfab

Please sign in to comment.