Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement docker executor and unit test #277

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
148 changes: 102 additions & 46 deletions yoda/executor/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,128 @@ package executor
import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
"encoding/base64"
"fmt"
"net/url"
"os/exec"
"path/filepath"
"strconv"
"time"

"github.com/google/shlex"

"github.com/bandprotocol/chain/v2/x/oracle/types"
"github.com/levigross/grequests"
)

// Only use in testnet. No intensive testing, use at your own risk
type DockerExec struct {
image string
name string
timeout time.Duration
// portLists chan string
port string
maxTry int
}

func NewDockerExec(image string, timeout time.Duration) *DockerExec {
return &DockerExec{image: image, timeout: timeout}
func NewDockerExec(image string, timeout time.Duration, maxTry int, startPort int, endPort int) *DockerExec {
// portLists := make(chan string, endPort-startPort+1)
name := "docker-runtime-executor-"
for i := startPort; i <= endPort; i++ {
port := strconv.Itoa(i)
StartContainer(name, port, image)
// portLists <- port
}

return &DockerExec{image: image, name: name, timeout: timeout, port: strconv.Itoa(startPort), maxTry: maxTry}
}

func (e *DockerExec) Exec(code []byte, arg string, env interface{}) (ExecResult, error) {
// TODO: Handle env if we are to revive Docker
dir, err := ioutil.TempDir("/tmp", "executor")
func StartContainer(name string, port string, image string) error {
err := exec.Command("docker", "restart", name+port).Run()
if err != nil {
return ExecResult{}, err
dockerArgs := append([]string{
"run",
"--name", name + port,
"-p", port + ":5000",
"--restart=always",
"--memory=512m",
image,
})

cmd := exec.CommandContext(context.Background(), "docker", dockerArgs...)
var buf bytes.Buffer
cmd.Stdout = &buf
cmd.Stderr = &buf
err = cmd.Start()
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(filepath.Join(dir, "exec"), code, 0777)
return err
}

func (e *DockerExec) PostRequest(
colmazia marked this conversation as resolved.
Show resolved Hide resolved
code []byte,
arg string,
env interface{},
name string,
port string,
) (ExecResult, error) {
executable := base64.StdEncoding.EncodeToString(code)
resp, err := grequests.Post(
"http://localhost:"+port,
&grequests.RequestOptions{
Headers: map[string]string{
"Content-Type": "application/json",
},
JSON: map[string]interface{}{
"executable": executable,
"calldata": arg,
"timeout": e.timeout.Milliseconds(),
"env": env,
},
RequestTimeout: e.timeout,
},
)

if err != nil {
return ExecResult{}, err
urlErr, ok := err.(*url.Error)
if !ok || !urlErr.Timeout() {
return ExecResult{}, err
}
// Return timeout code
return ExecResult{Output: []byte{}, Code: 111}, nil
}
name := filepath.Base(dir)
args, err := shlex.Split(arg)

if !resp.Ok {
return ExecResult{}, ErrRestNotOk
}

r := externalExecutionResponse{}
err = resp.JSON(&r)

if err != nil {
return ExecResult{}, err
}
dockerArgs := append([]string{
"run", "--rm",
"-v", dir + ":/scratch:ro",
"--name", name,
e.image,
"/scratch/exec",
}, args...)
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()
cmd := exec.CommandContext(ctx, "docker", dockerArgs...)
var buf bytes.Buffer
cmd.Stdout = &buf
cmd.Stderr = &buf
err = cmd.Run()
if ctx.Err() == context.DeadlineExceeded {
exec.Command("docker", "kill", name).Start()
return ExecResult{}, ErrExecutionimeout

// go func() {
// // StartContainer(name, port, e.image)
// err := exec.Command("docker", "restart", name+port).Run()
// for err != nil {
// err = StartContainer(name, port, e.image)
// }
// e.portLists <- port
// }()
if r.Returncode == 0 {
return ExecResult{Output: []byte(r.Stdout), Code: 0, Version: r.Version}, nil
} else {
return ExecResult{Output: []byte(r.Stderr), Code: r.Returncode, Version: r.Version}, nil
}
exitCode := uint32(0)
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode = uint32(exitError.ExitCode())
} else {
return ExecResult{}, err
}

func (e *DockerExec) Exec(code []byte, arg string, env interface{}) (ExecResult, error) {
// port := <-e.portLists
errs := []error{}
for i := 0; i < e.maxTry; i++ {
execResult, err := e.PostRequest(code, arg, env, e.name, e.port)
if err == nil {
return execResult, err
}
errs = append(errs, err)
time.Sleep(500 * time.Millisecond)
}
output, err := ioutil.ReadAll(io.LimitReader(&buf, int64(types.DefaultMaxReportDataSize)))
if err != nil {
return ExecResult{}, err
}
return ExecResult{Output: output, Code: exitCode}, nil
return ExecResult{}, fmt.Errorf(ErrReachMaxTry.Error()+", tried errors: %#q", errs)
}
61 changes: 22 additions & 39 deletions yoda/executor/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,31 @@ package executor

import (
"testing"
)

func TestDockerSuccess(t *testing.T) {
// TODO: Enable test when CI has docker installed.
// e := NewDockerExec("bandprotocol/runtime:1.0.2", 10*time.Second)
// res, err := e.Exec([]byte(`#!/usr/bin/env python3
// import json
// import urllib.request
// import sys
"time"

// BINANCE_URL = "https://api.binance.com/api/v1/depth?symbol={}USDT&limit=5"

// def make_json_request(url):
// req = urllib.request.Request(url)
// req.add_header(
// "User-Agent",
// "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
// )
// return json.loads(urllib.request.urlopen(req).read())

// def main(symbol):
// res = make_json_request(BINANCE_URL.format(symbol))
// bid = float(res["bids"][0][0])
// ask = float(res["asks"][0][0])
// return (bid + ask) / 2
"github.com/stretchr/testify/require"
)

// if __name__ == "__main__":
// try:
// print(main(*sys.argv[1:]))
// except Exception as e:
// print(str(e), file=sys.stderr)
// sys.exit(1)
// `), "BTC")
// fmt.Println(string(res.Output), res.Code, err)
// require.True(t, false)
func SetupDockerTest(t *testing.T) {
}

func TestDockerLongStdout(t *testing.T) {
func TestDockerSuccess(t *testing.T) {
// TODO: Enable test when CI has docker installed.
// e := NewDockerExec("bandprotocol/runtime:1.0.2", 10*time.Second)
// res, err := e.Exec([]byte(`#!/usr/bin/env python3
// print("A"*1000)`), "BTC")
// fmt.Println(string(res.Output), res.Code, err)
// require.True(t, false)
// Prerequisite: please build docker image before running test
e := NewDockerExec("ongartbandprotocol/docker-executor:0.2.0", 12*time.Second, 100, 30001, 30001)
for i := 0; i < 20; i++ {
res, err := e.Exec([]byte(
"#!/usr/bin/env python3\nimport os\nimport sys\nprint(sys.argv[1], os.getenv('BAND_CHAIN_ID'))",
), "TEST_ARG", map[string]interface{}{
"BAND_CHAIN_ID": "test-chain-id",
"BAND_VALIDATOR": "test-validator",
"BAND_REQUEST_ID": "test-request-id",
"BAND_EXTERNAL_ID": "test-external-id",
"BAND_REPORTER": "test-reporter",
"BAND_SIGNATURE": "test-signature",
})
require.Equal(t, []byte("TEST_ARG test-chain-id\n"), res.Output)
require.Equal(t, uint32(0), res.Code)
require.NoError(t, err)
}
}
71 changes: 57 additions & 14 deletions yoda/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"
)

const (
flagQueryTimeout = "timeout"
flagQueryTimeout = "timeout"
flagQueryMaxTry = "maxTry"
flagQueryPortRange = "portRange"
)

var (
ErrExecutionimeout = errors.New("execution timeout")
ErrRestNotOk = errors.New("rest return non 2XX response")
ErrReachMaxTry = errors.New("execution reach max try")
)

type ExecResult struct {
Expand All @@ -33,15 +37,22 @@ var testProgram []byte = []byte(

// NewExecutor returns executor by name and executor URL
func NewExecutor(executor string) (exec Executor, err error) {
name, base, timeout, err := parseExecutor(executor)
name, base, timeout, maxTry, startPort, endPort, err := parseExecutor(executor)
if err != nil {
return nil, err
}
switch name {
case "rest":
exec = NewRestExec(base, timeout)
case "docker":
return nil, fmt.Errorf("Docker executor is currently not supported")
// Only use in testnet. No intensive testing, use at your own risk
if endPort < startPort {
return nil, fmt.Errorf("portRange invalid: startPort: %d, endPort: %d", startPort, endPort)
}
if maxTry < 1 {
return nil, fmt.Errorf("maxTry invalid: %d", maxTry)
}
exec = NewDockerExec(base, timeout, maxTry, startPort, endPort)
default:
return nil, fmt.Errorf("Invalid executor name: %s, base: %s", name, base)
}
Expand All @@ -68,29 +79,61 @@ func NewExecutor(executor string) (exec Executor, err error) {
return exec, nil
}

// parseExecutor splits the executor string in the form of "name:base?timeout=" into parts.
func parseExecutor(executorStr string) (name string, base string, timeout time.Duration, err error) {
// parseExecutor splits the executor string in the form of "name:base?timeout=&maxTry=&portRange=" into parts.
func parseExecutor(
executorStr string,
) (name string, base string, timeout time.Duration, maxTry int, startPort int, endPort int, err error) {
executor := strings.SplitN(executorStr, ":", 2)
if len(executor) != 2 {
return "", "", 0, fmt.Errorf("Invalid executor, cannot parse executor: %s", executorStr)
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid executor, cannot parse executor: %s", executorStr)
}
u, err := url.Parse(executor[1])
if err != nil {
return "", "", 0, fmt.Errorf("Invalid url, cannot parse %s to url with error: %s", executor[1], err.Error())
return "", "", 0, 0, 0, 0, fmt.Errorf(
"Invalid url, cannot parse %s to url with error: %s",
executor[1],
err.Error(),
)
}

query := u.Query()
timeoutStr := query.Get(flagQueryTimeout)
if timeoutStr == "" {
return "", "", 0, fmt.Errorf("Invalid timeout, executor requires query timeout")
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid timeout, executor requires query timeout")
}
// Remove timeout from query because we need to return `base`
query.Del(flagQueryTimeout)
u.RawQuery = query.Encode()

timeout, err = time.ParseDuration(timeoutStr)
if err != nil {
return "", "", 0, fmt.Errorf("Invalid timeout, cannot parse duration with error: %s", err.Error())
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid timeout, cannot parse duration with error: %s", err.Error())
}

maxTryStr := query.Get(flagQueryMaxTry)
if maxTryStr == "" {
maxTryStr = "1"
}
maxTry, err = strconv.Atoi(maxTryStr)
if err != nil {
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid maxTry, cannot parse integer with error: %s", err.Error())
}

portRangeStr := query.Get(flagQueryPortRange)
ports := strings.SplitN(portRangeStr, "-", 2)
if len(ports) != 2 {
ports = []string{"0", "0"}
}
return executor[0], u.String(), timeout, nil
startPort, err = strconv.Atoi(ports[0])
if err != nil {
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid portRange, cannot parse integer with error: %s", err.Error())
}
endPort, err = strconv.Atoi(ports[1])
if err != nil {
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid portRange, cannot parse integer with error: %s", err.Error())
}

// Remove timeout from query because we need to return `base`
query.Del(flagQueryTimeout)
query.Del(flagQueryMaxTry)
query.Del(flagQueryPortRange)

u.RawQuery = query.Encode()
return executor[0], u.String(), timeout, maxTry, startPort, endPort, nil
}
Loading