diff --git a/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/ApplicationDescriptor.scala b/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/ApplicationDescriptor.scala index 95d8399ed..dda79e825 100644 --- a/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/ApplicationDescriptor.scala +++ b/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/ApplicationDescriptor.scala @@ -65,7 +65,8 @@ object ApplicationDescriptor { } ApplicationDescriptor(sanitizedApplicationId, appVersion, namedStreamletDescriptors.map { - case (_, instance) => instance + case (_, instance) => + StreamletInstance(instance.name, sanitizeDescriptor(instance.descriptor)) }, deployments, agentPaths, Version, libraryVersion) } @@ -77,6 +78,17 @@ object ApplicationDescriptor { }.toMap private def streamletToNamedStreamletDescriptor(streamlet: VerifiedStreamlet) = (streamlet, StreamletInstance(streamlet.name, streamlet.descriptor)) + + /** + * Deletes every schema + * StreamletDescriptor.[inlets | outlets].SchemaDescriptor.schema + * to avoid adding the description of each type of the schema in the CR + */ + private def sanitizeDescriptor(descriptor: StreamletDescriptor): StreamletDescriptor = { + val sanitizedInlets = descriptor.inlets.map(each => each.copy(schema = each.schema.copy(schema = ""))) + val sanitizedOutlets = descriptor.outlets.map(each => each.copy(schema = each.schema.copy(schema = ""))) + descriptor.copy(inlets = sanitizedInlets, outlets = sanitizedOutlets) + } } /** diff --git a/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/ApplicationDescriptorSpec.scala b/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/ApplicationDescriptorSpec.scala index 1da242c52..004ada434 100644 --- a/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/ApplicationDescriptorSpec.scala +++ b/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/ApplicationDescriptorSpec.scala @@ -214,6 +214,44 @@ class ApplicationDescriptorSpec processor2Deployment.portMappings.map { case (port, sp) => port -> sp.id } must contain("out" -> "foos2") } + "from a verified blueprint clean the schema from the CR" in { + Given("a verified blueprint") + val ingress = randomStreamlet().asIngress[Foo].withServerAttribute + val processor = randomStreamlet().asProcessor[Foo, Bar].withRuntime("spark") + val egress = randomStreamlet().asEgress[Bar] + + val ingressRef = ingress.ref("ingress") + val processorRef = processor.ref("processor") + val egressRef = egress.ref("egress") + + val verifiedBlueprint = Blueprint() + .define(Vector(ingress, processor, egress)) + .use(ingressRef) + .use(processorRef) + .use(egressRef) + .connect(BTopic("foos"), ingressRef.out, processorRef.in) + .connect(BTopic("bars1"), processorRef.out, egressRef.in) + .verified + .right + .value + + When("I create a deployment descriptor from that blueprint") + val appId = "noisy-nissan-42" + val appVersion = "1-2345678" + val image = "image-1" + val descriptor = ApplicationDescriptor(appId, appVersion, image, verifiedBlueprint, agentPaths, BuildInfo.version) + + Then("the descriptor must be valid") + descriptor.deployments.size mustBe 3 + Then("the description of the streamlets should not contain the SchemaDescriptor.schema populated") + val schemas = for { + streamlet <- descriptor.streamlets + port <- streamlet.descriptor.inlets ++ streamlet.descriptor.outlets + } yield port.schema.schema + schemas mustBe Vector("", "", "", "") + + } + "be built correctly from a verified blueprint (with dual-inlet merging)" in { Given("a verified blueprint") val ingress1 = randomStreamlet().asIngress[Foo].withServerAttribute