Skip to content
This repository has been archived by the owner on Mar 11, 2022. It is now read-only.

Commit

Permalink
[PR Review Feedback] second batch of changes after review of #5
Browse files Browse the repository at this point in the history
  • Loading branch information
marcellanz committed Oct 22, 2019
1 parent 14373da commit accb0f3
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 58 deletions.
49 changes: 23 additions & 26 deletions cloudstate/cloudstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,20 @@ const (

// CloudState is an instance of a CloudState User Function
type CloudState struct {
server *grpc.Server
entityDiscoveryResponder *EntityDiscoveryService
eventSourcedHandler *EventSourcedHandler
server *grpc.Server
entityDiscoveryServer *EntityDiscoveryServer
eventSourcedServer *EventSourcedServer
}

// New returns a new CloudState instance.
func New(options Options) (*CloudState, error) {
cs := &CloudState{
server: grpc.NewServer(),
entityDiscoveryResponder: newEntityDiscoveryResponder(options),
eventSourcedHandler: NewEventSourcedHandler(),
}
protocol.RegisterEntityDiscoveryServer(cs.server, cs.entityDiscoveryResponder)
log.Println("RegisterEntityDiscoveryServer")
protocol.RegisterEventSourcedServer(cs.server, cs.eventSourcedHandler)
log.Println("RegisterEventSourcedServer")
server: grpc.NewServer(),
entityDiscoveryServer: newEntityDiscoveryResponder(options),
eventSourcedServer: newEventSourcedServer(),
}
protocol.RegisterEntityDiscoveryServer(cs.server, cs.entityDiscoveryServer)
protocol.RegisterEventSourcedServer(cs.server, cs.eventSourcedServer)
return cs, nil
}

Expand Down Expand Up @@ -88,10 +86,10 @@ func (cs *CloudState) RegisterEventSourcedEntity(ese *EventSourcedEntity, config
if err = ese.initZeroValue(); err != nil {
return
}
if err = cs.eventSourcedHandler.registerEntity(ese); err != nil {
if err = cs.eventSourcedServer.registerEntity(ese); err != nil {
return
}
if err = cs.entityDiscoveryResponder.registerEntity(ese, config); err != nil {
if err = cs.entityDiscoveryServer.registerEntity(ese, config); err != nil {
return
}
})
Expand All @@ -112,23 +110,22 @@ func (cs *CloudState) Run() error {
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}
log.Printf("starting grpcServer at: %s:%s", host, port)
if e := cs.server.Serve(lis); e != nil {
return fmt.Errorf("failed to grpcServer.Serve for: %v", lis)
}
return nil
}

