From 688b570594e59c64c8257b38925f079cd824cbe8 Mon Sep 17 00:00:00 2001
From: tsc
Date: Wed, 13 Jan 2021 00:34:36 +0100
Subject: [PATCH 1/7] Drop unnecessary object name conversion

---
 .../src/main/scala/cloudflow/operator/action/Name.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala
index c68a6c602..9d6377e1d 100644
--- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala
+++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala
@@ -125,7 +125,7 @@ object Name {
     makeDNS1039Compatible(fixDots(streamletDeploymentName))
 
   def ofConfigMap(streamletDeploymentName: String) =
-    makeDNS1123CompatibleSubDomainName(s"configmap-${fixDots(streamletDeploymentName)}")
+    makeDNS1123CompatibleSubDomainName(s"configmap-${fixDots(streamletDeploymentName)}") // TODO - why append '-service' (line 145) but prepend 'configmap-'? - suffixes seem the more k8s-idiomatic choice
 
   def ofLabelValue(name: String) = truncateTo63Characters(name)
 
@@ -142,7 +142,7 @@ object Name {
   val ofContainerPrometheusExporterPort = max15Chars("prom-metrics")
 
   def ofService(streamletDeploymentName: String) =
-    truncateTo63CharactersWithSuffix(makeDNS1039Compatible(ofPod(streamletDeploymentName)), "-service")
+    truncateTo63CharactersWithSuffix(ofPod(streamletDeploymentName), "-service")
 
   def ofAdminService(streamletDeploymentName: String) =
     s"${ofPod(streamletDeploymentName)}-admin-service"

From 8dfedf9fb2cf93f678cb8dedd0e463607b73b9c5 Mon Sep 17 00:00:00 2001
From: tsc
Date: Wed, 13 Jan 2021 00:58:26 +0100
Subject: [PATCH 2/7] Avoid code duplication for building k8s service-name
 based on app-id and streamlet-name

---
 .../cloudflow/operator/action/EndpointActions.scala | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala
index c3cafa537..8b1ccbe7b 100644
--- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala
+++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala
@@ -40,17 +40,25 @@ object EndpointActions {
     def distinctEndpoints(app: CloudflowApplication.Spec) =
       app.deployments.flatMap(deployment => deployment.endpoint).toSet
 
+    def deploymentsOfEndpoints(app: CloudflowApplication.Spec) =
+      app.deployments
+        .flatMap(deployment => deployment.endpoint.map(ep => ep -> deployment.name))
+        .toMap // use the deployment name rather than reconstructing it via duplicated code
+
+    val currentDeploymentsOfEndpoints = currentApp.map(current => deploymentsOfEndpoints(current.spec)).getOrElse(Map.empty)
+    val newDeploymentsOfEndpoints = deploymentsOfEndpoints(newApp.spec)
+
     val currentEndpoints = currentApp.map(cr => distinctEndpoints(cr.spec)).getOrElse(Set.empty[Endpoint])
     val newEndpoints = distinctEndpoints(newApp.spec)
 
     val deleteActions = (currentEndpoints -- newEndpoints).flatMap { endpoint =>
       Seq(
-        Action.delete[Service](Name.ofService(StreamletDeployment.name(newApp.spec.appId, endpoint.streamlet)), newApp.namespace)
+        Action.delete[Service](Name.ofService(currentDeploymentsOfEndpoints(endpoint)), newApp.namespace)
       )
     }.toList
     val createActions = (newEndpoints -- currentEndpoints).flatMap { endpoint =>
       Seq(
-        createServiceAction(endpoint, newApp, StreamletDeployment.name(newApp.spec.appId, endpoint.streamlet))
+        createServiceAction(endpoint, newApp, newDeploymentsOfEndpoints(endpoint))
       )
     }.toList
     deleteActions ++ createActions

From 47289fdc52753028145419208e5ec8fedb7c2a3f Mon Sep 17 00:00:00 2001
From: tsc
Date: Fri, 21 May 2021 08:35:33 +0200
Subject: [PATCH 3/7] Add specs for reproducing unexpected behavior

---
 core/build.sbt                               |   3 +-
 .../src/test/avro/TestData.avsc              |  14 +++
 .../akkastream/testkit/ReproduceErrors.scala | 117 ++++++++++++++++++
 3 files changed, 133 insertions(+), 1 deletion(-)
 create mode 100644 core/cloudflow-akka-testkit/src/test/avro/TestData.avsc
 create mode 100644 core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala

diff --git a/core/build.sbt b/core/build.sbt
index 8af15b078..9f818c538 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -196,7 +196,8 @@ lazy val akkastreamTestkit =
   )
   .settings(
     javacOptions += "-Xlint:deprecation",
-    javacOptions += "-Xlint:unchecked"
+    javacOptions += "-Xlint:unchecked",
+    (sourceGenerators in Test) += (avroScalaGenerateSpecific in Test).taskValue
   )
 
 lazy val akkastreamTests =
diff --git a/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc b/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc
new file mode 100644
index 000000000..f0d20b850
--- /dev/null
+++ b/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc
@@ -0,0 +1,14 @@
+{
+  "namespace": "cloudflow.akkastream.testdata",
+  "type": "record",
+  "name": "TestData",
+  "fields":[
+    {
+      "name": "id", "type": "int"
+    },
+    {
+      "name": "name", "type": "string"
+    }
+  ]
+}
+
diff --git a/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala b/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala
new file mode 100644
index 000000000..9573ac6b0
--- /dev/null
+++ b/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2016-2020 Lightbend Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cloudflow.akkastream.testkit
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{ RunnableGraph, Source }
+import akka.testkit.TestKit
+import cloudflow.akkastream.{ AkkaStreamlet, AkkaStreamletLogic }
+import cloudflow.akkastream.scaladsl.RunnableGraphStreamletLogic
+import cloudflow.streamlets.StreamletShape
+import cloudflow.akkastream.testdata.TestData
+import cloudflow.akkastream.testkit.scaladsl.{ AkkaStreamletTestKit, Completed }
+import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet }
+import org.scalatest._
+
+import scala.concurrent.Future
+
+class ReproduceErrors extends WordSpec with MustMatchers with BeforeAndAfterAll {
+  private implicit val system = ActorSystem("CloudflowAkkaTestkitErrorReproducerSpec")
+
+  override def afterAll: Unit =
+    TestKit.shutdownActorSystem(system)
+
+  object TestFixture {
+    val msgs = List.tabulate(10)(i => TestData(i, i.toString))
+
+    class TestStreamlet extends AkkaStreamlet {
+      val in = AvroInlet[TestData]("in")
+      val out = AvroOutlet[TestData]("out")
+
+      override val shape: StreamletShape = StreamletShape(in).withOutlets(out)
+
+      override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
+        override def runnableGraph(): RunnableGraph[_] = {
+          val write = sinkRef(out).write _
+          sourceWithCommittableContext(in)
+            .mapAsync(parallelism = 1) { element =>
+              Future {
+                write(element)
+              }
+            }
+            .to(committableSink)
+        }
+      }
+    }
+  }
+
+  import TestFixture._
+
+  "Cloudflow Akka TestKit" should {
+    "emit a dedicated Completed message after each message emitted via sinkRef.write, but should not" in {
+      val testkit = AkkaStreamletTestKit(system)
+      val s = new TestStreamlet()
+      val in = testkit.inletFromSource(s.in, Source(msgs))
+      val out = testkit.outletAsTap(s.out)
+
+      testkit.run(
+        s,
+        List(in),
+        List(out),
+        () => {
+          val gotAll = out.probe.receiveN(msgs.size * 2)
+          val grouped = gotAll.groupBy {
+            case _: Completed => Completed
+            case _            => TestData
+          }
+
+          grouped(TestData) must have size msgs.size
+
+          val resultWithoutIndex = grouped(TestData).asInstanceOf[Seq[(_, TestData)]].map(_._2)
+          resultWithoutIndex must contain allElementsOf msgs
+
+          grouped(Completed) must have size 1 // but is actually of size msgs.size
+        }
+      )
+    }
+
+    (0 until 300).foreach { i =>
+      s"maintain the order in which messages are emitted via sinkRef.write (run #$i), but does not" in {
+        val testkit = AkkaStreamletTestKit(system)
+        val s = new TestStreamlet()
+        val in = testkit.inletFromSource(s.in, Source(msgs))
+        val out = testkit.outletAsTap(s.out)
+
+        testkit.run(
+          s,
+          List(in),
+          List(out),
+          () => {
+            val got = out.probe
+              .receiveN(msgs.size * 2)
+              .filter(_ != Completed) // compensate for a Completed msg being published after each msg, as in the test case above
+              .map {
+                case (_, m: TestData) => m
+              }
+            got mustEqual msgs
+          }
+        )
+      }
+    }
+  }
+
+}

From c9ad68d4f7082f698fe1c02390a2604e06d16df7 Mon Sep 17 00:00:00 2001
From: tsc
Date: Fri, 21 May 2021 09:16:03 +0200
Subject: [PATCH 4/7] Revert "Add specs for reproducing unexpected behavior"

This reverts commit 47289fdc52753028145419208e5ec8fedb7c2a3f.

---
 core/build.sbt                               |   3 +-
 .../src/test/avro/TestData.avsc              |  14 ---
 .../akkastream/testkit/ReproduceErrors.scala | 117 ------------------
 3 files changed, 1 insertion(+), 133 deletions(-)
 delete mode 100644 core/cloudflow-akka-testkit/src/test/avro/TestData.avsc
 delete mode 100644 core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala

diff --git a/core/build.sbt b/core/build.sbt
index 9f818c538..8af15b078 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -196,8 +196,7 @@ lazy val akkastreamTestkit =
   )
   .settings(
     javacOptions += "-Xlint:deprecation",
-    javacOptions += "-Xlint:unchecked",
-    (sourceGenerators in Test) += (avroScalaGenerateSpecific in Test).taskValue
+    javacOptions += "-Xlint:unchecked"
   )
 
 lazy val akkastreamTests =
diff --git a/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc b/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc
deleted file mode 100644
index f0d20b850..000000000
--- a/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "namespace": "cloudflow.akkastream.testdata",
-  "type": "record",
-  "name": "TestData",
-  "fields":[
-    {
-      "name": "id", "type": "int"
-    },
-    {
-      "name": "name", "type": "string"
-    }
-  ]
-}
-
diff --git a/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala b/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala
deleted file mode 100644
index 9573ac6b0..000000000
--- a/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (C) 2016-2020 Lightbend Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package cloudflow.akkastream.testkit
-
-import akka.actor.ActorSystem
-import akka.stream.scaladsl.{ RunnableGraph, Source }
-import akka.testkit.TestKit
-import cloudflow.akkastream.{ AkkaStreamlet, AkkaStreamletLogic }
-import cloudflow.akkastream.scaladsl.RunnableGraphStreamletLogic
-import cloudflow.streamlets.StreamletShape
-import cloudflow.akkastream.testdata.TestData
-import cloudflow.akkastream.testkit.scaladsl.{ AkkaStreamletTestKit, Completed }
-import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet }
-import org.scalatest._
-
-import scala.concurrent.Future
-
-class ReproduceErrors extends WordSpec with MustMatchers with BeforeAndAfterAll {
-  private implicit val system = ActorSystem("CloudflowAkkaTestkitErrorReproducerSpec")
-
-  override def afterAll: Unit =
-    TestKit.shutdownActorSystem(system)
-
-  object TestFixture {
-    val msgs = List.tabulate(10)(i => TestData(i, i.toString))
-
-    class TestStreamlet extends AkkaStreamlet {
-      val in = AvroInlet[TestData]("in")
-      val out = AvroOutlet[TestData]("out")
-
-      override val shape: StreamletShape = StreamletShape(in).withOutlets(out)
-
-      override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
-        override def runnableGraph(): RunnableGraph[_] = {
-          val write = sinkRef(out).write _
-          sourceWithCommittableContext(in)
-            .mapAsync(parallelism = 1) { element =>
-              Future {
-                write(element)
-              }
-            }
-            .to(committableSink)
-        }
-      }
-    }
-  }
-
-  import TestFixture._
-
-  "Cloudflow Akka TestKit" should {
-    "emit a dedicated Completed message after each message emitted via sinkRef.write, but should not" in {
-      val testkit = AkkaStreamletTestKit(system)
-      val s = new TestStreamlet()
-      val in = testkit.inletFromSource(s.in, Source(msgs))
-      val out = testkit.outletAsTap(s.out)
-
-      testkit.run(
-        s,
-        List(in),
-        List(out),
-        () => {
-          val gotAll = out.probe.receiveN(msgs.size * 2)
-          val grouped = gotAll.groupBy {
-            case _: Completed => Completed
-            case _            => TestData
-          }
-
-          grouped(TestData) must have size msgs.size
-
-          val resultWithoutIndex = grouped(TestData).asInstanceOf[Seq[(_, TestData)]].map(_._2)
-          resultWithoutIndex must contain allElementsOf msgs
-
-          grouped(Completed) must have size 1 // but is actually of size msgs.size
-        }
-      )
-    }
-
-    (0 until 300).foreach { i =>
-      s"maintain the order in which messages are emitted via sinkRef.write (run #$i), but does not" in {
-        val testkit = AkkaStreamletTestKit(system)
-        val s = new TestStreamlet()
-        val in = testkit.inletFromSource(s.in, Source(msgs))
-        val out = testkit.outletAsTap(s.out)
-
-        testkit.run(
-          s,
-          List(in),
-          List(out),
-          () => {
-            val got = out.probe
-              .receiveN(msgs.size * 2)
-              .filter(_ != Completed) // compensate for a Completed msg being published after each msg, as in the test case above
-              .map {
-                case (_, m: TestData) => m
-              }
-            got mustEqual msgs
-          }
-        )
-      }
-    }
-  }
-
-}

From fe1e8cffd2636d5b257d6652fd75cc1b611c4b60 Mon Sep 17 00:00:00 2001
From: tsc
Date: Tue, 9 Aug 2022 17:10:40 +0200
Subject: [PATCH 5/7] Add paragraph on explicitly adding cloudflow-avro and
 -protobuf dependencies

---
 .../modules/get-started/pages/define-avro-schema.adoc | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc b/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
index 2150e193c..c9587c326 100644
--- a/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
+++ b/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
@@ -6,6 +6,16 @@ include::ROOT:partial$include.adoc[]
 Let's start building the avro schema for the domain objects that we need for the application.
 These schema files have the extension `.avsc` and go directly under `src/main/avro` in the project structure that we discussed earlier.
 
+When using Avro for serialization of domain objects (and reading/writing them from/to Kafka), it is necessary to add a dependency on the `cloudflow-avro` library:
+- add libraryDependency `Cloudflow.library.CloudflowAvro` (in SBT)
+- or explicitly add the dependency to `"com.lightbend.cloudflow" %% "cloudflow-avro" % <version>` (in any build tool)
+
+It is also possible to use Protobuf instead of Avro.
+In this instance, the `cloudflow-protobuf` dependency needs to be added to the project:
+- add libraryDependency `Cloudflow.library.CloudflowProto` (in SBT)
+- or explicitly add the dependency to `"com.lightbend.cloudflow" %% "cloudflow-proto" % <version>` (in any build tool)
+
+
 In the Wind Turbine example, we will use the following domain objects:
 
 * **SensorData:** The data that we receive from the source and ingest through our ingress.

From a59a773b0769535cb54b0256abcbd3e352e05966 Mon Sep 17 00:00:00 2001
From: tsc
Date: Tue, 9 Aug 2022 17:17:39 +0200
Subject: [PATCH 6/7] Add hint to use sbt-avrohugger to docs

---
 .../get-started/pages/define-avro-schema.adoc | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc b/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
index c9587c326..e967055a4 100644
--- a/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
+++ b/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
@@ -7,14 +7,18 @@ Let's start building the avro schema for the domain objects that we need for the
 These schema files have the extension `.avsc` and go directly under `src/main/avro` in the project structure that we discussed earlier.
 
 When using Avro for serialization of domain objects (and reading/writing them from/to Kafka), it is necessary to add a dependency on the `cloudflow-avro` library:
-- add libraryDependency `Cloudflow.library.CloudflowAvro` (in SBT)
-- or explicitly add the dependency to `"com.lightbend.cloudflow" %% "cloudflow-avro" % <version>` (in any build tool)
-
+- add libraryDependency defined in Cloudflow sbt-plugin variable `Cloudflow.library.CloudflowAvro`
+- or explicitly add libraryDependency `"com.lightbend.cloudflow" %% "cloudflow-avro" % <version>`
+In order to get Scala case classes generated from Avro Schemas, add the `sbt-avrohugger` plugin to your project's `plugins.sbt` file:
+```
+addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0")
+```
+
+Note:
 It is also possible to use Protobuf instead of Avro.
 In this instance, the `cloudflow-protobuf` dependency needs to be added to the project:
-- add libraryDependency `Cloudflow.library.CloudflowProto` (in SBT)
-- or explicitly add the dependency to `"com.lightbend.cloudflow" %% "cloudflow-proto" % <version>` (in any build tool)
-
+- add libraryDependency defined in Cloudflow sbt-plugin variable `Cloudflow.library.CloudflowProto`
+- or explicitly add libraryDependency `"com.lightbend.cloudflow" %% "cloudflow-proto" % <version>`
 
 In the Wind Turbine example, we will use the following domain objects:

From f797ef2174ea41ceb50642a3d312cf7f2fd0c3d6 Mon Sep 17 00:00:00 2001
From: tsc
Date: Tue, 9 Aug 2022 17:25:16 +0200
Subject: [PATCH 7/7] Polish doc text

---
 .../modules/get-started/pages/define-avro-schema.adoc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc b/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
index e967055a4..41d05d4e4 100644
--- a/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
+++ b/docs/docs-source/docs/modules/get-started/pages/define-avro-schema.adoc
@@ -6,8 +6,8 @@ include::ROOT:partial$include.adoc[]
 Let's start building the avro schema for the domain objects that we need for the application.
 These schema files have the extension `.avsc` and go directly under `src/main/avro` in the project structure that we discussed earlier.
 
-When using Avro for serialization of domain objects (and reading/writing them from/to Kafka), it is necessary to add a dependency on the `cloudflow-avro` library:
+When using Avro for serialization of domain objects (and reading/writing them from/to Kafka), it is necessary to add a dependency on the `cloudflow-avro` library to your sbt project:
-- add libraryDependency defined in Cloudflow sbt-plugin variable `Cloudflow.library.CloudflowAvro`
+- either use the dependency variable defined in the Cloudflow sbt-plugin: `Cloudflow.library.CloudflowAvro`
 - or explicitly add libraryDependency `"com.lightbend.cloudflow" %% "cloudflow-avro" % <version>`
 In order to get Scala case classes generated from Avro Schemas, add the `sbt-avrohugger` plugin to your project's `plugins.sbt` file:
 ```
@@ -16,8 +16,8 @@ addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0")
 ```
 
 Note:
 It is also possible to use Protobuf instead of Avro.
-In this instance, the `cloudflow-protobuf` dependency needs to be added to the project:
+In this scenario, the `cloudflow-proto` dependency needs to be added to your sbt project:
-- add libraryDependency defined in Cloudflow sbt-plugin variable `Cloudflow.library.CloudflowProto`
+- either use the dependency variable defined in the Cloudflow sbt-plugin: `Cloudflow.library.CloudflowProto`
 - or explicitly add libraryDependency `"com.lightbend.cloudflow" %% "cloudflow-proto" % <version>`
 
 In the Wind Turbine example, we will use the following domain objects:
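
Taken together, the documentation changes above amount to build wiring along the lines of the sketch below. This is a minimal illustration, not part of the patches themselves: `<version>` stands for a concrete Cloudflow release, and the `Compile`-scoped avrohugger task wiring is an assumption that mirrors the `Test`-scoped wiring from PATCH 3.

```
// project/plugins.sbt
// sbt-avrohugger generates Scala case classes from .avsc schemas.
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0")

// build.sbt
// Avro (de)serialization support for Cloudflow domain objects; with the
// Cloudflow sbt-plugin applied, Cloudflow.library.CloudflowAvro can be used instead.
libraryDependencies += "com.lightbend.cloudflow" %% "cloudflow-avro" % "<version>"

// Generate case classes from src/main/avro/*.avsc before compilation,
// so streamlets can refer to the generated types (e.g. SensorData).
(sourceGenerators in Compile) += (avroScalaGenerateSpecific in Compile).taskValue
```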