From 698f99533bf4f08b1071a4c00583ed3163874a0b Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Fri, 27 Nov 2020 18:04:47 +0000 Subject: [PATCH] Enable external kafka for runLocal (#904) * Enable the usage of an external Kafka to runLocal * minor * minor * rewording --- .../cloudflow/localrunner/LocalRunner.scala | 16 +++++++------- .../sbt/CloudflowApplicationPlugin.scala | 3 ++- .../scala/cloudflow/sbt/CloudflowKeys.scala | 1 + .../sbt/CloudflowLocalRunnerPlugin.scala | 22 ++++++++++++------- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala index 58f7b4876..ea3fb6683 100644 --- a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala +++ b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala @@ -63,13 +63,13 @@ object LocalRunner extends StreamletLoader { * * @param args: args(0) must be the JSON-encoded Application Descriptor * args(1) must be the file to use for the output - * args(2) must be the port where kafka is running on + * args(2) must be the kafka instance to use */ def main(args: Array[String]): Unit = { - val usage = "Usage: localRunner [localConfigFile]" - val (appDescriptorFilename, outputFilename, kafkaPort, localConfig) = args.toList match { - case app :: out :: kafkaPort :: conf :: Nil => (app, out, kafkaPort, ConfigFactory.parseFile(new File(conf)).resolve) - case app :: out :: kafkaPort :: Nil => (app, out, kafkaPort, ConfigFactory.empty()) + val usage = "Usage: localRunner [localConfigFile]" + val (appDescriptorFilename, outputFilename, kafkaHost, localConfig) = args.toList match { + case app :: out :: kafkaHost :: conf :: Nil => (app, out, kafkaHost, ConfigFactory.parseFile(new File(conf)).resolve) + case app :: out :: kafkaHost :: Nil => (app, out, kafkaHost, ConfigFactory.empty()) case Nil => throw new RuntimeException(s"Missing application configuration file and output file for Local Runner\n$usage") case _ :: Nil => throw new RuntimeException(s"Missing output file for Local Runner\n$usage") case _ :: _ :: Nil => throw new RuntimeException(s"Missing kafka port\n$usage") @@ -88,7 +88,7 @@ object LocalRunner extends StreamletLoader { System.setErr(new PrintStream(fos)) readDescriptorFile(appDescriptorFilename) match { case Success(applicationDescriptor) => - run(applicationDescriptor, localConfig, kafkaPort) + run(applicationDescriptor, localConfig, kafkaHost) case Failure(ex) => log.error(s"Failed JSON unmarshalling of application descriptor file [${appDescriptorFilename}].", ex) System.exit(1) @@ -97,9 +97,9 @@ object LocalRunner extends StreamletLoader { } } - private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaPort: String): Unit = { + private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaHost: String): Unit = { val bootstrapServers = - if (localConfig.hasPath(BootstrapServersKey)) localConfig.getString(BootstrapServersKey) else s"localhost:$kafkaPort" + if (localConfig.hasPath(BootstrapServersKey)) localConfig.getString(BootstrapServersKey) else kafkaHost val topicConfig = ConfigFactory.parseString(s"""bootstrap.servers = "$bootstrapServers"""") val appId = appDescriptor.appId diff --git a/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowApplicationPlugin.scala b/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowApplicationPlugin.scala index 3b879cd9e..7fccabf00 100644 --- a/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowApplicationPlugin.scala +++ b/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowApplicationPlugin.scala @@ -32,7 +32,8 @@ object CloudflowApplicationPlugin extends AutoPlugin { override def buildSettings = Seq( cloudflowDockerRegistry := None, - cloudflowDockerRepository := None + cloudflowDockerRepository := None, + runLocalKafka := None ) val DefaultLocalLog4jConfigFile = "local-run-log4j.properties" diff --git a/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowKeys.scala b/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowKeys.scala index 6664deaa2..6b4f17f24 100644 --- a/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowKeys.scala +++ b/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowKeys.scala @@ -48,6 +48,7 @@ trait CloudflowSettingKeys { val blueprint = settingKey[Option[String]]("The path to the blueprint file to use in this Cloudflow application.") val schemaCodeGenerator = settingKey[SchemaCodeGenerator.Language]("The language to generate data model schemas into.") val schemaPaths = settingKey[Map[SchemaFormat.Format, String]]("A Map of paths to your data model schemas.") + val runLocalKafka = settingKey[Option[String]]("the external Kafka to use with the local runner Sandbox.") val runLocalConfigFile = settingKey[Option[String]]("the HOCON configuration file to use with the local runner Sandbox.") val runLocalLog4jConfigFile = settingKey[Option[String]]( s"The path to the log4j configuration file to use with the local runner Sandbox, if omitted, ${CloudflowApplicationPlugin.DefaultLocalLog4jConfigFile} is read from plugin classpath." diff --git a/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowLocalRunnerPlugin.scala b/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowLocalRunnerPlugin.scala index 552919a6f..a9ba10de0 100644 --- a/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowLocalRunnerPlugin.scala +++ b/core/sbt-cloudflow/src/main/scala/cloudflow/sbt/CloudflowLocalRunnerPlugin.scala @@ -132,7 +132,11 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { } .distinct .sorted - val kafkaPort = setupKafka(topics) + val kafkaHost = { + val host = (ThisBuild / runLocalKafka).value.getOrElse(setupKafka()) + createTopics(host, topics) + host + } printAppLayout(resolveConnections(appDescriptor)) printInfo(runtimeDescriptorByProject, tempDir.toFile, topics, localConfig.message) @@ -141,7 +145,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { case (pid, rd) => val classpath = cpByProject(pid) val loggingPatchedClasspath = prepareLoggingInClasspath(classpath, logDependencies) - runPipelineJVM(rd.appDescriptorFile, loggingPatchedClasspath, rd.outputFile, rd.logConfig, rd.localConfPath, kafkaPort) + runPipelineJVM(rd.appDescriptorFile, loggingPatchedClasspath, rd.outputFile, rd.logConfig, rd.localConfPath, kafkaHost) } println(s"Running ${appDescriptor.appId} \nTo terminate, press [ENTER]\n") @@ -200,7 +204,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { val kafka = new AtomicReference[KafkaContainer]() - def setupKafka(topics: Seq[String])(implicit log: Logger) = { + def setupKafka()(implicit log: Logger) = { val cl = Thread.currentThread().getContextClassLoader() val kafkaPort = @@ -223,6 +227,10 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { log.debug(s"Setting up Kafka broker in Docker on port: $kafkaPort") + s"localhost:${kafkaPort}" + } + + def createTopics(kafkaHost: String, topics: Seq[String])(implicit log: Logger) = { import org.apache.kafka.clients.admin.{ AdminClient, AdminClientConfig, NewTopic } import scala.collection.JavaConverters._ @@ -231,7 +239,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { while (retry > 0) { val adminClient = AdminClient.create( Map[String, Object]( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost.:${kafkaPort}", + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaHost, AdminClientConfig.CLIENT_ID_CONFIG -> UUID.randomUUID().toString ).asJava ) @@ -255,8 +263,6 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { adminClient.close() } } - - kafkaPort } def stopKafka() = Try { @@ -418,7 +424,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { outputFile: File, log4JConfigFile: Path, localConfPath: Option[String], - kafkaPort: Int)( + kafkaHost: String)( implicit logger: Logger ): Process = { val cp = "-cp" @@ -438,7 +444,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin { val options: Seq[String] = Seq( Some(applicationDescriptorFile.toFile.getAbsolutePath), Some(outputFile.getAbsolutePath), - Some(kafkaPort.toString), + Some(kafkaHost), localConfPath ).flatten