// EntityDiscoveryService implements the CloudState discovery protocol.
type EntityDiscoveryService struct {
// EntityDiscoveryServer implements the CloudState discovery protocol.
type EntityDiscoveryServer struct {
fileDescriptorSet *filedescr.FileDescriptorSet
entitySpec *protocol.EntitySpec
message *descriptor.Message
}

// newEntityDiscoveryResponder returns a new and initialized EntityDiscoveryService.
func newEntityDiscoveryResponder(options Options) *EntityDiscoveryService {
responder := &EntityDiscoveryService{}
// newEntityDiscoveryResponder returns a new and initialized EntityDiscoveryServer.
func newEntityDiscoveryResponder(options Options) *EntityDiscoveryServer {
responder := &EntityDiscoveryServer{}
responder.entitySpec = &protocol.EntitySpec{
Entities: make([]*protocol.Entity, 0),
ServiceInfo: &protocol.ServiceInfo{
Expand All @@ -146,7 +143,7 @@ func newEntityDiscoveryResponder(options Options) *EntityDiscoveryService {
}

// Discover returns an entity spec for
func (r *EntityDiscoveryService) Discover(c context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) {
func (r *EntityDiscoveryServer) Discover(c context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) {
log.Printf("Received discovery call from sidecar [%s w%s] supporting CloudState %v.%v\n",
pi.ProxyName,
pi.ProxyVersion,
Expand All @@ -172,12 +169,12 @@ func (r *EntityDiscoveryService) Discover(c context.Context, pi *protocol.ProxyI
}

// ReportError logs any user function error reported by the CloudState proxy.
func (r *EntityDiscoveryService) ReportError(c context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) {
func (r *EntityDiscoveryServer) ReportError(c context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) {
log.Printf("ReportError: %v\n", fe)
return &empty.Empty{}, nil
}

func (r *EntityDiscoveryService) updateSpec() (err error) {
func (r *EntityDiscoveryServer) updateSpec() (err error) {
protoBytes, err := proto.Marshal(r.fileDescriptorSet)
if err != nil {
return errors.New("unable to Marshal FileDescriptorSet")
Expand All @@ -186,7 +183,7 @@ func (r *EntityDiscoveryService) updateSpec() (err error) {
return nil
}

func (r *EntityDiscoveryService) resolveFileDescriptors(dc DescriptorConfig) error {
func (r *EntityDiscoveryServer) resolveFileDescriptors(dc DescriptorConfig) error {
// service
if dc.Service != "" {
if err := r.registerFileDescriptorProto(dc.Service); err != nil {
Expand All @@ -213,7 +210,7 @@ func (r *EntityDiscoveryService) resolveFileDescriptors(dc DescriptorConfig) err
return nil
}

func (r *EntityDiscoveryService) registerEntity(e *EventSourcedEntity, config DescriptorConfig) error {
func (r *EntityDiscoveryServer) registerEntity(e *EventSourcedEntity, config DescriptorConfig) error {
if err := r.resolveFileDescriptors(config); err != nil {
return fmt.Errorf("failed to resolveFileDescriptor for DescriptorConfig: %+v: %w", config, err)
}
Expand All @@ -229,7 +226,7 @@ func (r *EntityDiscoveryService) registerEntity(e *EventSourcedEntity, config De
return r.updateSpec()
}

func (r *EntityDiscoveryService) registerFileDescriptorProto(filename string) error {
func (r *EntityDiscoveryServer) registerFileDescriptorProto(filename string) error {
descriptorProto, err := unpackFile(proto.FileDescriptor(filename))
if err != nil {
return fmt.Errorf("failed to registerFileDescriptorProto for filename: %s: %w", filename, err)
Expand All @@ -238,7 +235,7 @@ func (r *EntityDiscoveryService) registerFileDescriptorProto(filename string) er
return r.updateSpec()
}

func (r *EntityDiscoveryService) registerFileDescriptor(msg descriptor.Message) error {
func (r *EntityDiscoveryServer) registerFileDescriptor(msg descriptor.Message) error {
fd, _ := descriptor.ForMessage(msg) // this can panic
if r := recover(); r != nil {
return fmt.Errorf("descriptor.ForMessage panicked (%v) for: %+v", r, msg)
Expand Down
50 changes: 25 additions & 25 deletions cloudstate/eventsourced.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (c EntityInstanceContext) ServiceName() string {
return c.EntityInstance.EventSourcedEntity.ServiceName
}

// EventSourcedHandler is the implementation of the EventSourcedHandler server API for EventSourced service.
type EventSourcedHandler struct {
// EventSourcedServer is the implementation of the EventSourcedServer server API for EventSourced service.
type EventSourcedServer struct {
// entities are indexed by their service name
entities map[string]*EventSourcedEntity
// contexts are entity instance contexts indexed by their entity ids
Expand All @@ -135,22 +135,22 @@ type EventSourcedHandler struct {
cmdMethodCache map[string]reflect.Method
}

// NewEventSourcedHandler returns an initialized EventSourcedHandler
func NewEventSourcedHandler() *EventSourcedHandler {
return &EventSourcedHandler{
// newEventSourcedServer returns an initialized EventSourcedServer
func newEventSourcedServer() *EventSourcedServer {
return &EventSourcedServer{
entities: make(map[string]*EventSourcedEntity),
contexts: make(map[string]*EntityInstanceContext),
cmdMethodCache: make(map[string]reflect.Method),
}
}

func (esh *EventSourcedHandler) registerEntity(ese *EventSourcedEntity) error {
func (esh *EventSourcedServer) registerEntity(ese *EventSourcedEntity) error {
esh.entities[ese.ServiceName] = ese
return nil
}

// Handle
// from EventSourcedServer.Handle
//
// The stream. One stream will be established per active entity.
// Once established, the first message sent will be Init, which contains the entity ID, and,
// if the entity has previously persisted a snapshot, it will contain that snapshot. It will
Expand All @@ -161,45 +161,45 @@ func (esh *EventSourcedHandler) registerEntity(ese *EventSourcedEntity) error {
// message. The entity should reply in order, and any events that the entity requests to be
// persisted the entity should handle itself, applying them to its own state, as if they had
// arrived as events when the event stream was being replayed on load.
func (esh *EventSourcedHandler) Handle(server protocol.EventSourced_HandleServer) error {
func (esh *EventSourcedServer) Handle(stream protocol.EventSourced_HandleServer) error {
var entityId string
var failed error
for {
if failed != nil {
return failed
}
msg, recvErr := server.Recv()
msg, recvErr := stream.Recv()
if recvErr == io.EOF {
return nil
}
if recvErr != nil {
return recvErr
}
if cmd := msg.GetCommand(); cmd != nil {
if err := esh.handleCommand(cmd, server); err != nil {
if err := esh.handleCommand(cmd, stream); err != nil {
// TODO: in general, what happens with the stream here if an error happens?
failed = handleFailure(err, server, cmd.GetId())
failed = handleFailure(err, stream, cmd.GetId())
}
continue
}
if event := msg.GetEvent(); event != nil {
// TODO spec: Why does command carry the entityId and an event not?
if err := esh.handleEvent(entityId, event); err != nil {
failed = handleFailure(err, server, 0)
failed = handleFailure(err, stream, 0)
}
continue
}
if init := msg.GetInit(); init != nil {
if err := esh.handleInit(init, server); err != nil {
failed = handleFailure(err, server, 0)
if err := esh.handleInit(init, stream); err != nil {
failed = handleFailure(err, stream, 0)
}
entityId = init.GetEntityId()
continue
}
}
}

func (esh *EventSourcedHandler) handleInit(init *protocol.EventSourcedInit, server protocol.EventSourced_HandleServer) error {
func (esh *EventSourcedServer) handleInit(init *protocol.EventSourcedInit, server protocol.EventSourced_HandleServer) error {
eid := init.GetEntityId()
if _, present := esh.contexts[eid]; present {
return NewFailureError("unable to server.Send")
Expand All @@ -225,7 +225,7 @@ func (esh *EventSourcedHandler) handleInit(init *protocol.EventSourcedInit, serv
return nil
}

func (esh *EventSourcedHandler) handleInitSnapshot(init *protocol.EventSourcedInit) error {
func (esh *EventSourcedServer) handleInitSnapshot(init *protocol.EventSourcedInit) error {
if init.Snapshot == nil {
return nil
}
Expand All @@ -247,7 +247,7 @@ func (esh *EventSourcedHandler) handleInitSnapshot(init *protocol.EventSourcedIn
return nil
}

func (EventSourcedHandler) unmarshalSnapshot(init *protocol.EventSourcedInit) (interface{}, error) {
func (EventSourcedServer) unmarshalSnapshot(init *protocol.EventSourcedInit) (interface{}, error) {
// see: https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/any#typeurl
typeUrl := init.Snapshot.Snapshot.GetTypeUrl()
if !strings.Contains(typeUrl, "://") {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (EventSourcedHandler) unmarshalSnapshot(init *protocol.EventSourcedInit) (i
return nil, fmt.Errorf("unmarshalling snapshot failed with: no snapshot unmarshaller found for: %v", typeURL.String())
}

func (esh *EventSourcedHandler) subscribeEvents(instance *EntityInstance) {
func (esh *EventSourcedServer) subscribeEvents(instance *EntityInstance) {
if emitter, ok := instance.Instance.(EventEmitter); ok {
emitter.Subscribe(&Subscription{
OnNext: func(event interface{}) error {
Expand All @@ -296,7 +296,7 @@ func (esh *EventSourcedHandler) subscribeEvents(instance *EntityInstance) {
}
}

func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.EventSourcedEvent) error {
func (esh *EventSourcedServer) handleEvent(entityId string, event *protocol.EventSourcedEvent) error {
if entityId == "" {
return NewFailureError("no entityId was found from a previous init message for event sequence: %v", event.Sequence)
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.Eve
// Beside calling the service method, we have to collect "events" the service might emit.
// These events afterwards have to be handled by a EventHandler to update the state of the
// entity. The CloudState proxy can re-play these events at any time
func (esh *EventSourcedHandler) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error {
func (esh *EventSourcedServer) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error {
// method to call
method, err := esh.methodToCall(cmd)
if err != nil {
Expand Down Expand Up @@ -400,7 +400,7 @@ func (esh *EventSourcedHandler) handleCommand(cmd *protocol.Command, server prot
}, server)
}

func (*EventSourcedHandler) buildInputs(entityContext *EntityInstanceContext, method reflect.Method, cmd *protocol.Command, ctx context.Context) ([]reflect.Value, error) {
func (*EventSourcedServer) buildInputs(entityContext *EntityInstanceContext, method reflect.Method, cmd *protocol.Command, ctx context.Context) ([]reflect.Value, error) {
inputs := make([]reflect.Value, method.Type.NumIn())
inputs[0] = reflect.ValueOf(entityContext.EntityInstance.Instance)
inputs[1] = reflect.ValueOf(ctx)
Expand All @@ -424,7 +424,7 @@ func (*EventSourcedHandler) buildInputs(entityContext *EntityInstanceContext, me
return inputs, nil
}

func (esh *EventSourcedHandler) methodToCall(cmd *protocol.Command) (reflect.Method, error) {
func (esh *EventSourcedServer) methodToCall(cmd *protocol.Command) (reflect.Method, error) {
entityContext := esh.contexts[cmd.GetEntityId()]
cacheKey := entityContext.ServiceName() + cmd.Name
method, hit := esh.cmdMethodCache[cacheKey]
Expand Down Expand Up @@ -464,7 +464,7 @@ func (esh *EventSourcedHandler) methodToCall(cmd *protocol.Command) (reflect.Met
return method, nil
}

func (*EventSourcedHandler) handleSnapshots(entityContext *EntityInstanceContext) (*any.Any, error) {
func (*EventSourcedServer) handleSnapshots(entityContext *EntityInstanceContext) (*any.Any, error) {
if !entityContext.EntityInstance.shouldSnapshot() {
return nil, nil
}
Expand Down Expand Up @@ -494,7 +494,7 @@ func checkUnary(methodByName reflect.Value) error {
}

// applyEvent applies an event to a local entity
func (esh EventSourcedHandler) applyEvent(entityInstance *EntityInstance, event interface{}) error {
func (esh EventSourcedServer) applyEvent(entityInstance *EntityInstance, event interface{}) error {
payload, err := marshalAny(event)
if err != nil {
return err
Expand All @@ -509,7 +509,7 @@ func (esh EventSourcedHandler) applyEvent(entityInstance *EntityInstance, event
// and snapshots is to use protobufs. CloudState will automatically detect if
// an emitted event is a protobuf, and serialize it as such. For other
// serialization options, including JSON, see Serialization.
func (EventSourcedHandler) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error {
func (EventSourcedServer) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error {
eventHandler, implementsEventHandler := entityInstance.Instance.(EventHandler)
for _, event := range events {
// TODO: here's the point where events can be protobufs, serialized as json or other formats
Expand Down
14 changes: 7 additions & 7 deletions cloudstate/eventsourced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ func (t TestEventSourcedHandleServer) Recv() (*protocol.EventSourcedStreamIn, er
return nil, nil
}

func newHandler(t *testing.T) *EventSourcedHandler {
handler := NewEventSourcedHandler()
func newHandler(t *testing.T) *EventSourcedServer {
handler := newEventSourcedServer()
entity := EventSourcedEntity{
Entity: (*TestEntity)(nil),
ServiceName: "TestEventSourcedHandler-Service",
ServiceName: "TestEventSourcedServer-Service",
SnapshotEvery: 0,
registerOnce: sync.Once{},
}
Expand All @@ -207,9 +207,9 @@ func newHandler(t *testing.T) *EventSourcedHandler {
return handler
}

func initHandler(handler *EventSourcedHandler, t *testing.T) {
func initHandler(handler *EventSourcedServer, t *testing.T) {
err := handler.handleInit(&protocol.EventSourcedInit{
ServiceName: "TestEventSourcedHandler-Service",
ServiceName: "TestEventSourcedServer-Service",
EntityId: "entity-0",
}, nil)
if err != nil {
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestSnapshot(t *testing.T) {
t.Fatalf("%v", err)
}
err = handler.handleInit(&protocol.EventSourcedInit{
ServiceName: "TestEventSourcedHandler-Service",
ServiceName: "TestEventSourcedServer-Service",
EntityId: "entity-0",
Snapshot: &protocol.EventSourcedSnapshot{
SnapshotSequence: 0,
Expand All @@ -268,7 +268,7 @@ func TestSnapshot(t *testing.T) {
}
}

func TestEventSourcedHandlerHandlesCommandAndEvents(t *testing.T) {
func TestEventSourcedServerHandlesCommandAndEvents(t *testing.T) {
resetTestEntity()
handler := newHandler(t)
if testEntity.Value >= 0 {
Expand Down

0 comments on commit accb0f3

Please sign in to comment.