This repository was archived by the owner on Oct 15, 2020. It is now read-only.
```diff
@@ -72,7 +72,7 @@ public static class YarnCluster {
   private final Set<String> resourcesToLocalize;

   /**
-   * JARs that will be localized and put into the classpaths for bot JobManager and TaskManager.
+   * JARs that will be localized and put into the classpaths for both JobManager and TaskManager.
    */
   @JsonProperty("additional.jars")
   private final Set<String> additionalJars;
```
```diff
@@ -150,7 +150,8 @@ static StateView computeState(Map<UUID, JobDefinition> jobs, Map<UUID, InstanceI

   static JobDefinitionDesiredstate computeActualState(InstanceInfo info) {
     JobDefinitionResource r = new JobDefinitionResource()
-        .memory(info.status().getAllocatedMB())
+        .memory(info.status().getAllocatedMB() / (info.status().getAllocatedVCores() != 0
+            ? info.status().getAllocatedVCores() : 1))
         .vCores(info.status().getAllocatedVCores());
     JobDefinitionDesiredstate s = new JobDefinitionDesiredstate()
         .clusterId(info.clusterName())
```

Contributor:

Taking a closer look, I don't quite understand this. The memory should be the total amount of memory for the whole job, not the memory used by each TaskManager.

Contributor Author:

https://github.com/uber/AthenaX/blob/9de54305f6dc198cbe176f64c8d6dddc6ab2e6ac/athenax-backend/src/main/java/com/uber/athenax/backend/server/yarn/JobConf.java#L48-L55

According to the definition of JobConf, it seems that the amount of memory specified by the Web API parameters is exactly the memory used by each TaskManager, not the whole job's memory consumption. When I start a job with 2 cores and 2 GB of memory, AthenaX starts two TaskManager containers with 1 core and 2 GB of memory each, so the total amount of memory retrieved from YARN is 4 GB, which leads to the same issue of continuous restarting.

Maybe this is not the right place to fix this issue?
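To make the arithmetic in the thread concrete, here is a minimal sketch of the normalization the patched `.memory(...)` line performs, using the 2-core/2-GB scenario from the comment above. The class and method names are hypothetical; only the divide-by-vCores logic mirrors the diff:

```java
// Sketch of the per-vCore memory normalization from the patched line.
// YARN reports totals across all containers of the application, while
// JobConf expresses memory per TaskManager, so the total is divided back down.
final class MemoryNormalizationSketch {

  // Guard against zero vCores, exactly as the ternary in the diff does.
  static long memoryPerVCore(long allocatedMB, int allocatedVCores) {
    return allocatedMB / (allocatedVCores != 0 ? allocatedVCores : 1);
  }

  public static void main(String[] args) {
    // Scenario from the review comment: a job submitted with 2 cores and
    // 2 GB per TaskManager runs as two 1-core/2048-MB containers, so YARN
    // reports 4096 MB and 2 vCores in total.
    System.out.println(memoryPerVCore(4096, 2)); // 2048, matching the request
    System.out.println(memoryPerVCore(4096, 0)); // 4096, zero-guard fallback
  }
}
```

Without this normalization, the computed actual state reports 4096 MB against a desired 2048 MB, so the job is repeatedly killed and resubmitted, which is the continuous-restart symptom described in the thread.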
```diff
@@ -26,6 +26,7 @@
 import com.uber.athenax.vm.compiler.planner.JobCompilationResult;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -210,6 +211,9 @@ void scanAll() throws IOException, YarnException {
       if (instance == null) {
         LOG.warn("Failed to retrieve instance info for {}:{}", cluster.name(), report.getApplicationId());
       } else {
+        instance.status().setAllocatedVCores(instance.status().getAllocatedVCores() - 1);
+        instance.status().setAllocatedMB(instance.status().getAllocatedMB()
+            - flinkConf.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
         newInstances.put(instance.metadata().uuid(), instance);
       }
     }
```
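The scanAll() change complements the division above: before the instance is recorded, the JobManager's share (one container, i.e. one vCore plus the configured JobManager heap) is subtracted from YARN's totals, so only TaskManager resources enter the state comparison. A rough standalone sketch, under the assumption that JobManagerOptions.JOB_MANAGER_HEAP_MEMORY is an integer number of megabytes, as the diff's getInteger call implies for the Flink version this code builds against; the class, the method names, and the 1024 MB heap value are illustrative:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

// Sketch of excluding the JobManager container from YARN's totals.
final class JobManagerExclusionSketch {

  // The JobManager occupies one container: 1 vCore plus its heap in MB.
  static int taskManagerVCores(int allocatedVCores) {
    return allocatedVCores - 1;
  }

  static long taskManagerMB(long allocatedMB, Configuration flinkConf) {
    return allocatedMB - flinkConf.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
  }

  public static void main(String[] args) {
    Configuration flinkConf = new Configuration();
    flinkConf.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 1024); // assumed heap size

    // YARN reports 3 vCores / 5120 MB in total: one 1-core/1024-MB JobManager
    // plus the two 1-core/2048-MB TaskManagers from the scenario above.
    System.out.println(taskManagerVCores(3));           // 2
    System.out.println(taskManagerMB(5120, flinkConf)); // 4096
  }
}
```

After this subtraction, the division in computeActualState operates on TaskManager-only figures, so the computed actual state lines up with the per-TaskManager desired state.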
```diff
@@ -50,7 +50,7 @@ public class YarnClusterConfiguration {
   private final Set<Path> resourcesToLocalize;

   /**
-   * JARs that will be localized and put into the classpaths for bot JobManager and TaskManager.
+   * JARs that will be localized and put into the classpaths for both JobManager and TaskManager.
    */
   private final Set<Path> systemJars;

```