Skip to content

Commit

Permalink
Merge pull request #20 from n3wscott/structured-mode
Browse files Browse the repository at this point in the history
Structured mode
  • Loading branch information
Scott Nichols authored Apr 2, 2021
2 parents 0f65088 + 0a0943b commit eddc279
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ func AddConformanceCommands(topLevel *cobra.Command) {
addSend(topLevel)
addInvoke(topLevel)
addListener(topLevel)
addRaw(topLevel)
}
5 changes: 1 addition & 4 deletions pkg/commands/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ func addInvoke(topLevel *cobra.Command) {
}

// Run it.
if err := i.Do(); err != nil {
return err
}
return nil
return i.Do()
},
}
options.AddFilenameArg(invoke, fo)
Expand Down
3 changes: 3 additions & 0 deletions pkg/commands/options/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func wrap(text string, width int) string {
}

const (
httpSpecMode = `Mode of the outbound event. In the binary content mode, the value of the event data is placed into the HTTP request. In the structured content mode, event metadata attributes and event data are placed into the HTTP request body as JSON. [binary, structured]`
// Required
specTextID = "Identifies the event. Producers MUST ensure that source + id is unique for each distinct event. If a duplicate event is re-sent (e.g. due to a network error) it MAY have the same id. Consumers MAY assume that Events with identical source and id are duplicates."
specTextSource = "Identifies the context in which an event happened. Often this will include information such as the type of the event source, the organization publishing the event or the process that produced the event. The exact syntax and semantics behind the data encoded in the URI is defined by the event producer."
Expand All @@ -53,6 +54,8 @@ const (
)

func AddEventArgs(cmd *cobra.Command, eo *EventOptions) {
// Content
cmd.Flags().StringVar(&eo.Event.Mode, "mode", "", wrap80(httpSpecMode))

// Required fields.

Expand Down
30 changes: 30 additions & 0 deletions pkg/commands/raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package commands

import (
"github.com/spf13/cobra"

"github.com/cloudevents/conformance/pkg/commands/options"
"github.com/cloudevents/conformance/pkg/http"
)

func addRaw(topLevel *cobra.Command) {
po := &options.PortOptions{}
raw := &cobra.Command{
Use: "raw",
Short: "Dump the raw HTTP request to stdout. (For debugging HTTP requests.)",
Example: `
cloudevents raw -P 8181
`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
r := http.Raw{
Out: cmd.OutOrStdout(),
Port: po.Port,
}
return r.Do()
},
}
options.AddPortArg(raw, po)

topLevel.AddCommand(raw)
}
19 changes: 14 additions & 5 deletions pkg/commands/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package commands

import (
"errors"
"log"
"net/url"
"strings"
"time"
Expand All @@ -16,13 +15,15 @@ import (
func addSend(topLevel *cobra.Command) {
ho := &options.HostOptions{}
eo := &options.EventOptions{}
do := &options.DeliveryOptions{}
yo := &options.YAMLOptions{}
vo := &options.VerboseOptions{}
invoke := &cobra.Command{
Use: "send",
Short: "Send a cloudevent.",
Example: `
cloudevents send http://localhost:8080/ --id abc-123 --source cloudevents.conformance.tool --type foo.bar
cloudevents send http://localhost:8080/ --id 321-cba --source cloudevents.conformance.tool --type foo.json --mode structured
`,
Args: func(cmd *cobra.Command, args []string) error {
if len(args) < 1 {
Expand All @@ -35,7 +36,7 @@ func addSend(topLevel *cobra.Command) {
ho.URL = u
return nil
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
// Process time now.
if eo.Now {
eo.Event.Attributes.Time = time.Now().UTC().Format(time.RFC3339Nano)
Expand All @@ -60,15 +61,23 @@ func addSend(topLevel *cobra.Command) {
Verbose: vo.Verbose,
}

// Run it.
if err := i.Do(); err != nil {
log.Fatalf("error sending: %v", err)
// Add delay, if specified.
if len(do.Delay) > 0 {
d, err := time.ParseDuration(do.Delay)
if err != nil {
return err
}
i.Delay = &d
}

// Run it.
return i.Do()
},
}
options.AddEventArgs(invoke, eo)
options.AddYAMLArg(invoke, yo)
options.AddVerboseArg(invoke, vo)
options.AddDeliveryArg(invoke, do)

topLevel.AddCommand(invoke)
}
8 changes: 8 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ package event

type MutationFn func(Event) (Event, error)

// Mode of encoding.
const (
DefaultMode = ""
BinaryMode = "binary"
StructuredMode = "structured"
)

type Event struct {
Mode string `yaml:"Mode,omitempty"`
Attributes ContextAttributes `yaml:"ContextAttributes"`
TransportExtensions Extensions `yaml:"TransportExtensions,omitempty"`
Data string `yaml:"Data"`
Expand Down
182 changes: 166 additions & 16 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -19,8 +20,79 @@ func addHeader(req *http.Request, key, value string) {
}
}

func EventToRequest(url string, event event.Event) (*http.Request, error) {
func addStructured(env map[string]interface{}, key, value string) {
value = strings.TrimSpace(value)
if value != "" {
env[key] = value
}
}

func EventToRequest(url string, in event.Event) (*http.Request, error) {
switch in.Mode {
case event.StructuredMode:
return structuredEventToRequest(url, in)
case event.DefaultMode, event.BinaryMode:
return binaryEventToRequest(url, in)
}
return nil, fmt.Errorf("unknown content mode: %q", in.Mode)
}

func structuredEventToRequest(url string, event event.Event) (*http.Request, error) {
env := make(map[string]interface{})

// CloudEvents attributes.
addStructured(env, "specversion", event.Attributes.SpecVersion)
addStructured(env, "type", event.Attributes.Type)
addStructured(env, "time", event.Attributes.Time)
addStructured(env, "id", event.Attributes.ID)
addStructured(env, "source", event.Attributes.Source)
addStructured(env, "subject", event.Attributes.Subject)
addStructured(env, "schemaurl", event.Attributes.SchemaURL)
addStructured(env, "datacontenttype", event.Attributes.DataContentType)
addStructured(env, "datacontentencoding", event.Attributes.DataContentEncoding)

// CloudEvents attribute extensions.
for k, v := range event.Attributes.Extensions {
addStructured(env, k, v)
}

// TODO: based on datacontenttype, we should parse data and then set the result in the envelope.
if len(event.Data) > 0 {
data := json.RawMessage{}
if err := json.Unmarshal([]byte(event.Data), &data); err != nil {
return nil, err
}
env["data"] = data
}

// To JSON.
body, err := json.Marshal(env)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}

// Transport extensions.
hasContentType := false
for k, v := range event.TransportExtensions {
if strings.EqualFold(v, "Content-Type") {
hasContentType = true
}
addHeader(req, k, v)
}

if !hasContentType {
addHeader(req, "Content-Type", "application/cloudevents+json; charset=UTF-8")
}

return req, nil
}

func binaryEventToRequest(url string, event event.Event) (*http.Request, error) {
req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(event.Data)))
if err != nil {
return nil, err
Expand Down Expand Up @@ -51,53 +123,131 @@ func EventToRequest(url string, event event.Event) (*http.Request, error) {
}

func RequestToEvent(req *http.Request) (*event.Event, error) {
if strings.HasPrefix(req.Header.Get("Content-Type"), "application/cloudevents+json") {
req.Header.Del("Content-Type")
return structuredRequestToEvent(req)
}
return binaryRequestToEvent(req)
}

func structuredRequestToEvent(req *http.Request) (*event.Event, error) {
out := &event.Event{
Mode: event.StructuredMode,
}

body, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
_ = body

event := &event.Event{
env := make(map[string]json.RawMessage)
if err := json.Unmarshal(body, &env); err != nil {
return nil, err
}

insert := func(key string, into *string) {
if _, found := env[key]; found {
if err := json.Unmarshal(env[key], into); err != nil {
*into = err.Error()
}
delete(env, key)
}
}

// CloudEvents attributes.
insert("specversion", &out.Attributes.SpecVersion)
insert("type", &out.Attributes.Type)
insert("time", &out.Attributes.Time)
insert("id", &out.Attributes.ID)
insert("source", &out.Attributes.Source)
insert("subject", &out.Attributes.Subject)
insert("schemaurl", &out.Attributes.SchemaURL)
insert("datacontenttype", &out.Attributes.DataContentType)
insert("datacontentencoding", &out.Attributes.DataContentEncoding)

// CloudEvents Data.
if _, found := env["data"]; found {
out.Data = string(env["data"]) + "\n"
delete(env, "data")
}

// CloudEvents attribute extensions.
out.Attributes.Extensions = make(map[string]string)
for key, b := range env {
var into string
if err := json.Unmarshal(b, &into); err != nil {
into = err.Error()
}
out.Attributes.Extensions[key] = into
delete(env, key)
}

// Transport extensions.
out.TransportExtensions = make(map[string]string)
for k := range req.Header {
if k == "Accept-Encoding" || k == "Content-Length" {
continue
}
out.TransportExtensions[k] = req.Header.Get(k)
req.Header.Del(k)
}

return out, nil
}

func binaryRequestToEvent(req *http.Request) (*event.Event, error) {
body, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
_ = body

out := &event.Event{
Mode: event.BinaryMode,
Data: string(body),
}

// CloudEvents attributes.
event.Attributes.SpecVersion = req.Header.Get("ce-specversion")
out.Attributes.SpecVersion = req.Header.Get("ce-specversion")
req.Header.Del("ce-specversion")
event.Attributes.Type = req.Header.Get("ce-type")
out.Attributes.Type = req.Header.Get("ce-type")
req.Header.Del("ce-type")
event.Attributes.Time = req.Header.Get("ce-time")
out.Attributes.Time = req.Header.Get("ce-time")
req.Header.Del("ce-time")
event.Attributes.ID = req.Header.Get("ce-id")
out.Attributes.ID = req.Header.Get("ce-id")
req.Header.Del("ce-id")
event.Attributes.Source = req.Header.Get("ce-source")
out.Attributes.Source = req.Header.Get("ce-source")
req.Header.Del("ce-source")
event.Attributes.Subject = req.Header.Get("ce-subject")
out.Attributes.Subject = req.Header.Get("ce-subject")
req.Header.Del("ce-subject")
event.Attributes.SchemaURL = req.Header.Get("ce-schemaurl")
out.Attributes.SchemaURL = req.Header.Get("ce-schemaurl")
req.Header.Del("ce-schemaurl")
event.Attributes.DataContentType = req.Header.Get("Content-Type")
out.Attributes.DataContentType = req.Header.Get("Content-Type")
req.Header.Del("Content-Type")
event.Attributes.DataContentEncoding = req.Header.Get("ce-datacontentencoding")
out.Attributes.DataContentEncoding = req.Header.Get("ce-datacontentencoding")
req.Header.Del("ce-datacontentencoding")

// CloudEvents attribute extensions.
event.Attributes.Extensions = make(map[string]string)
out.Attributes.Extensions = make(map[string]string)
for k := range req.Header {
if strings.HasPrefix(strings.ToLower(k), "ce-") {
event.Attributes.Extensions[k[len("ce-"):]] = req.Header.Get(k)
out.Attributes.Extensions[k[len("ce-"):]] = req.Header.Get(k)
req.Header.Del(k)
}
}

// Transport extensions.
event.TransportExtensions = make(map[string]string)
out.TransportExtensions = make(map[string]string)
for k := range req.Header {
event.TransportExtensions[k] = req.Header.Get(k)
if k == "Accept-Encoding" || k == "Content-Length" {
continue
}
out.TransportExtensions[k] = req.Header.Get(k)
req.Header.Del(k)
}

return event, nil
return out, nil
}

func Do(req *http.Request, hook ResultsFn) error {
Expand Down
Loading

0 comments on commit eddc279

Please sign in to comment.