This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Enable external kafka for runLocal (#904)
* Enable the use of an external Kafka for runLocal

* minor

* minor

* rewording
andreaTP authored Nov 27, 2020
1 parent b469fa8 commit 698f995
Showing 4 changed files with 25 additions and 17 deletions.
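The change below adds a runLocalKafka sbt setting that the local-runner plugin reads from the ThisBuild scope; when it is set, runLocal connects to that broker instead of starting its own Kafka container. As a minimal sketch of how a build might opt in (the broker address here is an illustrative assumption, not part of the commit):

// build.sbt (sketch): point runLocal at an already-running Kafka
// instead of letting the plugin start one in Docker.
ThisBuild / runLocalKafka := Some("localhost:9092")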

@@ -63,13 +63,13 @@ object LocalRunner extends StreamletLoader {
*
* @param args: args(0) must be the JSON-encoded Application Descriptor
* args(1) must be the file to use for the output
- * args(2) must be the port where kafka is running on
+ * args(2) must be the kafka instance to use
*/
def main(args: Array[String]): Unit = {
- val usage = "Usage: localRunner <applicationFileJson> <outputFile> <kafka-port> [localConfigFile]"
- val (appDescriptorFilename, outputFilename, kafkaPort, localConfig) = args.toList match {
- case app :: out :: kafkaPort :: conf :: Nil => (app, out, kafkaPort, ConfigFactory.parseFile(new File(conf)).resolve)
- case app :: out :: kafkaPort :: Nil => (app, out, kafkaPort, ConfigFactory.empty())
+ val usage = "Usage: localRunner <applicationFileJson> <outputFile> <kafka-host> [localConfigFile]"
+ val (appDescriptorFilename, outputFilename, kafkaHost, localConfig) = args.toList match {
+ case app :: out :: kafkaHost :: conf :: Nil => (app, out, kafkaHost, ConfigFactory.parseFile(new File(conf)).resolve)
+ case app :: out :: kafkaHost :: Nil => (app, out, kafkaHost, ConfigFactory.empty())
case Nil => throw new RuntimeException(s"Missing application configuration file and output file for Local Runner\n$usage")
case _ :: Nil => throw new RuntimeException(s"Missing output file for Local Runner\n$usage")
case _ :: _ :: Nil => throw new RuntimeException(s"Missing kafka port\n$usage")
@@ -88,7 +88,7 @@ object LocalRunner extends StreamletLoader {
System.setErr(new PrintStream(fos))
readDescriptorFile(appDescriptorFilename) match {
case Success(applicationDescriptor) =>
- run(applicationDescriptor, localConfig, kafkaPort)
+ run(applicationDescriptor, localConfig, kafkaHost)
case Failure(ex) =>
log.error(s"Failed JSON unmarshalling of application descriptor file [${appDescriptorFilename}].", ex)
System.exit(1)
@@ -97,9 +97,9 @@
}
}

- private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaPort: String): Unit = {
+ private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaHost: String): Unit = {
val bootstrapServers =
- if (localConfig.hasPath(BootstrapServersKey)) localConfig.getString(BootstrapServersKey) else s"localhost:$kafkaPort"
+ if (localConfig.hasPath(BootstrapServersKey)) localConfig.getString(BootstrapServersKey) else kafkaHost
val topicConfig = ConfigFactory.parseString(s"""bootstrap.servers = "$bootstrapServers"""")

val appId = appDescriptor.appId
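
As the updated usage string and @param documentation show, args(2) is now a Kafka bootstrap address (host:port) rather than a bare port. Purely for illustration (the file paths and broker address below are made up), a direct invocation of the runner now looks like this:

LocalRunner.main(Array(
  "/tmp/my-app.json",        // args(0): JSON-encoded application descriptor
  "/tmp/local-runner.log",   // args(1): file that captures the runner's output
  "my-kafka.internal:9092",  // args(2): Kafka bootstrap address (previously a port)
  "/tmp/local.conf"          // optional local HOCON configuration file
))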

@@ -32,7 +32,8 @@ object CloudflowApplicationPlugin extends AutoPlugin {

override def buildSettings = Seq(
cloudflowDockerRegistry := None,
- cloudflowDockerRepository := None
+ cloudflowDockerRepository := None,
+ runLocalKafka := None
)

val DefaultLocalLog4jConfigFile = "local-run-log4j.properties"

@@ -48,6 +48,7 @@ trait CloudflowSettingKeys {
val blueprint = settingKey[Option[String]]("The path to the blueprint file to use in this Cloudflow application.")
val schemaCodeGenerator = settingKey[SchemaCodeGenerator.Language]("The language to generate data model schemas into.")
val schemaPaths = settingKey[Map[SchemaFormat.Format, String]]("A Map of paths to your data model schemas.")
+ val runLocalKafka = settingKey[Option[String]]("the external Kafka to use with the local runner Sandbox.")
val runLocalConfigFile = settingKey[Option[String]]("the HOCON configuration file to use with the local runner Sandbox.")
val runLocalLog4jConfigFile = settingKey[Option[String]](
s"The path to the log4j configuration file to use with the local runner Sandbox, if omitted, ${CloudflowApplicationPlugin.DefaultLocalLog4jConfigFile} is read from plugin classpath."

@@ -132,7 +132,11 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin {
}
.distinct
.sorted
- val kafkaPort = setupKafka(topics)
+ val kafkaHost = {
+ val host = (ThisBuild / runLocalKafka).value.getOrElse(setupKafka())
+ createTopics(host, topics)
+ host
+ }

printAppLayout(resolveConnections(appDescriptor))
printInfo(runtimeDescriptorByProject, tempDir.toFile, topics, localConfig.message)
@@ -141,7 +145,7 @@
case (pid, rd) =>
val classpath = cpByProject(pid)
val loggingPatchedClasspath = prepareLoggingInClasspath(classpath, logDependencies)
- runPipelineJVM(rd.appDescriptorFile, loggingPatchedClasspath, rd.outputFile, rd.logConfig, rd.localConfPath, kafkaPort)
+ runPipelineJVM(rd.appDescriptorFile, loggingPatchedClasspath, rd.outputFile, rd.logConfig, rd.localConfPath, kafkaHost)
}

println(s"Running ${appDescriptor.appId} \nTo terminate, press [ENTER]\n")
@@ -200,7 +204,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin {

val kafka = new AtomicReference[KafkaContainer]()

- def setupKafka(topics: Seq[String])(implicit log: Logger) = {
+ def setupKafka()(implicit log: Logger) = {

val cl = Thread.currentThread().getContextClassLoader()
val kafkaPort =
@@ -223,6 +227,10 @@

log.debug(s"Setting up Kafka broker in Docker on port: $kafkaPort")

+ s"localhost:${kafkaPort}"
+ }
+
+ def createTopics(kafkaHost: String, topics: Seq[String])(implicit log: Logger) = {
import org.apache.kafka.clients.admin.{ AdminClient, AdminClientConfig, NewTopic }
import scala.collection.JavaConverters._

@@ -231,7 +239,7 @@
while (retry > 0) {
val adminClient = AdminClient.create(
Map[String, Object](
- AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost.:${kafkaPort}",
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaHost,
AdminClientConfig.CLIENT_ID_CONFIG -> UUID.randomUUID().toString
).asJava
)
@@ -255,8 +263,6 @@
adminClient.close()
}
}

- kafkaPort
}

def stopKafka() = Try {
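
The call that actually creates the topics sits in the collapsed portion of createTopics above. Purely as a hypothetical sketch of what such a call looks like against the AdminClient now built from kafkaHost (the helper name, partition count, and replication factor are assumptions, not the commit's code):

import org.apache.kafka.clients.admin.{ AdminClient, NewTopic }
import scala.collection.JavaConverters._

// Hypothetical sketch: create any topics that do not exist yet, with a single
// partition and replication factor 1, on the configured broker.
def ensureTopics(adminClient: AdminClient, topics: Seq[String]): Unit = {
  val existing = adminClient.listTopics().names().get().asScala
  val missing  = topics.filterNot(existing).map(name => new NewTopic(name, 1, 1.toShort))
  if (missing.nonEmpty) adminClient.createTopics(missing.asJava).all().get()
}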
@@ -418,7 +424,7 @@ object CloudflowLocalRunnerPlugin extends AutoPlugin {
outputFile: File,
log4JConfigFile: Path,
localConfPath: Option[String],
- kafkaPort: Int)(
+ kafkaHost: String)(
implicit logger: Logger
): Process = {
val cp = "-cp"
@@ -438,7 +444,7 @@
val options: Seq[String] = Seq(
Some(applicationDescriptorFile.toFile.getAbsolutePath),
Some(outputFile.getAbsolutePath),
- Some(kafkaPort.toString),
+ Some(kafkaHost),
localConfPath
).flatten

