Skip to content

Commit

Permalink
make list apps in pre-upgrade and post-upgrade paginated
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed Feb 25, 2025
1 parent 9e995c7 commit b940529
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 33 deletions.
68 changes: 43 additions & 25 deletions cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.annotation.Beta;
import io.cdap.cdap.api.customaction.CustomActionSpecification;
import io.cdap.cdap.api.workflow.ConditionSpecification;
Expand Down Expand Up @@ -275,36 +276,53 @@ public void stopAll(NamespaceId namespace)
throws IOException, UnauthenticatedException, InterruptedException, TimeoutException, UnauthorizedException,
ApplicationNotFoundException, BadRequestException {

List<ApplicationRecord> allApps = applicationClient.list(namespace);
for (ApplicationRecord applicationRecord : allApps) {
ApplicationId appId = new ApplicationId(namespace.getNamespace(), applicationRecord.getName(),
applicationRecord.getAppVersion());
List<ProgramRecord> programRecords = applicationClient.listPrograms(appId);
for (ProgramRecord programRecord : programRecords) {
try {
ProgramId program = appId.program(programRecord.getType(), programRecord.getName());
String status = this.getStatus(program);
if (!status.equals("STOPPED")) {
String token = null;
boolean isLastPage = false;
while (!isLastPage) {
JsonObject paginatedListResponse = applicationClient.paginatedList(namespace, token);
token = paginatedListResponse.get("nextPageToken") == null ? null
: paginatedListResponse.get("nextPageToken").getAsString();
LOG.debug("Called paginated list API to stop programs and got token: {}", token);
if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) {
Type appListType = new TypeToken<List<ApplicationRecord>>() {
}.getType();
List<ApplicationRecord> records = GSON.fromJson(
paginatedListResponse.get("applications").getAsJsonArray(), appListType);
for (ApplicationRecord applicationRecord : records) {
ApplicationId appId = new ApplicationId(namespace.getNamespace(),
applicationRecord.getName(),
applicationRecord.getAppVersion());
List<ProgramRecord> programRecords = applicationClient.listPrograms(appId);
for (ProgramRecord programRecord : programRecords) {
try {
this.stop(program);
} catch (IOException ioe) {
// ProgramClient#stop calls RestClient, which throws an IOException if the HTTP response code is 400,
// which can be due to the program already being stopped when calling stop on it.
// Most likely, there was a race condition that the program stopped between the time we checked its
// status and calling the stop method.
LOG.warn(
"Program {} is already stopped, proceeding even though the following exception is raised.",
program, ioe);
ProgramId program = appId.program(programRecord.getType(), programRecord.getName());
String status = this.getStatus(program);
if (!status.equals("STOPPED")) {
try {
this.stop(program);
} catch (IOException ioe) {
// ProgramClient#stop calls RestClient, which throws an IOException if the
// HTTP response code is 400, which can be due to the program already being
// stopped when calling stop on it. Most likely, there was a race condition that
// the program stopped between the time we checked its status and calling
// the stop method.
LOG.warn(
"Program {} is already stopped, proceeding even though the following exception is raised.",
program, ioe);
}
// YarnTwillController has a timeout of 60 seconds after sending a stop signal
// using ZK. If this fails, it kills the app using Yarn API. In cases where there
// is a failure to send the message via ZK, it waits for 60 seconds.
// So a wait of 60 seconds here is not enough.
this.waitForStatus(program, ProgramStatus.STOPPED, 120, TimeUnit.SECONDS);
}
} catch (ProgramNotFoundException e) {
// IGNORE
}
// YarnTwillController has a timeout of 60 seconds after sending a stop signal using ZK.
// If this fails, it kills the app using Yarn API. In cases where there is a failure to send the message
// via ZK, it waits for 60 seconds. So a wait of 60 seconds here is not enough.
this.waitForStatus(program, ProgramStatus.STOPPED, 120, TimeUnit.SECONDS);
}
} catch (ProgramNotFoundException e) {
// IGNORE
}
}
isLastPage = (token == null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package io.cdap.cdap.master.upgrade;

Check warning on line 16 in cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck

'package' should be separated from previous line.

import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;

import com.google.gson.JsonObject;

Check warning on line 21 in cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Extra separation in import group before 'com.google.gson.JsonObject'
import io.cdap.cdap.client.ApplicationClient;
import io.cdap.cdap.client.NamespaceClient;
import io.cdap.cdap.client.ProgramClient;
Expand All @@ -33,9 +37,12 @@
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.impersonation.SecurityUtil;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Restarts all schedules and programs stopped between startTimeMillis and now. The first parameter
Expand All @@ -49,6 +56,9 @@
public class PostUpgradeJobMain {

private static final int DEFAULT_READ_TIMEOUT_MILLIS = 90 * 1000;
private static final Logger LOG = LoggerFactory.getLogger(PostUpgradeJobMain.class);
private static final Gson GSON = new Gson();
private static final int APP_LIST_PAGE_SIZE = 25;

public static void main(String[] args) {

Check warning on line 63 in cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck

Missing a Javadoc comment.
if (args.length < 3 || args.length > 4) {
Expand All @@ -65,7 +75,8 @@ public static void main(String[] args) {
ClientConfig.Builder clientConfigBuilder =
ClientConfig.builder()
.setDefaultReadTimeout(DEFAULT_READ_TIMEOUT_MILLIS)
.setConnectionConfig(connectionConfig);
.setConnectionConfig(connectionConfig)
.setAppListPageSize(APP_LIST_PAGE_SIZE);

// If used in proxy mode, attach a user ID header to upgrade jobs.
CConfiguration cConf = CConfiguration.create();
Expand Down Expand Up @@ -111,12 +122,28 @@ private static void restartPipelinesAndSchedules(
}

for (NamespaceId namespaceId : namespaceIdList) {
for (ApplicationRecord record : applicationClient.list(namespaceId)) {
ApplicationId applicationId =
new ApplicationId(namespaceId.getNamespace(), record.getName(), record.getAppVersion());
programClient.restart(applicationId,
TimeUnit.MILLISECONDS.toSeconds(startTimeMillis),
TimeUnit.MILLISECONDS.toSeconds(endTimeMillis));
String token = null;
boolean isLastPage = false;
while (!isLastPage) {
JsonObject paginatedListResponse = applicationClient.paginatedList(namespaceId, token);
token = paginatedListResponse.get("nextPageToken") == null ? null
: paginatedListResponse.get("nextPageToken").getAsString();
LOG.debug("Called paginated list API to restart programs and got token: {}", token);
if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) {
Type appListType = new TypeToken<List<ApplicationRecord>>() {
}.getType();
List<ApplicationRecord> records = GSON.fromJson(
paginatedListResponse.get("applications").getAsJsonArray(), appListType);
for (ApplicationRecord record : records) {
ApplicationId applicationId =
new ApplicationId(namespaceId.getNamespace(), record.getName(),
record.getAppVersion());
programClient.restart(applicationId,
TimeUnit.MILLISECONDS.toSeconds(startTimeMillis),
TimeUnit.MILLISECONDS.toSeconds(endTimeMillis));
}
}
isLastPage = (token == null);
}

// Re-enable schedules in a namespace AFTER programs have been restarted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig)
JsonObject paginatedListResponse = applicationClient.paginatedList(namespaceId, token);
token = paginatedListResponse.get("nextPageToken") == null ? null
: paginatedListResponse.get("nextPageToken").getAsString();
LOG.debug("Called paginated list API and got token: {}", token);
LOG.debug("Called paginated list API to schedule and workflows and got token: {}", token);
if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) {
Type appListType = new TypeToken<List<ApplicationRecord>>() {
}.getType();
Expand Down

0 comments on commit b940529

Please sign in to comment.