diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml
index a7f2c654..f3c1bac4 100644
--- a/.github/workflows/actions.yml
+++ b/.github/workflows/actions.yml
@@ -6,7 +6,7 @@ on:
     branches: [ master ]
 jobs:
   unit-tests:
-    runs-on: ubuntu-18.04
+    runs-on: ubuntu-20.04
     defaults:
       run:
         working-directory: go/src/github.com/lyft/flinkk8soperator
@@ -27,7 +27,7 @@ jobs:
       - name: test
         run: make test_unit
   lint:
-    runs-on: ubuntu-18.04
+    runs-on: ubuntu-20.04
     defaults:
       run:
         working-directory: go/src/github.com/lyft/flinkk8soperator
@@ -47,27 +47,26 @@ jobs:
         run: make install
       - name: test
         run: make lint
-  # TODO: restore this test
-  # integration-tests:
-  #   runs-on: ubuntu-18.04
-  #   defaults:
-  #     run:
-  #       working-directory: go/src/github.com/lyft/flinkk8soperator
-  #   env:
-  #     GOPATH: "/home/runner/work/flinkk8soperator/flinkk8soperator/go/"
-  #   steps:
-  #     - name: checkout
-  #       uses: actions/checkout@v2
-  #       with:
-  #         fetch-depth: 1
-  #         path: go/src/github.com/lyft/flinkk8soperator
-  #     - name: install go
-  #       uses: actions/setup-go@v2
-  #       with:
-  #         go-version: 1.12
-  #     - name: install
-  #       run: integ/install.sh
-  #     - name: setup
-  #       run: integ/setup.sh
-  #     - name: test
-  #       run: sudo "PATH=$PATH" "GOPATH=$GOPATH" integ/test.sh
+  integration-tests:
+    runs-on: ubuntu-20.04
+    defaults:
+      run:
+        working-directory: go/src/github.com/lyft/flinkk8soperator
+    env:
+      GOPATH: "/home/runner/work/flinkk8soperator/flinkk8soperator/go/"
+    steps:
+      - name: checkout
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 1
+          path: go/src/github.com/lyft/flinkk8soperator
+      - name: install go
+        uses: actions/setup-go@v2
+        with:
+          go-version: 1.12
+      - name: install
+        run: integ/install.sh
+      - name: setup
+        run: integ/setup.sh
+      - name: test
+        run: sudo "PATH=$PATH" "GOPATH=$GOPATH" integ/test.sh
diff --git a/docs/local_dev.md b/docs/local_dev.md
index 50d1cbbb..31325894 100644
--- a/docs/local_dev.md
+++ b/docs/local_dev.md
@@ -7,11 +7,11 @@ to develop their applications locally.
 
 ## Run the operator
 
-### Install [Docker for Mac](https://docs.docker.com/docker-for-mac/install/)
+### Install [Minikube](https://minikube.sigs.k8s.io/docs/start/#what-youll-need)
+
+You will want to start minikube on Kubernetes <=1.20, for example:
+`minikube start --kubernetes-version=v1.20.15`
 
-Once installed and running, enabled Kuberenetes in settings (from the
-docker icon in the menu bar, click Preferences -> Kubernetes -> Enable
-Kubernetes).
 
 ### (Optional) Setup kubernetes dashboard
 
@@ -46,6 +46,12 @@ $ cd flinkk8soperator
 $ kubectl create -f deploy/crd.yaml
 ```
 
+### Install permissions
+``` bash
+$ kubectl create -f deploy/role.yaml
+$ kubectl create -f deploy/role-binding.yaml
+```
+
 ### Start the operator
 
 #### Option 1: run outside the kubernetes cluster
diff --git a/integ/README.md b/integ/README.md
index 8f5ab0e9..9415e5d6 100644
--- a/integ/README.md
+++ b/integ/README.md
@@ -79,3 +79,40 @@ variables. Supported options include:
 You can also pass [gocheck](http://labix.org/gocheck) options to the test
 runner. Particularly useful is `-check.vv` which will output logs from the
 operator and Flink pods to help debugging test failures.
+
+### Minikube Setup
+
+Ideally we'd use k8s 1.16 to match the deployed k8s version; however, that
+is non-trivial due to cgroup configurations. Instead, we use a version that
+still serves v1beta1 CRDs, which means Kubernetes <1.22. The v1 CRD API is
+only available with client >=1.16, but the client used here is 1.14 and the
+upgrade is non-trivial.
+
+
+1. Install Dependencies
+    dep ensure -vendor-only
+
+2. Start minikube
+    minikube start --kubernetes-version=v1.20.15
+
+3. Proxy minikube
+    kubectl proxy --port 8001 &
+
+4. Set up test app images and operator image
+    integ/setup.sh
+
+5. Set the following for the Go test (e.g. in your IDE run configuration):
+    Package path: github.com/lyft/flinkk8soperator/integ
+    Env: INTEGRATION=true;OPERATOR_IMAGE=flinkk8soperator:local;RUN_DIRECT=true
+    Program Args: -timeout 40m -check.vv IntegTest
+
+
+Helpers:
+- Kill kube proxy
+    ps -ef | grep "kubectl proxy"
+    kill -9 <pid>
+- Kill stuck flink app
+    kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge
+- Set default namespace
+    kubectl config set-context --current --namespace=flinkoperatortest
+
diff --git a/integ/blue_green_deployment_test.go b/integ/blue_green_deployment_test.go
index 6b51e2ed..c29855a2 100644
--- a/integ/blue_green_deployment_test.go
+++ b/integ/blue_green_deployment_test.go
@@ -33,6 +33,7 @@ func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.
 }
 
 func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
+	log.Info("Starting test TestUpdateWithBlueGreenDeploymentMode")
 	testName := "bluegreenupdate"
 	const finalizer = "bluegreen.finalizers.test.com"
 
@@ -55,7 +56,7 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
 	pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
 		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
 	c.Assert(err, IsNil)
-	c.Assert(len(pods.Items), Equals, 3)
+	c.Assert(len(pods.Items), Equals, 2)
 	for _, pod := range pods.Items {
 		c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
 	}
@@ -72,7 +73,7 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
 		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
 	c.Assert(err, IsNil)
 	// We have 2 applications running
-	c.Assert(len(pods.Items), Equals, 6)
+	c.Assert(len(pods.Items), Equals, 4)
 	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
 	c.Assert(s.Util.GetJobID(newApp), NotNil)
 	c.Assert(newApp.Status.UpdatingVersion, Equals, v1beta1.BlueFlinkApplication)
@@ -153,4 +154,5 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
 		}
 	}
 	log.Info("All pods torn down")
+	log.Info("Completed test TestUpdateWithBlueGreenDeploymentMode")
 }
diff --git a/integ/checkpoint_failure_test.go b/integ/checkpoint_failure_test.go
index 9f7a596c..55b662a5 100644
--- a/integ/checkpoint_failure_test.go
+++ b/integ/checkpoint_failure_test.go
@@ -2,8 +2,6 @@ package integ
 
 import (
 	"fmt"
-	"io/ioutil"
-	"os"
 	"time"
 
 	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
@@ -67,18 +65,24 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
 
 // Tests that we correctly handle updating a job with task failures
 func (s *IntegSuite) TestJobWithTaskFailures(c *C) {
+	log.Info("Starting test TestJobWithTaskFailures")
+
 	failingJobTest(s, c, "taskfailure", func() {
-		f, err := os.OpenFile(s.Util.CheckpointDir+"/fail", os.O_RDONLY|os.O_CREATE, 0666)
+		err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
 		c.Assert(err, IsNil)
-		c.Assert(f.Close(), IsNil)
 	})
+	log.Info("Completed test TestJobWithTaskFailures")
 }
 
 // Tests that we correctly handle updating a job with a checkpoint timeout
 func (s *IntegSuite) TestCheckpointTimeout(c *C) {
+	log.Info("Starting test TestCheckpointTimeout")
+
 	failingJobTest(s, c, 
"checkpointtimeout", func() { // cause checkpoints to take 120 seconds - err := ioutil.WriteFile(s.Util.CheckpointDir+"/checkpoint_delay", []byte("120000"), 0644) + err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay") c.Assert(err, IsNil) }) + log.Info("Completed test TestCheckpointTimeout") + } diff --git a/integ/install.sh b/integ/install.sh index b63beff5..82536261 100755 --- a/integ/install.sh +++ b/integ/install.sh @@ -2,10 +2,13 @@ set -e -sudo snap install microk8s --classic --channel=1.13/stable -microk8s.status --wait-ready -microk8s.enable dns -microk8s.enable registry +curl -LO -s https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 +sudo install minikube-linux-amd64 /usr/local/bin/minikube + +minikube config set memory 6800 +minikube start --kubernetes-version=v1.20.15 + +export KUBERNETES_CONFIG=~/.kube/config sh boilerplate/lyft/golang_test_targets/dep_install.sh diff --git a/integ/job_cancellation_test.go b/integ/job_cancellation_test.go index a1a229b1..3daba8a7 100644 --- a/integ/job_cancellation_test.go +++ b/integ/job_cancellation_test.go @@ -58,7 +58,7 @@ func WaitUpdateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app * // tests the workflow of job cancellation without savepoint func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) { - + log.Info("Starting test TestJobCancellationWithoutSavepoint") testName := "cancelsuccess" const finalizer = "simple.finalizers.test.com" @@ -81,7 +81,7 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) { pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). List(v1.ListOptions{LabelSelector: "integTest=" + testName}) c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 3) + c.Assert(len(pods.Items), Equals, 2) for _, pod := range pods.Items { c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image) } @@ -97,7 +97,7 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) { pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). List(v1.ListOptions{LabelSelector: "integTest=" + testName}) c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 3) + c.Assert(len(pods.Items), Equals, 2) for _, pod := range pods.Items { c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage) } @@ -131,11 +131,13 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) { } } log.Info("All pods torn down") + log.Info("Completed test TestJobCancellationWithoutSavepoint") } // tests a job update with the existing job already in cancelled state. // here, the new submitted job starts without a savepoint. 
func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) { + log.Info("Starting test TestCancelledJobWithoutSavepoint") testName := "invalidcancel" config, err := s.Util.ReadFlinkApplication("test_app.yaml") @@ -150,6 +152,7 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) { Commentf("Failed to create flink application")) c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) + c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil) currApp, _ := s.Util.GetFlinkApplication(config.Name) @@ -163,7 +166,7 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) { c.Assert(err, IsNil) // wait a bit - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) job = s.Util.GetJobOverview(currApp) c.Assert(job["status"], Equals, "CANCELED") @@ -205,10 +208,12 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) { } } log.Info("All pods torn down") + log.Info("Completed test TestCancelledJobWithoutSavepoint") } // tests the recovery workflow of the job when savepoint is disabled. func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) { + log.Info("Starting test TestJobRecoveryWithoutSavepoint") const finalizer = "simple.finalizers.test.com" const testName = "cancelrecovery" @@ -300,4 +305,5 @@ func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) { time.Sleep(100 * time.Millisecond) } log.Info("All pods torn down") + log.Info("Completed test TestJobRecoveryWithoutSavepoint") } diff --git a/integ/main_test.go b/integ/main_test.go index aa6a57f6..4a3ce8c9 100644 --- a/integ/main_test.go +++ b/integ/main_test.go @@ -51,6 +51,7 @@ func (s *IntegSuite) SetUpSuite(c *C) { } kubeconfig := os.Getenv("KUBERNETES_CONFIG") + fmt.Printf("Kube config: %s", kubeconfig) if kubeconfig == "" { kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config") err := os.Setenv("KUBERNETES_CONFIG", kubeconfig) @@ -79,7 +80,7 @@ func (s *IntegSuite) SetUpSuite(c *C) { LimitNamespace: namespace, UseProxy: true, ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second}, - MaxErrDuration: flyteConfig.Duration{Duration: 30 * time.Second}, + MaxErrDuration: flyteConfig.Duration{Duration: 60 * time.Second}, MetricsPrefix: "flinkk8soperator", ProxyPort: flyteConfig.Port{Port: 8001}, } @@ -92,6 +93,18 @@ func (s *IntegSuite) SetUpSuite(c *C) { } }() } else { + if err = s.Util.CreateClusterRole(); err != nil && !k8sErrors.IsAlreadyExists(err) { + c.Fatalf("Failed to create role: %v", err) + } + + if err = s.Util.CreateServiceAccount(); err != nil && !k8sErrors.IsAlreadyExists(err) { + c.Fatalf("Failed to create service account: %v", err) + } + + if err = s.Util.CreateClusterRoleBinding(); err != nil && !k8sErrors.IsAlreadyExists(err) { + c.Fatalf("Failed to create cluster role binding: %v", err) + } + if err = s.Util.CreateOperator(); err != nil { c.Fatalf("Failed to create operator: %v", err) } @@ -111,18 +124,12 @@ func (s *IntegSuite) TearDownSuite(c *C) { func (s *IntegSuite) SetUpTest(c *C) { // create checkpoint directory - if _, err := os.Stat(s.Util.CheckpointDir); os.IsNotExist(err) { - c.Assert(os.Mkdir(s.Util.CheckpointDir, 0777), IsNil) + if err := s.Util.ExecuteCommand("minikube", "ssh", "sudo mkdir /tmp/checkpoints && sudo chmod -R 0777 /tmp/checkpoints"); err != nil { + c.Fatalf("Failed to create checkpoint directory: %v", err) } } func (s *IntegSuite) TearDownTest(c *C) { - jm, err := s.Util.GetJobManagerPod() - if err == nil { - fmt.Printf("\n\n######### JobManager logs for debugging 
#########\n---------------------------\n") - _ = s.Util.GetLogs(jm, nil) - } - tms, err := s.Util.GetTaskManagerPods() if err == nil { for i, tm := range tms { @@ -132,13 +139,34 @@ func (s *IntegSuite) TearDownTest(c *C) { } } + jm, err := s.Util.GetJobManagerPod() + if err == nil { + fmt.Printf("\n\n######### JobManager logs for debugging #########\n---------------------------\n") + _ = s.Util.GetLogs(jm, nil) + } + + fmt.Printf("\n\n######### Nodes for debugging #########\n---------------------------\n") + err = s.Util.ExecuteCommand("kubectl", "describe", "nodes") + c.Assert(err, IsNil) + + fmt.Printf("\n\n######### Pods for debugging #########\n---------------------------\n") + err = s.Util.ExecuteCommand("kubectl", "get", "pods", "-n", "flinkoperatortest") + c.Assert(err, IsNil) + + fmt.Printf("\n\n######### Pod details for debugging #########\n---------------------------\n") + err = s.Util.ExecuteCommand("kubectl", "describe", "pods", "-n", "flinkoperatortest") + c.Assert(err, IsNil) + + fmt.Printf("\n\n######### Flink Applications for debugging #########\n---------------------------\n") + err = s.Util.ExecuteCommand("kubectl", "describe", "flinkapplications", "-n", "flinkoperatortest") + c.Assert(err, IsNil) + err = s.Util.FlinkApps().DeleteCollection(nil, v1.ListOptions{}) if err != nil { - log.Fatalf("Failed to clean up flink applications") + log.Fatalf("Failed to clean up flink applications: %v", err) } - err = os.RemoveAll(s.Util.CheckpointDir) - if err != nil { - log.Fatalf("Failed to clean up checkpoints directory: %v", err) + if err := s.Util.ExecuteCommand("minikube", "ssh", "sudo rm -rf /tmp/checkpoints"); err != nil { + c.Fatalf("Failed to delete checkpoint directory: %v", err) } } diff --git a/integ/operator-test-app/Dockerfile b/integ/operator-test-app/Dockerfile index 6a7fc27e..e42031b9 100644 --- a/integ/operator-test-app/Dockerfile +++ b/integ/operator-test-app/Dockerfile @@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH COPY . /code # Configure Flink version -ENV FLINK_VERSION=1.8.1 \ +ENV FLINK_VERSION=1.11.6 \ HADOOP_SCALA_VARIANT=scala_2.12 # Install dependencies @@ -51,7 +51,7 @@ RUN groupadd --system --gid=9999 flink && \ WORKDIR $FLINK_HOME ENV FLINK_URL_FILE_PATH=flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-${HADOOP_SCALA_VARIANT}.tgz -ENV FLINK_TGZ_URL=https://mirrors.ocf.berkeley.edu/apache/$FLINK_URL_FILE_PATH +ENV FLINK_TGZ_URL=https://archive.apache.org/dist/$FLINK_URL_FILE_PATH # Install Flink RUN set -ex; \ diff --git a/integ/operator-test-app/pom.xml b/integ/operator-test-app/pom.xml index 54ed3ea9..be20aac8 100644 --- a/integ/operator-test-app/pom.xml +++ b/integ/operator-test-app/pom.xml @@ -19,12 +19,12 @@ org.apache.flink flink-java - 1.8.1 + 1.11.6 org.apache.flink flink-streaming-java_2.11 - 1.8.1 + 1.11.6 diff --git a/integ/scaleup_test.go b/integ/scaleup_test.go index cc29acff..cdeab3ed 100644 --- a/integ/scaleup_test.go +++ b/integ/scaleup_test.go @@ -1,151 +1,145 @@ package integ -import ( - "fmt" - "time" - - "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" - "github.com/prometheus/common/log" - . 
"gopkg.in/check.v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func (s *IntegSuite) TestInPlaceScaleUp(c *C) { - const finalizer = "scaleup.finalizers.test.com" - const testName = "test_in_place_scale_up" - - // start a simple app - config, err := s.Util.ReadFlinkApplication("test_app.yaml") - c.Assert(err, IsNil, Commentf("Failed to read test app yaml")) - - config.Spec.ScaleMode = "InPlace" - config.Spec.Parallelism = 2 - config.ObjectMeta.Name = "inplace" - config.ObjectMeta.Labels["integTest"] = testName - // add a finalizer so that the flinkapplication won't be deleted until we've had a chance to look at it - config.Finalizers = append(config.Finalizers, finalizer) - - c.Assert(s.Util.CreateFlinkApplication(config), IsNil, - Commentf("Failed to create flink application")) - - c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) - c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil) - - pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). - List(v1.ListOptions{LabelSelector: "integTest=" + testName}) - c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 2) - for _, pod := range pods.Items { - c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image) - } - - deployments, err := s.Util.KubeClient.AppsV1().Deployments(s.Util.Namespace.Name). - List(v1.ListOptions{LabelSelector: "flink-app=inplace,flink-deployment-type=taskmanager"}) - c.Assert(err, IsNil) - c.Assert(len(deployments.Items), Equals, 1) - deployment := deployments.Items[0] - - log.Info("Application started successfully") - - // test updating the app with a new scale - _, err = s.Util.Update("inplace", func(app *v1beta1.FlinkApplication) { - app.Spec.Parallelism = 4 - }) - c.Assert(err, IsNil) - - c.Assert(s.Util.WaitForPhase("inplace", v1beta1.FlinkApplicationRescaling, v1beta1.FlinkApplicationDeployFailed), IsNil) - c.Assert(s.Util.WaitForPhase("inplace", v1beta1.FlinkApplicationSavepointing, v1beta1.FlinkApplicationDeployFailed), IsNil) - c.Assert(s.Util.WaitForPhase("inplace", v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) - c.Assert(s.Util.WaitForAllTasksRunning("inplace"), IsNil) - - log.Info("Rescaled job started successfully") - newApp, err := s.Util.GetFlinkApplication(config.Name) - c.Assert(err, IsNil) - - // check that we savepointed and restored correctly - endpoint := fmt.Sprintf("jobs/%s/checkpoints", newApp.Status.JobStatus.JobID) - res, err := s.Util.FlinkAPIGet(newApp, endpoint) - c.Assert(err, IsNil) - - body := res.(map[string]interface{}) - restored := (body["latest"].(map[string]interface{}))["restored"] - c.Assert(restored, NotNil) - - c.Assert(restored.(map[string]interface{})["is_savepoint"], Equals, true) - - // check that we have the correct number of total pods - pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). - List(v1.ListOptions{LabelSelector: "integTest=" + testName}) - c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 3) - - // check that we are still using the same deploymnet - deployments2, err := s.Util.KubeClient.AppsV1().Deployments(s.Util.Namespace.Name). 
- List(v1.ListOptions{LabelSelector: "flink-app=inplace,flink-deployment-type=taskmanager"}) - c.Assert(err, IsNil) - c.Assert(len(deployments2.Items), Equals, 1) - deployment2 := deployments.Items[0] - c.Assert(deployment2.Name, Equals, deployment.Name) - - // ensure that we can now proceed to a normal deployment - newApp = updateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) { - app.Spec.Image = NewImage - }, v1beta1.FlinkApplicationDeployFailed) - c.Assert(newApp.Spec.Image, Equals, NewImage) - pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). - List(v1.ListOptions{LabelSelector: "integTest=" + testName}) - c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 3) - for _, pod := range pods.Items { - c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage) - } - - // delete the application and ensure everything is cleaned up successfully - c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil) - - // validate that a savepoint was taken and the job was cancelled - var app *v1beta1.FlinkApplication - for { - app, err = s.Util.GetFlinkApplication(config.Name) - c.Assert(err, IsNil) - - if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer { - break - } - time.Sleep(100 * time.Millisecond) - } - - c.Assert(app.Status.SavepointPath, NotNil) - job := func() map[string]interface{} { - jobs, _ := s.Util.FlinkAPIGet(app, "/jobs") - jobMap := jobs.(map[string]interface{}) - jobList := jobMap["jobs"].([]interface{}) - for _, j := range jobList { - job := j.(map[string]interface{}) - if job["id"] == app.Status.JobStatus.JobID { - return job - } - } - return nil - }() - - fmt.Printf("test job = %v", job) - c.Assert(job["status"], Equals, "CANCELED") - - // delete our finalizer - app.Finalizers = []string{} - _, err = s.Util.FlinkApps().Update(app) - c.Assert(err, IsNil) - - // wait until all pods are gone - for { - pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). - List(v1.ListOptions{LabelSelector: "integTest=" + testName}) - c.Assert(err, IsNil) - if len(pods.Items) == 0 { - break - } - time.Sleep(100 * time.Millisecond) - } - log.Info("All pods torn down") -} +// TODO: https://github.com/lyft/flinkk8soperator/issues/278 +//func (s *IntegSuite) TestInPlaceScaleUp(c *C) { +// log.Info("Starting test TestInPlaceScaleUp") +// +// const finalizer = "scaleup.finalizers.test.com" +// const testName = "test_in_place_scale_up" +// +// // start a simple app +// config, err := s.Util.ReadFlinkApplication("test_app.yaml") +// c.Assert(err, IsNil, Commentf("Failed to read test app yaml")) +// +// config.Spec.ScaleMode = "InPlace" +// config.Spec.Parallelism = 2 +// config.ObjectMeta.Name = "inplace" +// config.ObjectMeta.Labels["integTest"] = testName +// // add a finalizer so that the flinkapplication won't be deleted until we've had a chance to look at it +// config.Finalizers = append(config.Finalizers, finalizer) +// +// c.Assert(s.Util.CreateFlinkApplication(config), IsNil, +// Commentf("Failed to create flink application")) +// +// c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) +// c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil) +// +// pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). 
+// List(v1.ListOptions{LabelSelector: "integTest=" + testName}) +// c.Assert(err, IsNil) +// c.Assert(len(pods.Items), Equals, 2) +// for _, pod := range pods.Items { +// c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image) +// } +// +// deployments, err := s.Util.KubeClient.AppsV1().Deployments(s.Util.Namespace.Name). +// List(v1.ListOptions{LabelSelector: "flink-app=inplace,flink-deployment-type=taskmanager"}) +// c.Assert(err, IsNil) +// c.Assert(len(deployments.Items), Equals, 1) +// deployment := deployments.Items[0] +// +// log.Info("Application started successfully") +// +// // test updating the app with a new scale +// _, err = s.Util.Update("inplace", func(app *v1beta1.FlinkApplication) { +// app.Spec.Parallelism = 4 +// }) +// c.Assert(err, IsNil) +// +// c.Assert(s.Util.WaitForPhase("inplace", v1beta1.FlinkApplicationRescaling, v1beta1.FlinkApplicationDeployFailed), IsNil) +// c.Assert(s.Util.WaitForPhase("inplace", v1beta1.FlinkApplicationSavepointing, v1beta1.FlinkApplicationDeployFailed), IsNil) +// c.Assert(s.Util.WaitForPhase("inplace", v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) +// c.Assert(s.Util.WaitForAllTasksRunning("inplace"), IsNil) +// +// log.Info("Rescaled job started successfully") +// newApp, err := s.Util.GetFlinkApplication(config.Name) +// c.Assert(err, IsNil) +// +// // check that we savepointed and restored correctly +// endpoint := fmt.Sprintf("jobs/%s/checkpoints", newApp.Status.JobStatus.JobID) +// res, err := s.Util.FlinkAPIGet(newApp, endpoint) +// c.Assert(err, IsNil) +// +// body := res.(map[string]interface{}) +// restored := (body["latest"].(map[string]interface{}))["restored"] +// c.Assert(restored, NotNil) +// +// c.Assert(restored.(map[string]interface{})["is_savepoint"], Equals, true) +// +// // check that we have the correct number of total pods +// pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). +// List(v1.ListOptions{LabelSelector: "integTest=" + testName}) +// c.Assert(err, IsNil) +// c.Assert(len(pods.Items), Equals, 3) +// +// // check that we are still using the same deploymnet +// deployments2, err := s.Util.KubeClient.AppsV1().Deployments(s.Util.Namespace.Name). +// List(v1.ListOptions{LabelSelector: "flink-app=inplace,flink-deployment-type=taskmanager"}) +// c.Assert(err, IsNil) +// c.Assert(len(deployments2.Items), Equals, 1) +// deployment2 := deployments.Items[0] +// c.Assert(deployment2.Name, Equals, deployment.Name) +// +// // ensure that we can now proceed to a normal deployment +// newApp = updateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) { +// app.Spec.Image = NewImage +// }, v1beta1.FlinkApplicationDeployFailed) +// c.Assert(newApp.Spec.Image, Equals, NewImage) +// pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). 
+// List(v1.ListOptions{LabelSelector: "integTest=" + testName}) +// c.Assert(err, IsNil) +// c.Assert(len(pods.Items), Equals, 3) +// for _, pod := range pods.Items { +// c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage) +// } +// +// // delete the application and ensure everything is cleaned up successfully +// c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil) +// +// // validate that a savepoint was taken and the job was cancelled +// var app *v1beta1.FlinkApplication +// for { +// app, err = s.Util.GetFlinkApplication(config.Name) +// c.Assert(err, IsNil) +// +// if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer { +// break +// } +// time.Sleep(100 * time.Millisecond) +// } +// +// c.Assert(app.Status.SavepointPath, NotNil) +// job := func() map[string]interface{} { +// jobs, _ := s.Util.FlinkAPIGet(app, "/jobs") +// jobMap := jobs.(map[string]interface{}) +// jobList := jobMap["jobs"].([]interface{}) +// for _, j := range jobList { +// job := j.(map[string]interface{}) +// if job["id"] == app.Status.JobStatus.JobID { +// return job +// } +// } +// return nil +// }() +// +// fmt.Printf("test job = %v", job) +// c.Assert(job["status"], Equals, "CANCELED") +// +// // delete our finalizer +// app.Finalizers = []string{} +// _, err = s.Util.FlinkApps().Update(app) +// c.Assert(err, IsNil) +// +// // wait until all pods are gone +// for { +// pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). +// List(v1.ListOptions{LabelSelector: "integTest=" + testName}) +// c.Assert(err, IsNil) +// if len(pods.Items) == 0 { +// break +// } +// time.Sleep(100 * time.Millisecond) +// } +// log.Info("All pods torn down") +// log.Info("Completed test TestInPlaceScaleUp") +//} diff --git a/integ/setup.sh b/integ/setup.sh index c13b82bd..5871435b 100755 --- a/integ/setup.sh +++ b/integ/setup.sh @@ -1,14 +1,31 @@ #!/usr/bin/env bash -export DOCKER_IMAGE=flinkk8soperator:$(git rev-parse HEAD) -export OPERATOR_IMAGE=127.0.0.1:32000/flinkk8soperator:local +# Test App Setup + +# TODO: upgrade flink test app from 1.8 +#cd integ/operator-test-app +#export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD) +#docker build -t $TEST_APP_IMAGE . +#docker tag $TEST_APP_IMAGE flink-test-app:local.1 +#docker tag $TEST_APP_IMAGE flink-test-app:local.2 +#minikube image load flink-test-app:local.1 +#minikube image load flink-test-app:local.2 +# +#cd ../../ + +docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1 +docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2 +minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1 +minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2 -microk8s.docker build -t $DOCKER_IMAGE . -microk8s.docker tag $DOCKER_IMAGE $OPERATOR_IMAGE -microk8s.docker push 127.0.0.1:32000/flinkk8soperator -microk8s.start -microk8s.status --wait-ready +# Operator Setup + +export DOCKER_IMAGE=flinkk8soperator:$(git rev-parse HEAD) +export OPERATOR_IMAGE=flinkk8soperator:local + +docker build -t $DOCKER_IMAGE . 
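+# Retag under the fixed name the tests reference ($OPERATOR_IMAGE), then load the
+# image straight into minikube's container runtime, so no registry push is needed.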
+docker tag $DOCKER_IMAGE $OPERATOR_IMAGE +minikube image load $OPERATOR_IMAGE -microk8s.kubectl proxy --port 8001 & -microk8s.kubectl config view > ~/.kube/config +kubectl proxy --port 8001 & diff --git a/integ/simple_test.go b/integ/simple_test.go index 636235b5..553ea978 100644 --- a/integ/simple_test.go +++ b/integ/simple_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" - "os" "time" "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" @@ -69,6 +68,8 @@ func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1be // Tests job submission, upgrade, rollback, and deletion func (s *IntegSuite) TestSimple(c *C) { + log.Info("Starting test TestSimple") + const finalizer = "simple.finalizers.test.com" // start a simple app @@ -88,7 +89,7 @@ func (s *IntegSuite) TestSimple(c *C) { pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). List(v1.ListOptions{LabelSelector: "integTest=test_simple"}) c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 3) + c.Assert(len(pods.Items), Equals, 2) for _, pod := range pods.Items { c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image) } @@ -104,7 +105,7 @@ func (s *IntegSuite) TestSimple(c *C) { pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). List(v1.ListOptions{LabelSelector: "integTest=test_simple"}) c.Assert(err, IsNil) - c.Assert(len(pods.Items), Equals, 3) + c.Assert(len(pods.Items), Equals, 2) for _, pod := range pods.Items { c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage) } @@ -261,9 +262,12 @@ func (s *IntegSuite) TestSimple(c *C) { time.Sleep(100 * time.Millisecond) } log.Info("All pods torn down") + log.Info("Completed test TestSimple") } func (s *IntegSuite) TestRecovery(c *C) { + log.Info("Starting test TestRecovery") + config, err := s.Util.ReadFlinkApplication("test_app.yaml") c.Assert(err, IsNil, Commentf("Failed to read test app yaml")) @@ -313,9 +317,8 @@ func (s *IntegSuite) TestRecovery(c *C) { } // cause the app to start failing - f, err := os.OpenFile(s.Util.CheckpointDir+"/fail", os.O_RDONLY|os.O_CREATE, 0666) + err = s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail") c.Assert(err, IsNil) - c.Assert(f.Close(), IsNil) log.Info("Triggered failure") @@ -344,7 +347,7 @@ func (s *IntegSuite) TestRecovery(c *C) { c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) // stop it from failing - c.Assert(os.Remove(s.Util.CheckpointDir+"/fail"), IsNil) + c.Assert(s.Util.ExecuteCommand("minikube", "ssh", "sudo rm /tmp/checkpoints/fail"), IsNil) c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil) // delete the application @@ -358,4 +361,5 @@ func (s *IntegSuite) TestRecovery(c *C) { } } log.Info("All pods torn down") + log.Info("Completed test TestRecovery") } diff --git a/integ/test.sh b/integ/test.sh index b845df35..aa25375f 100755 --- a/integ/test.sh +++ b/integ/test.sh @@ -3,11 +3,7 @@ set -e export INTEGRATION=true -export OPERATOR_IMAGE=127.0.0.1:32000/flinkk8soperator:local - -# needed to create the checkpoints directory with world-writable permissions -umask 000 +export OPERATOR_IMAGE=flinkk8soperator:local cd $(dirname "$0") -go test -timeout 40m -check.vv IntegSuite - +go test -p 1 -timeout 40m -check.vv IntegSuite diff --git a/integ/test_app.yaml b/integ/test_app.yaml index 1d56d4e0..03189bf3 100644 --- a/integ/test_app.yaml +++ b/integ/test_app.yaml @@ -7,17 +7,22 @@ metadata: environment: development spec: image: 
lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1 + imagePullPolicy: IfNotPresent imagePullSecrets: - name: dockerhub flinkConfig: state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints state.savepoints.dir: file:///checkpoints/flink/savepoints + env.java.opts.jobmanager: "-XX:+UseG1GC" jobManagerConfig: systemMemoryFraction: 0.2 resources: requests: - memory: "200Mi" + memory: "400Mi" + cpu: "0.2" + limits: + memory: "800Mi" cpu: "0.2" replicas: 1 taskManagerConfig: @@ -25,10 +30,11 @@ spec: systemMemoryFraction: 0.5 resources: requests: - memory: "400Mi" + memory: "800Mi" cpu: "0.2" limits: - memory: "400Mi" + memory: "800Mi" + cpu: "0.2" volumeMounts: - mountPath: /checkpoints name: checkpoints @@ -40,5 +46,5 @@ spec: flinkVersion: "1.11" deploymentMode: Dual jarName: "operator-test-app-1.0.0-SNAPSHOT.jar" - parallelism: 3 + parallelism: 2 entryClass: "com.lyft.OperatorTestApp" diff --git a/integ/utils/utils.go b/integ/utils/utils.go index d32a5674..b26635d9 100644 --- a/integ/utils/utils.go +++ b/integ/utils/utils.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "os/exec" "path/filepath" "strings" "time" @@ -19,6 +20,7 @@ import ( "github.com/prometheus/common/log" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + v12 "k8s.io/api/rbac/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsClientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/resource" @@ -99,7 +101,6 @@ func (f *TestUtil) Cleanup() { } } } - err = f.KubeClient.CoreV1().Namespaces().Delete(f.Namespace.Name, &metav1.DeleteOptions{}) if err != nil { log.Errorf("Failed to clean up after test: %v", err) @@ -107,6 +108,20 @@ func (f *TestUtil) Cleanup() { } } +func (f *TestUtil) ExecuteCommand(name string, arg ...string) error { + cmd := exec.Command(name, arg...) 
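+	// Output runs the command to completion and returns its stdout. Stderr is not
+	// echoed here; on failure it is retained in the *exec.ExitError returned as err.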
+ stdout, err := cmd.Output() + + if err != nil { + fmt.Println(err.Error()) + return err + } + + fmt.Println(string(stdout)) + + return nil +} + func getFile(relativePath string) (*os.File, error) { path, err := filepath.Abs(relativePath) if err != nil { @@ -138,10 +153,79 @@ func (f *TestUtil) CreateCRD() error { return nil } +func (f *TestUtil) CreateClusterRole() error { + file, err := getFile("../deploy/role.yaml") + if err != nil { + return err + } + + clusterRole := v12.ClusterRole{} + err = yaml.NewYAMLOrJSONDecoder(file, 1024).Decode(&clusterRole) + if err != nil { + return err + } + + _, err = f.KubeClient.RbacV1().ClusterRoles().Create(&clusterRole) + if err != nil { + return err + } + + return nil +} + +func (f *TestUtil) CreateServiceAccount() error { + file, err := getFile("../deploy/role.yaml") + if err != nil { + return err + } + + serviceAccount := v1.ServiceAccount{} + err = yaml.NewYAMLOrJSONDecoder(file, 1024).Decode(&serviceAccount) + if err != nil { + return err + } + + serviceAccount.Namespace = f.Namespace.Name + + _, err = f.KubeClient.CoreV1().ServiceAccounts(f.Namespace.Name).Create(&serviceAccount) + if err != nil { + return err + } + + return nil +} + +func (f *TestUtil) CreateClusterRoleBinding() error { + file, err := getFile("../deploy/role-binding.yaml") + if err != nil { + return err + } + + clusterRoleBinding := v12.ClusterRoleBinding{} + err = yaml.NewYAMLOrJSONDecoder(file, 1024).Decode(&clusterRoleBinding) + if err != nil { + return err + } + + clusterRoleBinding.Subjects = []v12.Subject{{ + Kind: "ServiceAccount", + Name: "flinkoperator", + Namespace: f.Namespace.Name, + }} + clusterRoleBinding.Namespace = f.Namespace.Name + + _, err = f.KubeClient.RbacV1().ClusterRoleBindings().Create(&clusterRoleBinding) + if err != nil { + return err + } + + return nil +} + func (f *TestUtil) CreateOperator() error { configValue := make(map[string]string) configValue["development"] = "operator:\n containerNameFormat: \"%s-unknown\"\n resyncPeriod: 5s\n" + - " baseBackoffDuration: 50ms\n maxBackoffDuration: 2s\n maxErrDuration: 90s\n" + + " baseBackoffDuration: 50ms\n maxBackoffDuration: 2s\n maxErrDuration: 240s\n" + "logger:\n formatter:\n type: text\n" configMap := v1.ConfigMap{ @@ -179,6 +263,7 @@ func (f *TestUtil) CreateOperator() error { }, }, Spec: v1.PodSpec{ + ServiceAccountName: "flinkoperator", Volumes: []v1.Volume{ { Name: "config-volume", @@ -361,6 +446,7 @@ func (f *TestUtil) GetFlinkApplication(name string) (*flinkapp.FlinkApplication, } func (f *TestUtil) WaitForPhase(name string, phase flinkapp.FlinkApplicationPhase, failurePhases ...flinkapp.FlinkApplicationPhase) error { + waitTime := 0 for { app, err := f.FlinkApps().Get(name, metav1.GetOptions{}) @@ -378,7 +464,12 @@ func (f *TestUtil) WaitForPhase(name string, phase flinkapp.FlinkApplicationPhas } } - time.Sleep(200 * time.Millisecond) + waitTime++ + time.Sleep(1 * time.Second) + + if waitTime > 500 { + return fmt.Errorf("timed out 500s before reaching phase %s", phase.VerboseString()) + } } }
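
For the version constraint described in the Minikube Setup notes above (v1beta1 CRDs are only served on Kubernetes <1.22), one quick way to confirm the pinned minikube cluster still serves the old API group is to list the served API versions. A minimal sketch, assuming `kubectl` is on PATH and pointed at the minikube cluster:

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	// Ask the API server for every group/version it serves.
	out, err := exec.Command("kubectl", "api-versions").Output()
	if err != nil {
		panic(err)
	}
	// On v1.20.15 this prints true: apiextensions.k8s.io/v1beta1 is still served,
	// so deploy/crd.yaml applies cleanly. On >=1.22 it prints false and the CRD
	// manifest would be rejected.
	fmt.Println(strings.Contains(string(out), "apiextensions.k8s.io/v1beta1"))
}
```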
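The failure-injection tests above (checkpoint_failure_test.go, simple_test.go) each inline a slightly different `minikube ssh` touch/chmod/rm command against /tmp/checkpoints. A helper on TestUtil could centralize that; this is only a sketch, and the method names and the exact sudo/chmod details are illustrative rather than part of this change:

```go
// TouchCheckpointFile creates a control file (e.g. "fail" or "checkpoint_delay")
// in the minikube VM's /tmp/checkpoints directory, which the test app watches
// to simulate checkpoint failures and delays.
func (f *TestUtil) TouchCheckpointFile(name string) error {
	cmd := fmt.Sprintf("sudo touch /tmp/checkpoints/%s && sudo chmod 0644 /tmp/checkpoints/%s", name, name)
	return f.ExecuteCommand("minikube", "ssh", cmd)
}

// RemoveCheckpointFile deletes a control file, ending the simulated failure.
func (f *TestUtil) RemoveCheckpointFile(name string) error {
	return f.ExecuteCommand("minikube", "ssh", "sudo rm -f /tmp/checkpoints/"+name)
}
```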
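Relatedly, the new `ExecuteCommand` helper prints only stdout; when a minikube or kubectl call fails in CI, the reason usually lands on stderr. A variant using `CombinedOutput` would surface it. Again a sketch under the same assumptions (same package and imports as utils.go), not part of this change:

```go
// ExecuteCommandCombined runs name with args and prints the interleaved
// stdout+stderr of the command, returning the command's error, if any.
func (f *TestUtil) ExecuteCommandCombined(name string, arg ...string) error {
	out, err := exec.Command(name, arg...).CombinedOutput()
	fmt.Println(string(out)) // print whatever the command produced, pass or fail
	return err
}
```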