Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

To rsocket #3

Merged
merged 7 commits into from
Feb 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,56 @@
</properties>

<dependencies>
<!-- https://mvnrepository.com/artifact/io.arrow-kt/arrow-core -->
<dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
<version>0.10.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3 -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.509</version>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.714</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs -->
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>

<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.amazonaws/amazon-sqs-java-messaging-lib -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.8</version>
</dependency>


<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,101 @@
package it.valeriovaudi.i18nmessage

import com.amazon.sqs.javamessaging.ProviderConfiguration
import com.amazon.sqs.javamessaging.SQSConnectionFactory
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import io.rsocket.transport.netty.client.TcpClientTransport
import it.valeriovaudi.i18nmessage.messages.AwsS3MessageRepository
import it.valeriovaudi.i18nmessage.messages.MessageRepository
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.jms.annotation.EnableJms
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.support.destination.DynamicDestinationResolver
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.messaging.rsocket.RSocketStrategies
import reactor.core.publisher.Mono
import java.net.InetSocketAddress
import javax.jms.ConnectionFactory
import javax.jms.Session


@EnableJms
@SpringBootApplication
@EnableConfigurationProperties(value = [RSocketApplicationClientApps::class])
class I18nMessageApplication {

@Bean
fun messageRepository(@Value("\${aws.s3.access-key}") accessKey: String,
@Value("\${aws.s3.secret-key}") awsSecretKey: String,
@Value("\${aws.s3.region}") awsRegion: String,
@Value("\${aws.s3.bucket}") awsBucket: String): MessageRepository {
val credentials = BasicAWSCredentials(accessKey, awsSecretKey)
fun sqsConnectionFactory(@Value("\${aws.s3.region}") awsRegion: String,
awsCredentialsProvider: AWSCredentialsProvider): SQSConnectionFactory {
return SQSConnectionFactory(
ProviderConfiguration(),
AmazonSQSClientBuilder
.standard()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build()
)

}

@Bean
fun jmsListenerContainerFactory(sqsConnectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
factory.setConnectionFactory(sqsConnectionFactory)
factory.setDestinationResolver(DynamicDestinationResolver())
factory.setConcurrency("3-10")
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
return factory
}

@Bean
fun requesters(rSocketStrategies: RSocketStrategies,
rSocketApplicationClientApps: RSocketApplicationClientApps,
builder: RSocketRequester.Builder): Map<String, Mono<RSocketRequester>> =

rSocketApplicationClientApps.clients.map {
val address = InetSocketAddress(it.host, it.port)
val clientTransport: TcpClientTransport = TcpClientTransport.create(address)
mapOf(it.id to builder.rsocketStrategies(rSocketStrategies)
.connect(clientTransport))
}.reduce { acc, map -> acc + map }

@Bean
fun messageRepository(@Value("\${aws.s3.region}") awsRegion: String,
@Value("\${aws.s3.bucket}") awsBucket: String,
awsCredentialsProvider: AWSCredentialsProvider): MessageRepository {

val s3client = AmazonS3ClientBuilder
.standard()
.withCredentials(AWSStaticCredentialsProvider(credentials))
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build()

return AwsS3MessageRepository(s3client, awsBucket)
}

@Bean
fun awsCredentialsProvider(@Value("\${aws.s3.access-key}") accessKey: String,
@Value("\${aws.s3.secret-key}") awsSecretKey: String):
AWSCredentialsProvider = AWSStaticCredentialsProvider(BasicAWSCredentials(accessKey, awsSecretKey))

}

@ConstructorBinding
@ConfigurationProperties(prefix = "rsocket")
data class RSocketApplicationClientApps(val clients: List<RSocketApplicationClientApp>)

data class RSocketApplicationClientApp(val id: String, val host: String, val port: Int)

fun main(args: Array<String>) {
runApplication<I18nMessageApplication>(*args)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ open class AwsS3MessageRepository(private val s3client: AmazonS3,

private fun s3MessageBundleFor(message: ObjectListing, application: String, language: String) =
Mono.defer {
messageKeyFor(application)
.let { messagesKey ->
Optional.ofNullable(message.objectSummaries.find(resourceBundleFinderPredicate(messagesKey, language)))
.orElse(message.objectSummaries.find(defaultResourceBundleFinderPredicate(messagesKey)))
}.toMono()
Optional.ofNullable(message.objectSummaries.find(resourceBundleFinderPredicate(application, language)))
.orElse(message.objectSummaries.find(defaultResourceBundleFinderPredicate(application)))
.toMono()
}

private fun defaultResourceBundleFinderPredicate(messagesKey: String): S3ObjectSummaryPredicate =
Expand All @@ -45,8 +43,6 @@ open class AwsS3MessageRepository(private val s3client: AmazonS3,
.map { it.toMap() as Map<String, String> }


private fun messageKeyFor(application: String) = "i18n-messages/$application"

private fun getAllS3MessagesBundleFor(application: String) = Mono.fromCallable { s3client.listObjects(bucketName, messageKeyFor(application)) }
private fun getAllS3MessagesBundleFor(application: String) = Mono.fromCallable { s3client.listObjects(bucketName, "$application") }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package it.valeriovaudi.i18nmessage.messages

import com.jayway.jsonpath.JsonPath
import org.springframework.jms.annotation.JmsListener
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import java.util.*

@Component
class AwsSQSListener(private val messageRepository: MessageRepository,
private val requesters: Map<String, Mono<RSocketRequester>>) {

@JmsListener(destination = "i18n-messages-updates")
fun onMessage(message: String) {
applicationNameFor2(message)
.map { applicationName ->

println("application $applicationName bundle are refreshing")

messageRepository.find(applicationName, Locale.ENGLISH)
.flatMap { bundle ->
sendBundleToClient(bundle, applicationName)
}.subscribe()
}
}

private fun applicationNameFor2(message: String) =
Optional.ofNullable(JsonPath.read(message, "$.detail.requestParameters.key") as String?)
.map { key -> key.split("/").first() }


private fun sendBundleToClient(bundle: Messages, applicationName: String): Mono<Void>? {
return Optional.ofNullable(bundle)
.map { requesters[applicationName] }
.map { applicationRequester ->
applicationRequester?.flatMap { req ->
req.route("messages.$applicationName")
.data(bundle)
.send()
}
}.orElse(Mono.empty())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package it.valeriovaudi.i18nmessage.messages

import org.springframework.messaging.handler.annotation.DestinationVariable
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.stereotype.Controller
import java.util.*

@Controller
class MessageRSocketEndPoint(private val messageRepository: MessageRepository) {

@MessageMapping("messages.{application}")
fun messageEndPointRoute(
@DestinationVariable("application") application: String,
lang: String?
) = messageRepository.find(application, Optional.ofNullable(lang).map { Locale(it) }.orElse(Locale.ENGLISH))
}