diff --git a/statefun-sdk-go/v3/pkg/statefun/context.go b/statefun-sdk-go/v3/pkg/statefun/context.go index 7f998269e..58f308b40 100644 --- a/statefun-sdk-go/v3/pkg/statefun/context.go +++ b/statefun-sdk-go/v3/pkg/statefun/context.go @@ -42,15 +42,15 @@ type Context interface { Caller() *Address // Send forwards out a MessageBuilder to another function. - Send(message MessageBuilder) + Send(message MessageBuilder) error // SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay. - SendAfter(delay time.Duration, message MessageBuilder) + SendAfter(delay time.Duration, message MessageBuilder) error // SendAfterWithCancellationToken forwards out a MessageBuilder to another function, // after a specified time.Duration delay. The message is tagged with a non-empty, //unique token to attach to this message, to be used for message cancellation - SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) + SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) error // CancelDelayedMessage cancels a delayed message (a message that was send via SendAfterWithCancellationToken). // NOTE: this is a best-effort operation, since the message might have been already delivered. @@ -58,7 +58,7 @@ type Context interface { CancelDelayedMessage(token CancellationToken) // SendEgress forwards out an EgressBuilder to an egress. - SendEgress(egress EgressBuilder) + SendEgress(egress EgressBuilder) error // Storage returns the AddressScopedStorage, providing access to stored values scoped to the // current invoked function instance's Address (which is obtainable using Self()). @@ -86,11 +86,10 @@ func (s *statefunContext) Caller() *Address { return s.caller } -func (s *statefunContext) Send(message MessageBuilder) { +func (s *statefunContext) Send(message MessageBuilder) error { msg, err := message.ToMessage() - if err != nil { - panic(err) + return err } invocation := &protocol.FromFunction_Invocation{ @@ -101,13 +100,14 @@ func (s *statefunContext) Send(message MessageBuilder) { s.Lock() s.response.OutgoingMessages = append(s.response.OutgoingMessages, invocation) s.Unlock() + + return nil } -func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) { +func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) error { msg, err := message.ToMessage() - if err != nil { - panic(err) + return err } invocation := &protocol.FromFunction_DelayedInvocation{ @@ -119,13 +119,14 @@ func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) s.Lock() s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation) s.Unlock() + + return nil } -func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) { +func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) error { msg, err := message.ToMessage() - if err != nil { - panic(err) + return err } invocation := &protocol.FromFunction_DelayedInvocation{ @@ -138,6 +139,8 @@ func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, to s.Lock() s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation) s.Unlock() + + return nil } func (s *statefunContext) CancelDelayedMessage(token CancellationToken) { @@ -151,16 +154,17 @@ func (s *statefunContext) CancelDelayedMessage(token CancellationToken) { s.Unlock() } -func (s *statefunContext) SendEgress(egress EgressBuilder) { +func (s *statefunContext) SendEgress(egress EgressBuilder) error { msg, err := egress.toEgressMessage() - if err != nil { - panic(err) + return err } s.Lock() s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg) s.Unlock() + + return nil } // DeriveContext derives a new statefun.Context from an existing one, replacing diff --git a/statefun-sdk-go/v3/pkg/statefun/context_test.go b/statefun-sdk-go/v3/pkg/statefun/context_test.go index f12f1f46a..2b7e8fb75 100644 --- a/statefun-sdk-go/v3/pkg/statefun/context_test.go +++ b/statefun-sdk-go/v3/pkg/statefun/context_test.go @@ -41,7 +41,7 @@ func TestStatefunContext_Send(t *testing.T) { msg := MessageBuilder{ Target: Address{ - FunctionType: TypeNameFrom("example/func"), + FunctionType: MustParseTypeName("example/func"), Id: "0", }, Value: "hello", @@ -63,7 +63,7 @@ func TestStatefunContext_SendAfter(t *testing.T) { msg := MessageBuilder{ Target: Address{ - FunctionType: TypeNameFrom("example/func"), + FunctionType: MustParseTypeName("example/func"), Id: "0", }, Value: "hello", @@ -88,7 +88,7 @@ func TestStatefunContext_SendAfterWithCancellationTokenMessage(t *testing.T) { msg := MessageBuilder{ Target: Address{ - FunctionType: TypeNameFrom("example/func"), + FunctionType: MustParseTypeName("example/func"), Id: "0", }, Value: "hello", @@ -127,7 +127,7 @@ func TestStatefunContext_SendEgress_Kafka(t *testing.T) { context := createContext() kafka := &KafkaEgressBuilder{ - Target: TypeNameFrom("example/kafka"), + Target: MustParseTypeName("example/kafka"), Topic: "topic", Key: "key", Value: "value", @@ -151,7 +151,7 @@ func TestStatefunContext_SendEgress_Kinesis(t *testing.T) { context := createContext() kafka := &KinesisEgressBuilder{ - Target: TypeNameFrom("example/kinesis"), + Target: MustParseTypeName("example/kinesis"), Stream: "stream", PartitionKey: "key", Value: "value", @@ -203,8 +203,8 @@ func createContext() *statefunContext { return &statefunContext{ Context: context.WithValue(context.Background(), testContextKey1, testContextValue1), Mutex: new(sync.Mutex), - caller: &Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"}, - self: Address{FunctionType: TypeNameFrom("namespace/function2"), Id: "2"}, + caller: &Address{FunctionType: MustParseTypeName("namespace/function1"), Id: "1"}, + self: Address{FunctionType: MustParseTypeName("namespace/function2"), Id: "2"}, storage: new(storage), response: &protocol.FromFunction_InvocationResponse{}, } diff --git a/statefun-sdk-go/v3/pkg/statefun/egress_test.go b/statefun-sdk-go/v3/pkg/statefun/egress_test.go index 47efe8183..b29a1057e 100644 --- a/statefun-sdk-go/v3/pkg/statefun/egress_test.go +++ b/statefun-sdk-go/v3/pkg/statefun/egress_test.go @@ -17,15 +17,16 @@ package statefun import ( + "testing" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" - "testing" ) func TestKafkaEgressBuilder(t *testing.T) { k := KafkaEgressBuilder{ - Target: TypeNameFrom("example/target"), + Target: MustParseTypeName("example/target"), Topic: "topic", Key: "key", Value: "value", @@ -45,7 +46,7 @@ func TestKafkaEgressBuilder(t *testing.T) { func TestKafkaEgressBuilderInvalidString(t *testing.T) { k := KafkaEgressBuilder{ - Target: TypeNameFrom("example/target"), + Target: MustParseTypeName("example/target"), Topic: "topic", Key: "key", Value: string([]byte{0xff, 0xfe, 0xfd}), @@ -57,7 +58,7 @@ func TestKafkaEgressBuilderInvalidString(t *testing.T) { func TestKinesisEgressBuilder(t *testing.T) { k := KinesisEgressBuilder{ - Target: TypeNameFrom("example/target"), + Target: MustParseTypeName("example/target"), Stream: "stream", PartitionKey: "key", Value: "value", @@ -77,7 +78,7 @@ func TestKinesisEgressBuilder(t *testing.T) { func TestKinesisEgressBuilderInvalidString(t *testing.T) { k := KinesisEgressBuilder{ - Target: TypeNameFrom("example/target"), + Target: MustParseTypeName("example/target"), Stream: "stream", PartitionKey: "key", Value: string([]byte{0xff, 0xfe, 0xfd}), diff --git a/statefun-sdk-go/v3/pkg/statefun/handler_test.go b/statefun-sdk-go/v3/pkg/statefun/handler_test.go index e315c4c5d..350a988df 100644 --- a/statefun-sdk-go/v3/pkg/statefun/handler_test.go +++ b/statefun-sdk-go/v3/pkg/statefun/handler_test.go @@ -63,7 +63,7 @@ func invokeStatefulFunction(ctx context.Context, target *Address, caller *Addres func TestStatefunHandler_WithNoCaller_ContextCallerIsNil(t *testing.T) { - target := Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"} + target := Address{FunctionType: MustParseTypeName("namespace/function1"), Id: "1"} statefulFunction := func(ctx Context, message Message) error { assert.Nil(t, ctx.Caller()) @@ -76,8 +76,8 @@ func TestStatefunHandler_WithNoCaller_ContextCallerIsNil(t *testing.T) { func TestStatefunHandler_WithCaller_ContextCallerIsCorrect(t *testing.T) { - target := Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"} - caller := Address{FunctionType: TypeNameFrom("namespace/function2"), Id: "2"} + target := Address{FunctionType: MustParseTypeName("namespace/function1"), Id: "1"} + caller := Address{FunctionType: MustParseTypeName("namespace/function2"), Id: "2"} statefulFunction := func(ctx Context, message Message) error { assert.Equal(t, caller.String(), ctx.Caller().String()) diff --git a/statefun-sdk-go/v3/pkg/statefun/message.go b/statefun-sdk-go/v3/pkg/statefun/message.go index dff5fdf4d..11bbe7989 100644 --- a/statefun-sdk-go/v3/pkg/statefun/message.go +++ b/statefun-sdk-go/v3/pkg/statefun/message.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" ) @@ -88,73 +89,73 @@ func (m *Message) IsBool() bool { return m.Is(BoolType) } -func (m *Message) AsBool() bool { +func (m *Message) AsBool() (bool, error) { var receiver bool if err := BoolType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil { - panic(fmt.Errorf("failed to deserialize message: %w", err)) + return false, fmt.Errorf("failed to deserialize message: %w", err) } - return receiver + return receiver, nil } func (m *Message) IsInt32() bool { return m.Is(Int32Type) } -func (m *Message) AsInt32() int32 { +func (m *Message) AsInt32() (int32, error) { var receiver int32 if err := Int32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil { - panic(fmt.Errorf("failed to deserialize message: %w", err)) + return 0, fmt.Errorf("failed to deserialize message: %w", err) } - return receiver + return receiver, nil } func (m *Message) IsInt64() bool { return m.Is(Int64Type) } -func (m *Message) AsInt64() int64 { +func (m *Message) AsInt64() (int64, error) { var receiver int64 if err := Int64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil { - panic(fmt.Errorf("failed to deserialize message: %w", err)) + return 0, fmt.Errorf("failed to deserialize message: %w", err) } - return receiver + return receiver, nil } func (m *Message) IsFloat32() bool { return m.Is(Float32Type) } -func (m *Message) AsFloat32() float32 { +func (m *Message) AsFloat32() (float32, error) { var receiver float32 if err := Float32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil { - panic(fmt.Errorf("failed to deserialize message: %w", err)) + return 0, fmt.Errorf("failed to deserialize message: %w", err) } - return receiver + return receiver, nil } func (m *Message) IsFloat64() bool { return m.Is(Float64Type) } -func (m *Message) AsFloat64() float64 { +func (m *Message) AsFloat64() (float64, error) { var receiver float64 if err := Float64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil { - panic(fmt.Errorf("failed to deserialize message: %w", err)) + return 0, fmt.Errorf("failed to deserialize message: %w", err) } - return receiver + return receiver, nil } func (m *Message) IsString() bool { return m.Is(StringType) } -func (m *Message) AsString() string { +func (m *Message) AsString() (string, error) { var receiver string if err := StringType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil { - panic(fmt.Errorf("failed to deserialize message: %w", err)) + return "", fmt.Errorf("failed to deserialize message: %w", err) } - return receiver + return receiver, nil } func (m *Message) Is(t SimpleType) bool { @@ -166,7 +167,8 @@ func (m *Message) As(t SimpleType, receiver interface{}) error { } func (m *Message) ValueTypeName() TypeName { - return TypeNameFrom(m.typedValue.Typename) + typename, _ := ParseTypeName(m.typedValue.Typename) + return typename } func (m *Message) RawValue() []byte { diff --git a/statefun-sdk-go/v3/pkg/statefun/message_test.go b/statefun-sdk-go/v3/pkg/statefun/message_test.go index 2b18894c6..46fe7338d 100644 --- a/statefun-sdk-go/v3/pkg/statefun/message_test.go +++ b/statefun-sdk-go/v3/pkg/statefun/message_test.go @@ -17,8 +17,9 @@ package statefun import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestBasicIntMessage(t *testing.T) { @@ -36,7 +37,7 @@ func TestBasicIntMessage(t *testing.T) { assert.NoError(t, err) assert.True(t, message.IsInt32()) - value := message.AsInt32() + value, _ := message.AsInt32() assert.Equal(t, value, int32(1)) } @@ -56,6 +57,6 @@ func TestMessageWithType(t *testing.T) { assert.NoError(t, err) assert.True(t, message.IsFloat32()) - value := message.AsFloat32() + value, _ := message.AsFloat32() assert.Equal(t, value, float32(5.0)) } diff --git a/statefun-sdk-go/v3/pkg/statefun/storage.go b/statefun-sdk-go/v3/pkg/statefun/storage.go index 47b021fe9..9ded7138c 100644 --- a/statefun-sdk-go/v3/pkg/statefun/storage.go +++ b/statefun-sdk-go/v3/pkg/statefun/storage.go @@ -18,9 +18,10 @@ package statefun import ( "fmt" + "sync" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal" "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" - "sync" ) // An AddressScopedStorage is used for reading and writing persistent @@ -33,24 +34,24 @@ import ( // values through this storage. type AddressScopedStorage interface { - // Get returnss the values of the provided ValueSpec, scoped to the + // Get returns the values of the provided ValueSpec, scoped to the // current invoked Address and stores the result in the value // pointed to by receiver. The method will return false // if there is no value for the spec in storage // so callers can differentiate between missing and // the types zero value. - Get(spec ValueSpec, receiver interface{}) (exists bool) + Get(spec ValueSpec, receiver interface{}) (exists bool, err error) // Set updates the value for the provided ValueSpec, scoped // to the current invoked Address. - Set(spec ValueSpec, value interface{}) + Set(spec ValueSpec, value interface{}) error // Remove deletes the prior value set for the the provided // ValueSpec, scoped to the current invoked Address. // // After removing the value, calling Get for the same // spec under the same Address will return false. - Remove(spec ValueSpec) + Remove(spec ValueSpec) error } type storage struct { @@ -110,53 +111,58 @@ func (s *storage) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec return nil } -func (s *storage) Get(spec ValueSpec, receiver interface{}) bool { +func (s *storage) Get(spec ValueSpec, receiver interface{}) (bool, error) { s.mutex.RLock() defer s.mutex.RUnlock() cell, ok := s.cells[spec.Name] if !ok { - panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name)) + return false, fmt.Errorf("unregistered ValueSpec %s", spec.Name) } if !cell.HasValue() { - return false + return false, nil } cell.SeekToBeginning() + if err := spec.ValueType.Deserialize(cell, receiver); err != nil { - panic(fmt.Errorf("failed to deserialize persisted value `%s`: %w", spec.Name, err)) + return false, fmt.Errorf("failed to deserialize persisted value `%s`: %w", spec.Name, err) } - return true + return true, nil } -func (s *storage) Set(spec ValueSpec, value interface{}) { +func (s *storage) Set(spec ValueSpec, value interface{}) error { s.mutex.Lock() defer s.mutex.Unlock() cell, ok := s.cells[spec.Name] if !ok { - panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name)) + return fmt.Errorf("unregistered ValueSpec %s", spec.Name) } cell.Reset() - err := spec.ValueType.Serialize(cell, value) - if err != nil { - panic(fmt.Errorf("failed to serialize %s: %w", spec.Name, err)) + + if err := spec.ValueType.Serialize(cell, value); err != nil { + return fmt.Errorf("failed to serialize %s: %w", spec.Name, err) } + + return nil } -func (s *storage) Remove(spec ValueSpec) { +func (s *storage) Remove(spec ValueSpec) error { s.mutex.Lock() defer s.mutex.Unlock() cell, ok := s.cells[spec.Name] if !ok { - panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name)) + return fmt.Errorf("unregistered ValueSpec %s", spec.Name) } cell.Delete() + + return nil } func (s *storage) getStateMutations() []*protocol.FromFunction_PersistedValueMutation { diff --git a/statefun-sdk-go/v3/pkg/statefun/typename.go b/statefun-sdk-go/v3/pkg/statefun/typename.go index 8491dc536..c6037d06b 100644 --- a/statefun-sdk-go/v3/pkg/statefun/typename.go +++ b/statefun-sdk-go/v3/pkg/statefun/typename.go @@ -19,17 +19,18 @@ package statefun import ( "errors" "fmt" - "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" "strings" + + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" ) var ( - boolTypeName = TypeNameFrom("io.statefun.types/bool") - int32TypeName = TypeNameFrom("io.statefun.types/int") - int64TypeName = TypeNameFrom("io.statefun.types/long") - float32TypeName = TypeNameFrom("io.statefun.types/float") - float64TypeName = TypeNameFrom("io.statefun.types/double") - stringTypeName = TypeNameFrom("io.statefun.types/string") + boolTypeName = MustParseTypeName("io.statefun.types/bool") + int32TypeName = MustParseTypeName("io.statefun.types/int") + int64TypeName = MustParseTypeName("io.statefun.types/long") + float32TypeName = MustParseTypeName("io.statefun.types/float") + float64TypeName = MustParseTypeName("io.statefun.types/double") + stringTypeName = MustParseTypeName("io.statefun.types/string") ) // A TypeName is used to uniquely identify objects within @@ -66,7 +67,7 @@ func (t typeName) GetType() string { // assumes correctly formatted strings and will panic // on error. For runtime error handling please // see ParseTypeName. -func TypeNameFrom(typename string) TypeName { +func MustParseTypeName(typename string) TypeName { result, err := ParseTypeName(typename) if err != nil { panic(err) diff --git a/statefun-sdk-go/v3/pkg/statefun/typename_test.go b/statefun-sdk-go/v3/pkg/statefun/typename_test.go index c07cd1977..5ef6bd2bf 100644 --- a/statefun-sdk-go/v3/pkg/statefun/typename_test.go +++ b/statefun-sdk-go/v3/pkg/statefun/typename_test.go @@ -17,8 +17,9 @@ package statefun import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestTypeNameParse(t *testing.T) { diff --git a/statefun-sdk-go/v3/pkg/statefun/types.go b/statefun-sdk-go/v3/pkg/statefun/types.go index e820a3a30..a54827420 100644 --- a/statefun-sdk-go/v3/pkg/statefun/types.go +++ b/statefun-sdk-go/v3/pkg/statefun/types.go @@ -18,10 +18,11 @@ package statefun import ( "encoding/json" "errors" - "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" - "google.golang.org/protobuf/proto" "io" "log" + + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" + "google.golang.org/protobuf/proto" ) // SimpleType interface is the core abstraction used by Stateful @@ -31,7 +32,7 @@ import ( // 1. TypeName to identify the type. // 2. (De)serialization methods for marshalling and unmarshalling data // -// Cross-language primitive types +// # Cross-language primitive types // // StateFun's type system has cross-language support for common primitive // types, such as boolean, integer (int32), long (int64), etc. These @@ -45,7 +46,7 @@ import ( // state values as well; so you can expect that a function can safely // read previous state after reimplementing it in a different language. // -// Common custom types +// # Common custom types // // The type system is also very easily extensible to support more complex types. // The Go SDK ships with predefined support for JSON and Protobuf - see MakeJsonType @@ -94,7 +95,6 @@ func (p PrimitiveType) GetTypeName() TypeName { case StringType: return stringTypeName default: - log.Fatalf("unknown primitive type %v", p) // unreachable return nil } diff --git a/statefun-sdk-go/v3/pkg/statefun/types_test.go b/statefun-sdk-go/v3/pkg/statefun/types_test.go index 4e00da662..71fbf14af 100644 --- a/statefun-sdk-go/v3/pkg/statefun/types_test.go +++ b/statefun-sdk-go/v3/pkg/statefun/types_test.go @@ -18,9 +18,10 @@ package statefun import ( "bytes" - "github.com/stretchr/testify/assert" "math" "testing" + + "github.com/stretchr/testify/assert" ) func TestBoolType(t *testing.T) { @@ -141,7 +142,7 @@ type User struct { func TestJsonType(t *testing.T) { buffer := bytes.Buffer{} - userType := MakeJsonType(TypeNameFrom("org.foo.bar/UserJson")) + userType := MakeJsonType(MustParseTypeName("org.foo.bar/UserJson")) err := userType.Serialize(&buffer, User{"bob", "mop"}) assert.NoError(t, err) diff --git a/statefun-sdk-go/v3/pkg/statefun/value_spec.go b/statefun-sdk-go/v3/pkg/statefun/value_spec.go index 6d51cd8f6..5f058812f 100644 --- a/statefun-sdk-go/v3/pkg/statefun/value_spec.go +++ b/statefun-sdk-go/v3/pkg/statefun/value_spec.go @@ -18,7 +18,6 @@ package statefun import ( "fmt" - "log" "regexp" "time" ) @@ -31,16 +30,22 @@ const ( expireAfterWrite ) +const ( + _none = "none" + _expireAfterCall = "expire_after_call" + _expireAfterWrite = "expire_after_write" +) + func (e expirationType) String() string { switch e { case expireAfterCall: - return "expire_after_call" + return _expireAfterCall case expireAfterWrite: - return "expire_after_write" + return _expireAfterWrite case none: - return "none" + fallthrough default: - panic("unknown Expiration type") + return _none } } @@ -113,7 +118,7 @@ invalid state tpe %s. state names can only start with alphabet letters [a-z][A-Z func validateValueSpec(s ValueSpec) error { matched, err := regexp.MatchString("^[a-zA-Z_][a-zA-Z_\\d]*$", s.Name) if err != nil { - log.Panicf("invalid regex; this is a bug: %v", err) + return fmt.Errorf("invalid regex; this is a bug: %v", err) } if !matched {