Skip to content

Commit

Permalink
[FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 19, 2024
1 parent a2c3d27 commit 87ed9cc
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,6 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
executionGraphWithVertexParallelism.getExecutionGraph();

executionGraph.start(componentMainThreadExecutor);
executionGraph.transitionToRunning();

executionGraph.setInternalTaskFailuresListener(
new UpdateSchedulerNgOnInternalFailuresListener(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ private void handleExecutionGraphCreation(
operatorCoordinatorHandlerFactory.create(executionGraph, context);
operatorCoordinatorHandler.initializeOperatorCoordinators(
context.getMainThreadExecutor());
operatorCoordinatorHandler.startAllOperatorCoordinators();
final String updatedPlan =
JsonPlanGenerator.generatePlan(
executionGraph.getJobID(),
Expand All @@ -137,6 +136,10 @@ private void handleExecutionGraphCreation(
.iterator(),
executionGraphWithVertexParallelism.getVertexParallelism());
executionGraph.setJsonPlan(updatedPlan);

executionGraph.transitionToRunning();
operatorCoordinatorHandler.startAllOperatorCoordinators();

context.goToExecuting(
result.getExecutionGraph(),
executionGraphHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,12 @@ void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() {
ignored -> CreatingExecutionGraph.AssignmentResult.notPossible());
context.setExpectWaitingForResources();

executionGraphWithVertexParallelismFuture.complete(
getGraph(new StateTrackingMockExecutionGraph()));
final StateTrackingMockExecutionGraph executionGraph =
new StateTrackingMockExecutionGraph();

executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));

assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);
}

@Test
Expand Down

0 comments on commit 87ed9cc

Please sign in to comment.