Skip to content

Commit

Permalink
Update to add async option
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Dec 17, 2019
1 parent b49818e commit c9a080d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
3 changes: 3 additions & 0 deletions chart/mqtt-connector/templates/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
{{- if .Values.trimChannelKey }}
- "-trim-channel-key={{.Values.trimChannelKey}}"
{{- end }}
{{- if .Values.asyncInvoke }}
- "-async-invoke={{.Values.asyncInvoke}}"
{{- end }}
env:
- name: gateway_url
value: {{ .Values.gateway_url | quote }}
Expand Down
6 changes: 4 additions & 2 deletions chart/mqtt-connector/values.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
image: alexellis2/mqtt-connector:0.2.0
image: alexellis2/mqtt-connector:0.2.1
replicas: 1

# Emitter.io example
Expand All @@ -14,11 +14,13 @@ broker: tcp://emitter:8080
clientID: testgoid
authPassword: ""

upstream_timeout: 15s
upstream_timeout: 15s # Maximum duration for an invocation
rebuild_interval: 10s
asyncInvoke: false # Invoke via NATS using the function's asynchronous route
basic_auth: true
gateway_url: http://gateway.openfaas:8080

nodeSelector: {}
tolerations: []
affinity: {}

23 changes: 15 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ import (
)

func main() {

var gatewayUsername, gatewayPassword, gatewayFlag string
var trimChannelKey bool
var (
gatewayUsername string
gatewayPassword string
gatewayFlag string
trimChannelKey bool
asyncInvoke bool
)

flag.StringVar(&gatewayUsername, "gw-username", "", "Username for the gateway")
flag.StringVar(&gatewayPassword, "gw-password", "", "Password for gateway")
flag.StringVar(&gatewayFlag, "gateway", "", "gateway")
flag.BoolVar(&trimChannelKey, "trim-channel-key", false, "Trim channel key when using emitter.io MQTT broker")
flag.BoolVar(&asyncInvoke, "async-invoke", false, "Invoke via queueing using NATS and the function's async endpoint")

topic := flag.String("topic", "", "The topic name to/from which to publish/subscribe")
broker := flag.String("broker", "tcp://iot.eclipse.org:1883", "The broker URI. ex: tcp://10.10.1.1:1883")
Expand Down Expand Up @@ -58,13 +63,15 @@ func main() {
}

config := &types.ControllerConfig{
RebuildInterval: time.Millisecond * 1000,
GatewayURL: gatewayURL,
PrintResponse: true,
PrintResponseBody: true,
RebuildInterval: time.Millisecond * 1000,
GatewayURL: gatewayURL,
PrintResponse: true,
PrintResponseBody: true,
TopicAnnotationDelimiter: ",",
AsyncFunctionInvocation: asyncInvoke,
}

log.Printf("Topic: %s\tBroker: %s\n", *topic, *broker)
log.Printf("Topic: %s\tBroker: %s\tAsync: %v\n", *topic, *broker, asyncInvoke)

controller := types.NewController(creds, config)

Expand Down

0 comments on commit c9a080d

Please sign in to comment.