diff --git a/chart/mqtt-connector/templates/deployment.yml b/chart/mqtt-connector/templates/deployment.yml index 2209124..817c3b7 100644 --- a/chart/mqtt-connector/templates/deployment.yml +++ b/chart/mqtt-connector/templates/deployment.yml @@ -60,6 +60,9 @@ spec: {{- if .Values.trimChannelKey }} - "-trim-channel-key={{.Values.trimChannelKey}}" {{- end }} + {{- if .Values.asyncInvoke }} + - "-async-invoke={{.Values.asyncInvoke}}" + {{- end }} env: - name: gateway_url value: {{ .Values.gateway_url | quote }} diff --git a/chart/mqtt-connector/values.yaml b/chart/mqtt-connector/values.yaml index 2a14792..3f54939 100644 --- a/chart/mqtt-connector/values.yaml +++ b/chart/mqtt-connector/values.yaml @@ -1,4 +1,4 @@ -image: alexellis2/mqtt-connector:0.2.0 +image: alexellis2/mqtt-connector:0.2.1 replicas: 1 # Emitter.io example @@ -14,11 +14,13 @@ broker: tcp://emitter:8080 clientID: testgoid authPassword: "" -upstream_timeout: 15s +upstream_timeout: 15s # Maximum duration for an invocation rebuild_interval: 10s +asyncInvoke: false # Invoke via NATS using the function's asynchronous route basic_auth: true gateway_url: http://gateway.openfaas:8080 nodeSelector: {} tolerations: [] affinity: {} + diff --git a/main.go b/main.go index 5f232bb..0216f02 100644 --- a/main.go +++ b/main.go @@ -17,14 +17,19 @@ import ( ) func main() { - - var gatewayUsername, gatewayPassword, gatewayFlag string - var trimChannelKey bool + var ( + gatewayUsername string + gatewayPassword string + gatewayFlag string + trimChannelKey bool + asyncInvoke bool + ) flag.StringVar(&gatewayUsername, "gw-username", "", "Username for the gateway") flag.StringVar(&gatewayPassword, "gw-password", "", "Password for gateway") flag.StringVar(&gatewayFlag, "gateway", "", "gateway") flag.BoolVar(&trimChannelKey, "trim-channel-key", false, "Trim channel key when using emitter.io MQTT broker") + flag.BoolVar(&asyncInvoke, "async-invoke", false, "Invoke via queueing using NATS and the function's async endpoint") topic := flag.String("topic", "", "The topic name to/from which to publish/subscribe") broker := flag.String("broker", "tcp://iot.eclipse.org:1883", "The broker URI. ex: tcp://10.10.1.1:1883") @@ -58,13 +63,15 @@ func main() { } config := &types.ControllerConfig{ - RebuildInterval: time.Millisecond * 1000, - GatewayURL: gatewayURL, - PrintResponse: true, - PrintResponseBody: true, + RebuildInterval: time.Millisecond * 1000, + GatewayURL: gatewayURL, + PrintResponse: true, + PrintResponseBody: true, + TopicAnnotationDelimiter: ",", + AsyncFunctionInvocation: asyncInvoke, } - log.Printf("Topic: %s\tBroker: %s\n", *topic, *broker) + log.Printf("Topic: %s\tBroker: %s\tAsync: %v\n", *topic, *broker, asyncInvoke) controller := types.NewController(creds, config)