Skip to content

Commit

Permalink
Implemented Fail and Succeed
Browse files Browse the repository at this point in the history
Added test for them in fc_test.go
Added test for parse+choice+fail in aslparser_test.go
  • Loading branch information
redjack96 committed Sep 2, 2024
1 parent 88c5e7b commit 578d00f
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 34 deletions.
1 change: 0 additions & 1 deletion internal/fc/choice_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (c *ChoiceNode) Exec(compRequest *CompositionRequest, params ...map[string]
}

compRequest.ExecReport.Reports.Set(CreateExecutionReportId(c), execReport)
// compRequest.ExecReport.Reports[CreateExecutionReportId(c)] = execReport
return output, err
}

Expand Down
93 changes: 67 additions & 26 deletions internal/fc/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,61 @@ func (dag *Dag) executeFanIn(progress *Progress, fanIn *FanInNode, r *Compositio
return true, nil
}

func (dag *Dag) executeSucceedNode(progress *Progress, succeedNode *SucceedNode, r *CompositionRequest) (bool, error) {
return commonExec(dag, progress, succeedNode, r)
}

func (dag *Dag) executeFailNode(progress *Progress, failNode *FailNode, r *CompositionRequest) (bool, error) {
return commonExec(dag, progress, failNode, r)
}

func commonExec(dag *Dag, progress *Progress, node DagNode, r *CompositionRequest) (bool, error) {
var pd *PartialData
nodeId := node.GetId()
requestId := ReqId(r.ReqId)
partialData, err := RetrieveSinglePartialData(requestId, nodeId, cache.Persist)

if err != nil {
return false, fmt.Errorf("request %s - %s %s - %v", r.ReqId, node.GetNodeType(), node.GetId(), err)
}
err = node.CheckInput(partialData.Data)
if err != nil {
return false, err
}
// executing node
output, err := node.Exec(r, partialData.Data)
if err != nil {
return false, err
}

// Todo: uncomment when running TestInvokeFC_Concurrent to debug concurrency errors
// errDbg := Debug(r, string(node.Id), output)
// if errDbg != nil {
// return false, errDbg
// }

forNode := node.GetNext()[0]
pd = NewPartialData(requestId, forNode, nodeId, output)
errSend := node.PrepareOutput(dag, output)
if errSend != nil {
return false, fmt.Errorf("the node %s cannot send the output: %v", node.String(), errSend)
}

// saving partial data and updating progress
err = SavePartialData(pd, cache.Persist)
if err != nil {
return false, err
}
err = progress.CompleteNode(nodeId)
if err != nil {
return false, err
}
if node.GetNodeType() == Fail || node.GetNodeType() == Succeed {
return false, nil
}
return true, nil
}

