Skip to content

Commit

Permalink
fix: exposing all query endpoints from the View component (#2009)
Browse files Browse the repository at this point in the history
* fix: exposing all query endpoints from the View component

* renaming

* fixing docs
  • Loading branch information
aludwiko authored Feb 5, 2024
1 parent 5e2825b commit 094d189
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the

WARNING: Changing the `service` name after it has once been deployed means the consumer will start over from the beginning of the event stream.

See https://docs.kalix.io/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.
See https://docs.kalix.io/reference/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.

== Handling Serialization

Expand Down Expand Up @@ -475,7 +475,7 @@ If an action has a return type of `BytesValue` and publishes to a topic, the eve

It is possible to use environment variables to control the name of the topic that is used for consuming from or producing events to, this is useful for example for using the same image in staging and production deployments but having them interact with separate topics.

Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the `topic` string in `eventing.in.topic` or `eventing.out.topic` blocks. See https://docs.kalix.io/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.
Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the `topic` string in `eventing.in.topic` or `eventing.out.topic` blocks. See https://docs.kalix.io/reference/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.

WARNING: Changing the `topic` name after it has once been deployed for an event consumer means the consumer will start over from the beginning of the topic.

Expand Down
6 changes: 3 additions & 3 deletions docs/src/modules/java-protobuf/pages/views.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,9 @@ The View definitions are stored and validated when a new version is deployed. Th

=== Drop obsolete view data

The data for old Views, that are no longer actively used, can be dropped using the `kalix` CLI https://docs.kalix.io/kalix/kalix_services_views.html[service view commands].
The data for old Views, that are no longer actively used, can be dropped using the `kalix` CLI https://docs.kalix.io/reference/kalix/kalix_services_views.html[service view commands].

A summary of all views for a running service can be listed using the https://docs.kalix.io/kalix/kalix_services_views_list.html[views list command]:
A summary of all views for a running service can be listed using the https://docs.kalix.io/reference/kalix/kalix_services_views_list.html[views list command]:

----
> kalix service views list customer-registry
Expand All @@ -491,7 +491,7 @@ CustomerByName false 1d
CustomerByNameV2 true 5m
----

Any views that are inactive and no longer needed can be dropped using the https://docs.kalix.io/kalix/kalix_services_views_drop.html[views drop command]:
Any views that are inactive and no longer needed can be dropped using the https://docs.kalix.io/reference/kalix/kalix_services_views_drop.html[views drop command]:

----
> kalix service views drop customer-registry CustomerByName
Expand Down
4 changes: 2 additions & 2 deletions docs/src/modules/java/pages/publishing-subscribing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the

WARNING: Changing the `service` name after it has once been deployed means the consumer will start over from the beginning of the event stream.

See https://docs.kalix.io/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.
See https://docs.kalix.io/reference/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.



Expand Down Expand Up @@ -185,7 +185,7 @@ If an action has a return type of `byte[]` and publishes to a topic, the events

It is possible to use environment variables to control the name of the topic that is used for consuming from or producing events to, this is useful for example for using the same image in staging and production deployments but having them interact with separate topics.

Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the `value` parameter of the `@Subscribe.Topic` annotation. See https://docs.kalix.io/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.
Referencing environment variables is done with the syntax `$\{VAR_NAME}` in the `value` parameter of the `@Subscribe.Topic` annotation. See https://docs.kalix.io/reference/kalix/kalix_services_deploy.html[kalix service deploy] for details on how to set environment variables when deploying a service.

WARNING: Changing the `topic` name after it has once been deployed for an event consumer means the consumer will start over from the beginning of the topic.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.example.wiring.views.UserCountersView;
import com.example.wiring.views.UserWithVersion;
import com.example.wiring.views.UserWithVersionView;
import com.example.wiring.views.UsersByEmail;
import com.example.wiring.views.UsersView;
import com.example.wiring.views.UsersByEmailAndName;
import com.google.protobuf.any.Any;
import kalix.javasdk.DeferredCall;
Expand Down Expand Up @@ -647,11 +647,28 @@ public void verifyFindUsersByEmail() {
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(
() -> {
var byEmail = execute(componentClient.forView().call(UsersByEmail::getUsers).params(user.email));
var byEmail = execute(componentClient.forView().call(UsersView::getUsersEmail).params(user.email));
assertThat(byEmail.email).isEqualTo(user.email);
});
}

@Test
public void verifyFindUsersByName() {

TestUser user = new TestUser("JohnDoe2", "[email protected]", "JohnDoe2");
createUser(user);

// the view is eventually updated
await()
.ignoreExceptions()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(
() -> {
var byName = execute(componentClient.forView().call(UsersView::getUsersByName).params(user.name));
assertThat(byName.name).isEqualTo(user.name);
});
}

@Test
public void verifyFindUsersByEmailAndName() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@Table("users_by_email")
@Table("users")
@Subscribe.ValueEntity(UserEntity.class)
public class UsersByEmail extends View<User> {
public class UsersView extends View<User> {

@GetMapping("/users/by_email/{email}")
@Query("SELECT * FROM users_by_email WHERE email = :email")
public User getUsers(@PathVariable String email) {
@Query("SELECT * FROM users WHERE email = :email")
public User getUsersEmail(@PathVariable String email) {
return null;
}

@GetMapping("/users/by_name/{name}")
@Query("SELECT * FROM users WHERE name = :name")
public User getUsersByName(@PathVariable String name) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,45 @@ private[impl] object ViewDescriptorFactory extends ComponentDescriptorFactory {
}
}

// we only take methods with Query annotations and Spring REST annotations
val (
val allQueryMethods = queryMethods(component)

val kalixMethods: Seq[KalixMethod] = allQueryMethods.map(_.queryMethod) ++ updateMethods
val serviceName = nameGenerator.getName(component.getSimpleName)
val additionalMessages =
tableTypeDescriptors.toSet ++ allQueryMethods.map(_.queryOutputSchemaDescriptor) ++ allQueryMethods.flatMap(
_.queryInputSchemaDescriptor.toSet)

val serviceLevelOptions =
mergeServiceOptions(
AclDescriptorFactory.serviceLevelAclAnnotation(component),
JwtDescriptorFactory.serviceLevelJwtAnnotation(component),
eventingInForEventSourcedEntityServiceLevel(component),
eventingInForTopicServiceLevel(component),
subscribeToEventStream(component))

ComponentDescriptor(
nameGenerator,
messageCodec,
serviceName,
serviceOptions = serviceLevelOptions,
component.getPackageName,
kalixMethods,
additionalMessages.toSeq)
}

private case class QueryMethod(
queryMethod: KalixMethod,
queryInputSchemaDescriptor: Option[ProtoMessageDescriptors],
queryOutputSchemaDescriptor: ProtoMessageDescriptors) = {
queryOutputSchemaDescriptor: ProtoMessageDescriptors)

val annotatedQueryMethods = RestServiceIntrospector
.inspectService(component)
.methods
.filter(_.javaMethod.getAnnotation(classOf[Query]) != null)

val queryMethod: SyntheticRequestServiceMethod = annotatedQueryMethods.head
private def queryMethods(component: Class[_]): Seq[QueryMethod] = {
// we only take methods with Query annotations and Spring REST annotations
val annotatedQueryMethods = RestServiceIntrospector
.inspectService(component)
.methods
.filter(_.javaMethod.getAnnotation(classOf[Query]) != null)

annotatedQueryMethods.map { queryMethod =>
val queryOutputType = {
val returnType = queryMethod.javaMethod.getReturnType
if (returnType == classOf[Flux[_]]) {
Expand Down Expand Up @@ -200,34 +226,11 @@ private[impl] object ViewDescriptorFactory extends ComponentDescriptorFactory {

// since it is a query, we don't actually ever want to handle any request in the SDK
// the proxy does the work for us, mark the method as non-callable
(
KalixMethod(queryMethod.copy(callable = false), methodOptions = Some(methodOptions))
.withKalixOptions(buildJWTOptions(queryMethod.javaMethod)),
queryInputSchemaDescriptor,
queryOutputSchemaDescriptor)
}

val kalixMethods: Seq[KalixMethod] = queryMethod +: updateMethods
val serviceName = nameGenerator.getName(component.getSimpleName)
val additionalMessages =
tableTypeDescriptors.toSet ++ Set(queryOutputSchemaDescriptor) ++ queryInputSchemaDescriptor.toSet
val kalixQueryMethod = KalixMethod(queryMethod.copy(callable = false), methodOptions = Some(methodOptions))
.withKalixOptions(buildJWTOptions(queryMethod.javaMethod))

val serviceLevelOptions =
mergeServiceOptions(
AclDescriptorFactory.serviceLevelAclAnnotation(component),
JwtDescriptorFactory.serviceLevelJwtAnnotation(component),
eventingInForEventSourcedEntityServiceLevel(component),
eventingInForTopicServiceLevel(component),
subscribeToEventStream(component))

ComponentDescriptor(
nameGenerator,
messageCodec,
serviceName,
serviceOptions = serviceLevelOptions,
component.getPackageName,
kalixMethods,
additionalMessages.toSeq)
QueryMethod(kalixQueryMethod, queryInputSchemaDescriptor, queryOutputSchemaDescriptor)
}
}

private def methodsForTypeLevelStreamSubscriptions(
Expand Down

0 comments on commit 094d189

Please sign in to comment.