Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
no comment (#1007)
Browse files Browse the repository at this point in the history
  • Loading branch information
franciscolopezsancho authored Mar 23, 2021
1 parent 9239c71 commit 73862f1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ object ApplicationDescriptor {
}

ApplicationDescriptor(sanitizedApplicationId, appVersion, namedStreamletDescriptors.map {
case (_, instance) => instance
case (_, instance) =>
StreamletInstance(instance.name, sanitizeDescriptor(instance.descriptor))
}, deployments, agentPaths, Version, libraryVersion)
}

Expand All @@ -77,6 +78,17 @@ object ApplicationDescriptor {
}.toMap
private def streamletToNamedStreamletDescriptor(streamlet: VerifiedStreamlet) =
(streamlet, StreamletInstance(streamlet.name, streamlet.descriptor))

/**
* Deletes every schema
* StreamletDescriptor.[inlets | outlets].SchemaDescriptor.schema
* to avoid adding the description of each type of the schema in the CR
*/
private def sanitizeDescriptor(descriptor: StreamletDescriptor): StreamletDescriptor = {
val sanitizedInlets = descriptor.inlets.map(each => each.copy(schema = each.schema.copy(schema = "")))
val sanitizedOutlets = descriptor.outlets.map(each => each.copy(schema = each.schema.copy(schema = "")))
descriptor.copy(inlets = sanitizedInlets, outlets = sanitizedOutlets)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,44 @@ class ApplicationDescriptorSpec
processor2Deployment.portMappings.map { case (port, sp) => port -> sp.id } must contain("out" -> "foos2")
}

"from a verified blueprint clean the schema from the CR" in {
Given("a verified blueprint")
val ingress = randomStreamlet().asIngress[Foo].withServerAttribute
val processor = randomStreamlet().asProcessor[Foo, Bar].withRuntime("spark")
val egress = randomStreamlet().asEgress[Bar]

val ingressRef = ingress.ref("ingress")
val processorRef = processor.ref("processor")
val egressRef = egress.ref("egress")

val verifiedBlueprint = Blueprint()
.define(Vector(ingress, processor, egress))
.use(ingressRef)
.use(processorRef)
.use(egressRef)
.connect(BTopic("foos"), ingressRef.out, processorRef.in)
.connect(BTopic("bars1"), processorRef.out, egressRef.in)
.verified
.right
.value

When("I create a deployment descriptor from that blueprint")
val appId = "noisy-nissan-42"
val appVersion = "1-2345678"
val image = "image-1"
val descriptor = ApplicationDescriptor(appId, appVersion, image, verifiedBlueprint, agentPaths, BuildInfo.version)

Then("the descriptor must be valid")
descriptor.deployments.size mustBe 3
Then("the description of the streamlets should not contain the SchemaDescriptor.schema populated")
val schemas = for {
streamlet <- descriptor.streamlets
port <- streamlet.descriptor.inlets ++ streamlet.descriptor.outlets
} yield port.schema.schema
schemas mustBe Vector("", "", "", "")

}

"be built correctly from a verified blueprint (with dual-inlet merging)" in {
Given("a verified blueprint")
val ingress1 = randomStreamlet().asIngress[Foo].withServerAttribute
Expand Down

0 comments on commit 73862f1

Please sign in to comment.