diff --git a/pom.xml b/pom.xml index 3666748..4451dbf 100644 --- a/pom.xml +++ b/pom.xml @@ -23,19 +23,56 @@ + + + io.arrow-kt + arrow-core + 0.10.4 + + com.amazonaws - aws-java-sdk-s3 - 1.11.509 + aws-java-sdk + 1.11.714 + + + + + com.jayway.jsonpath + json-path + 2.4.0 + + + + javax.jms + javax.jms-api + 2.0.1 + + + + + com.amazonaws + amazon-sqs-java-messaging-lib + 1.0.8 + + org.springframework + spring-jms + + org.springframework.boot spring-boot-starter-webflux + + org.springframework.boot + spring-boot-starter-rsocket + + org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/kotlin/it/valeriovaudi/i18nmessage/I18nMessageApplication.kt b/src/main/kotlin/it/valeriovaudi/i18nmessage/I18nMessageApplication.kt index c0f312f..734c066 100644 --- a/src/main/kotlin/it/valeriovaudi/i18nmessage/I18nMessageApplication.kt +++ b/src/main/kotlin/it/valeriovaudi/i18nmessage/I18nMessageApplication.kt @@ -1,35 +1,101 @@ package it.valeriovaudi.i18nmessage +import com.amazon.sqs.javamessaging.ProviderConfiguration +import com.amazon.sqs.javamessaging.SQSConnectionFactory +import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.AWSStaticCredentialsProvider import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.services.s3.AmazonS3ClientBuilder +import com.amazonaws.services.sqs.AmazonSQSClientBuilder +import io.rsocket.transport.netty.client.TcpClientTransport import it.valeriovaudi.i18nmessage.messages.AwsS3MessageRepository import it.valeriovaudi.i18nmessage.messages.MessageRepository import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.ConstructorBinding +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.runApplication import org.springframework.context.annotation.Bean +import org.springframework.jms.annotation.EnableJms +import org.springframework.jms.config.DefaultJmsListenerContainerFactory +import org.springframework.jms.support.destination.DynamicDestinationResolver +import org.springframework.messaging.rsocket.RSocketRequester +import org.springframework.messaging.rsocket.RSocketStrategies +import reactor.core.publisher.Mono +import java.net.InetSocketAddress +import javax.jms.ConnectionFactory +import javax.jms.Session + +@EnableJms @SpringBootApplication +@EnableConfigurationProperties(value = [RSocketApplicationClientApps::class]) class I18nMessageApplication { @Bean - fun messageRepository(@Value("\${aws.s3.access-key}") accessKey: String, - @Value("\${aws.s3.secret-key}") awsSecretKey: String, - @Value("\${aws.s3.region}") awsRegion: String, - @Value("\${aws.s3.bucket}") awsBucket: String): MessageRepository { - val credentials = BasicAWSCredentials(accessKey, awsSecretKey) + fun sqsConnectionFactory(@Value("\${aws.s3.region}") awsRegion: String, + awsCredentialsProvider: AWSCredentialsProvider): SQSConnectionFactory { + return SQSConnectionFactory( + ProviderConfiguration(), + AmazonSQSClientBuilder + .standard() + .withCredentials(awsCredentialsProvider) + .withRegion(awsRegion) + .build() + ) + + } + + @Bean + fun jmsListenerContainerFactory(sqsConnectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory { + val factory = DefaultJmsListenerContainerFactory() + factory.setConnectionFactory(sqsConnectionFactory) + factory.setDestinationResolver(DynamicDestinationResolver()) + factory.setConcurrency("3-10") + factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE) + return factory + } + + @Bean + fun requesters(rSocketStrategies: RSocketStrategies, + rSocketApplicationClientApps: RSocketApplicationClientApps, + builder: RSocketRequester.Builder): Map> = + + rSocketApplicationClientApps.clients.map { + val address = InetSocketAddress(it.host, it.port) + val clientTransport: TcpClientTransport = TcpClientTransport.create(address) + mapOf(it.id to builder.rsocketStrategies(rSocketStrategies) + .connect(clientTransport)) + }.reduce { acc, map -> acc + map } + + @Bean + fun messageRepository(@Value("\${aws.s3.region}") awsRegion: String, + @Value("\${aws.s3.bucket}") awsBucket: String, + awsCredentialsProvider: AWSCredentialsProvider): MessageRepository { val s3client = AmazonS3ClientBuilder .standard() - .withCredentials(AWSStaticCredentialsProvider(credentials)) + .withCredentials(awsCredentialsProvider) .withRegion(awsRegion) .build() return AwsS3MessageRepository(s3client, awsBucket) } + + @Bean + fun awsCredentialsProvider(@Value("\${aws.s3.access-key}") accessKey: String, + @Value("\${aws.s3.secret-key}") awsSecretKey: String): + AWSCredentialsProvider = AWSStaticCredentialsProvider(BasicAWSCredentials(accessKey, awsSecretKey)) + } +@ConstructorBinding +@ConfigurationProperties(prefix = "rsocket") +data class RSocketApplicationClientApps(val clients: List) + +data class RSocketApplicationClientApp(val id: String, val host: String, val port: Int) + fun main(args: Array) { runApplication(*args) } diff --git a/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsS3MessageRepository.kt b/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsS3MessageRepository.kt index 801c4f6..266a486 100644 --- a/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsS3MessageRepository.kt +++ b/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsS3MessageRepository.kt @@ -20,11 +20,9 @@ open class AwsS3MessageRepository(private val s3client: AmazonS3, private fun s3MessageBundleFor(message: ObjectListing, application: String, language: String) = Mono.defer { - messageKeyFor(application) - .let { messagesKey -> - Optional.ofNullable(message.objectSummaries.find(resourceBundleFinderPredicate(messagesKey, language))) - .orElse(message.objectSummaries.find(defaultResourceBundleFinderPredicate(messagesKey))) - }.toMono() + Optional.ofNullable(message.objectSummaries.find(resourceBundleFinderPredicate(application, language))) + .orElse(message.objectSummaries.find(defaultResourceBundleFinderPredicate(application))) + .toMono() } private fun defaultResourceBundleFinderPredicate(messagesKey: String): S3ObjectSummaryPredicate = @@ -45,8 +43,6 @@ open class AwsS3MessageRepository(private val s3client: AmazonS3, .map { it.toMap() as Map } - private fun messageKeyFor(application: String) = "i18n-messages/$application" - - private fun getAllS3MessagesBundleFor(application: String) = Mono.fromCallable { s3client.listObjects(bucketName, messageKeyFor(application)) } + private fun getAllS3MessagesBundleFor(application: String) = Mono.fromCallable { s3client.listObjects(bucketName, "$application") } } \ No newline at end of file diff --git a/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsSQSListener.kt b/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsSQSListener.kt new file mode 100644 index 0000000..0b7945d --- /dev/null +++ b/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/AwsSQSListener.kt @@ -0,0 +1,44 @@ +package it.valeriovaudi.i18nmessage.messages + +import com.jayway.jsonpath.JsonPath +import org.springframework.jms.annotation.JmsListener +import org.springframework.messaging.rsocket.RSocketRequester +import org.springframework.stereotype.Component +import reactor.core.publisher.Mono +import java.util.* + +@Component +class AwsSQSListener(private val messageRepository: MessageRepository, + private val requesters: Map>) { + + @JmsListener(destination = "i18n-messages-updates") + fun onMessage(message: String) { + applicationNameFor2(message) + .map { applicationName -> + + println("application $applicationName bundle are refreshing") + + messageRepository.find(applicationName, Locale.ENGLISH) + .flatMap { bundle -> + sendBundleToClient(bundle, applicationName) + }.subscribe() + } + } + + private fun applicationNameFor2(message: String) = + Optional.ofNullable(JsonPath.read(message, "$.detail.requestParameters.key") as String?) + .map { key -> key.split("/").first() } + + + private fun sendBundleToClient(bundle: Messages, applicationName: String): Mono? { + return Optional.ofNullable(bundle) + .map { requesters[applicationName] } + .map { applicationRequester -> + applicationRequester?.flatMap { req -> + req.route("messages.$applicationName") + .data(bundle) + .send() + } + }.orElse(Mono.empty()) + } +} \ No newline at end of file diff --git a/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/MessageRSocketEndPoint.kt b/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/MessageRSocketEndPoint.kt new file mode 100644 index 0000000..6af5797 --- /dev/null +++ b/src/main/kotlin/it/valeriovaudi/i18nmessage/messages/MessageRSocketEndPoint.kt @@ -0,0 +1,16 @@ +package it.valeriovaudi.i18nmessage.messages + +import org.springframework.messaging.handler.annotation.DestinationVariable +import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.stereotype.Controller +import java.util.* + +@Controller +class MessageRSocketEndPoint(private val messageRepository: MessageRepository) { + + @MessageMapping("messages.{application}") + fun messageEndPointRoute( + @DestinationVariable("application") application: String, + lang: String? + ) = messageRepository.find(application, Optional.ofNullable(lang).map { Locale(it) }.orElse(Locale.ENGLISH)) +} \ No newline at end of file