func (dag *Dag) executeEnd(progress *Progress, node *EndNode, r *CompositionRequest) (bool, error) {
// r.ExecReport.Reports[CreateExecutionReportId(node)] = &function.ExecutionReport{Result: "end"}
r.ExecReport.Reports.Set(CreateExecutionReportId(node), &function.ExecutionReport{Result: "end"})
Expand All @@ -573,7 +628,7 @@ func (dag *Dag) Execute(r *CompositionRequest) (bool, error) {
if err != nil {
return true, err
}
} else {
} else if len(nextNodes) == 1 {
n, ok := dag.Find(nextNodes[0])
if !ok {
return true, fmt.Errorf("failed to find node %s", n.GetId())
Expand All @@ -590,6 +645,10 @@ func (dag *Dag) Execute(r *CompositionRequest) (bool, error) {
shouldContinue, err = dag.executeStart(progress, node, r)
case *FanOutNode:
shouldContinue, err = dag.executeFanOut(progress, node, r)
case *FailNode:
shouldContinue, err = dag.executeFailNode(progress, node, r)
case *SucceedNode:
shouldContinue, err = dag.executeSucceedNode(progress, node, r)
case *EndNode:
shouldContinue, err = dag.executeEnd(progress, node, r)
}
Expand All @@ -598,6 +657,8 @@ func (dag *Dag) Execute(r *CompositionRequest) (bool, error) {
r.ExecReport.Progress = progress
return true, err
}
} else {
return false, fmt.Errorf("there aren't next nodes")
}

err = SaveProgress(progress, cache.Persist)
Expand Down Expand Up @@ -745,6 +806,9 @@ func (dag *Dag) decodeNode(nodeId string, value json.RawMessage) error {
return fmt.Errorf("unknown nodeType: %v", tempNodeMap["NodeType"])
}
var err error

node := DagNodeFromType(DagNodeType(dagNodeType))

switch DagNodeType(dagNodeType) {
case Start:
node := &StartNode{}
Expand All @@ -753,13 +817,6 @@ func (dag *Dag) decodeNode(nodeId string, value json.RawMessage) error {
dag.Nodes[DagNodeId(nodeId)] = node
return nil
}
case End:
node := &EndNode{}
err = json.Unmarshal(value, node)
if err == nil && node.Id != "" {
dag.Nodes[DagNodeId(nodeId)] = node
return nil
}
case Simple:
node := &SimpleNode{}
err = json.Unmarshal(value, node)
Expand All @@ -774,28 +831,12 @@ func (dag *Dag) decodeNode(nodeId string, value json.RawMessage) error {
dag.Nodes[DagNodeId(nodeId)] = node
return nil
}
case FanOut:
node := &FanOutNode{}
err = json.Unmarshal(value, node)
if err == nil && node.Id != "" {
dag.Nodes[DagNodeId(nodeId)] = node
return nil
}
case FanIn:
node := &FanInNode{}
default:
err = json.Unmarshal(value, node)
if err == nil && node.Id != "" {
if err == nil && node.GetId() != "" {
dag.Nodes[DagNodeId(nodeId)] = node
return nil
}
case Fail:
return fmt.Errorf("not implemented") // TODO: implement me!
case Succeed:
return fmt.Errorf("not implemented") // TODO: implement me!
case Pass:
return fmt.Errorf("not implemented") // TODO: implement me!
case Wait:
return fmt.Errorf("not implemented") // TODO: implement me!
}
var unmarshalTypeError *json.UnmarshalTypeError
if err != nil && !errors.As(err, &unmarshalTypeError) {
Expand Down
38 changes: 35 additions & 3 deletions internal/fc/fail_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package fc

import (
"fmt"
"github.com/grussorusso/serverledge/internal/function"
"github.com/grussorusso/serverledge/internal/types"
"github.com/lithammer/shortuuid"
"time"
)

type FailNode struct {
Expand All @@ -19,6 +21,11 @@ type FailNode struct {
BranchId int
}

func (f *FailNode) PrepareOutput(dag *Dag, output map[string]interface{}) error {
output[f.Error] = f.Cause
return nil
}

func NewFailNode(error, cause string) *FailNode {
if len(error) > 20 {
fmt.Printf("error string identifier should be less than 20 characters but is %d characters long\n", len(error))
Expand All @@ -33,16 +40,41 @@ func NewFailNode(error, cause string) *FailNode {
}

func (f *FailNode) Exec(compRequest *CompositionRequest, params ...map[string]interface{}) (map[string]interface{}, error) {
//TODO implement me
panic("implement me")
t0 := time.Now()
output := make(map[string]interface{})
var err error = nil
if len(params) != 1 {
return nil, fmt.Errorf("failed to get one input for fail node: received %d inputs", len(params))
}
respAndDuration := time.Now().Sub(t0).Seconds()
execReport := &function.ExecutionReport{
Result: fmt.Sprintf("%v", output),
ResponseTime: respAndDuration,
IsWarmStart: true, // not in a container
InitTime: 0,
OffloadLatency: 0,
Duration: respAndDuration,
SchedAction: "",
}
compRequest.ExecReport.Reports.Set(CreateExecutionReportId(f), execReport)
return output, err
}

func (f *FailNode) Equals(cmp types.Comparable) bool {
f2, ok := cmp.(*FailNode)
if !ok {
return false
}
return f.Id == f2.Id && f.NodeType == f2.NodeType && f.Error == f2.Error && f.Cause == f2.Cause
return f.Id == f2.Id &&
f.NodeType == f2.NodeType &&
f.Error == f2.Error &&
f.Cause == f2.Cause &&
f.OutputTo == f2.OutputTo &&
f.BranchId == f2.BranchId
}

func (f *FailNode) CheckInput(input map[string]interface{}) error {
return nil
}

func (f *FailNode) AddOutput(dag *Dag, dagNode DagNodeId) error {
Expand Down
27 changes: 27 additions & 0 deletions internal/fc/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,33 @@ const (
Wait DagNodeType = "WaitNode"
)

func DagNodeFromType(nodeType DagNodeType) DagNode {
switch nodeType {
case Start:
return &StartNode{}
case End:
return &EndNode{}
case Simple:
return &SimpleNode{}
case Choice:
return &ChoiceNode{}
case FanOut:
return &FanOutNode{}
case FanIn:
return &FanInNode{}
case Fail:
return &FailNode{}
case Succeed:
return &SucceedNode{}
case Pass:
return &PassNode{}
case Wait:
return &WaitNode{}
default:
return &SimpleNode{}
}
}

func parseType(dNode DagNode) DagNodeType {
switch dNode.(type) {
case *StartNode:
Expand Down
37 changes: 33 additions & 4 deletions internal/fc/succeed_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package fc

import (
"fmt"
"github.com/grussorusso/serverledge/internal/function"
"github.com/grussorusso/serverledge/internal/types"
"github.com/lithammer/shortuuid"
"time"
)

type SucceedNode struct {
Expand All @@ -13,7 +15,7 @@ type SucceedNode struct {
OutputPath string

/* (Serverledge specific) */

Message string
// OutputTo for a SucceedNode is used to send the output to the EndNode
OutputTo DagNodeId
BranchId int
Expand All @@ -23,21 +25,48 @@ func NewSucceedNode(message string) *SucceedNode {
succeedNode := SucceedNode{
Id: DagNodeId("succeed_" + shortuuid.New()),
NodeType: Succeed,
Message: message,
}
return &succeedNode
}

func (s *SucceedNode) Exec(compRequest *CompositionRequest, params ...map[string]interface{}) (map[string]interface{}, error) {
//TODO implement me
panic("implement me")
t0 := time.Now()
var err error = nil
if len(params) != 1 {
return nil, fmt.Errorf("failed to get one input for succeed node: received %d inputs", len(params))
}
output := params[0]
respAndDuration := time.Now().Sub(t0).Seconds()
execReport := &function.ExecutionReport{
Result: fmt.Sprintf("%v", output),
ResponseTime: respAndDuration,
IsWarmStart: true, // not in a container
InitTime: 0,
OffloadLatency: 0,
Duration: respAndDuration,
SchedAction: "",
}
compRequest.ExecReport.Reports.Set(CreateExecutionReportId(s), execReport)
return output, err
}

func (s *SucceedNode) Equals(cmp types.Comparable) bool {
s2, ok := cmp.(*SucceedNode)
if !ok {
return false
}
return s.Id == s2.Id && s.NodeType == s2.NodeType && s.InputPath == s2.InputPath && s.OutputPath == s2.OutputPath
return s.Id == s2.Id &&
s.NodeType == s2.NodeType &&
s.InputPath == s2.InputPath &&
s.OutputPath == s2.OutputPath &&
s.OutputTo == s2.OutputTo &&
s.BranchId == s2.BranchId &&
s.Message == s2.Message
}

func (s *SucceedNode) CheckInput(input map[string]interface{}) error {
return nil
}

func (s *SucceedNode) AddOutput(dag *Dag, dagNode DagNodeId) error {
Expand Down
48 changes: 48 additions & 0 deletions internal/test/fc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,3 +669,51 @@ func TestInvokeCompositionError(t *testing.T) {

request.ExecReport.Progress.Print()
}

func TestInvokeCompositionFailAndSucceed(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test")
}

dag, errDag := fc.NewDagBuilder().
AddChoiceNode(
fc.NewEqParamCondition(fc.NewParam("value"), fc.NewValue(1)),
fc.NewConstCondition(true),
).
NextBranch(fc.NewDagBuilder().AddSucceedNodeAndBuild("everything ok")).
NextBranch(fc.NewDagBuilder().AddFailNodeAndBuild("FakeError", "This should be an error")).
EndChoiceAndBuild()
u.AssertNil(t, errDag)
fcomp := fc.NewFC("fail_succeed", *dag, []*function.Function{}, true)
err1 := fcomp.SaveToEtcd()
u.AssertNil(t, err1)

// First run: Success

// INVOKE - we call the function composition
params := make(map[string]interface{})
params["value"] = 1

request := fc.NewCompositionRequest(shortuuid.New(), &fcomp, params)
resultMap, errInvoke1 := fcomp.Invoke(request)
u.AssertNilMsg(t, errInvoke1, "error while invoking the branch (succeed)")

result, err := resultMap.GetIntSingleResult()
u.AssertNilMsg(t, err, "Result not found")
u.AssertEquals(t, 1, result)

// Second run: Fail
params2 := make(map[string]interface{})
params2["value"] = 2

request2 := fc.NewCompositionRequest(shortuuid.New(), &fcomp, params2)
resultMap2, errInvoke2 := fcomp.Invoke(request2)
u.AssertNilMsg(t, errInvoke2, "error while invoking the branch (fail)")

valueError, found := resultMap2.Result["FakeError"]
u.AssertTrueMsg(t, found, "FakeError not found")
causeStr, ok := valueError.(string)

u.AssertTrueMsg(t, ok, "cause value is not a string")
u.AssertEquals(t, "This should be an error", causeStr)
}

0 comments on commit 578d00f

Please sign in to comment.