Skip to content

Commit

Permalink
Merge branch 'mosn:main' into support-restapi-to-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiantao authored Jun 29, 2024
2 parents 31b190f + 1d5db16 commit db6bb02
Show file tree
Hide file tree
Showing 73 changed files with 1,932 additions and 724 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ jobs:
echo "Generated files are not up-to-date. Please run 'make gen-crd-code' and commit changes."
exit 1
fi
pushd controller/
make manifests generate
if ! git diff --exit-code; then
echo "Generated files are not up-to-date. Please run 'make manifests generate' under controller/ and commit changes."
exit 1
fi
popd
make gen-helm-docs
if ! git diff --exit-code; then
echo "Generated files are not up-to-date. Please run 'make gen-helm-docs' and commit changes."
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ controller/tests/testdata/crd/
go.work.sum
external/*
e2e/log
.idea
# helm charts
manifests/charts/**/*.tgz
9 changes: 9 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ linters:
- contextcheck
- errcheck
- exportloopref
- forcetypeassert
- gocheckcompilerdirectives
- gocritic
- gosec
Expand Down Expand Up @@ -38,14 +39,22 @@ issues:
- path: _test\.go # unit tests
linters:
- errcheck
- forcetypeassert
- gosec
- unparam
- path: tests/ # integration tests
linters:
- bodyclose
- errcheck
- forcetypeassert
- gosec
- unparam
- path: plugins/ # too much plugin config type assert in the plugins
linters:
- forcetypeassert
- path: registries/ # ditto
linters:
- forcetypeassert
# Show the complete output
max-issues-per-linter: 0
max-same-issues: 0
9 changes: 9 additions & 0 deletions api/internal/plugin_state/plugin_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package plugin_state

import (
"sync"

"mosn.io/htnn/api/pkg/filtermanager/api"
)

type pluginState struct {
store map[string]map[string]any
lock sync.Mutex
}

func NewPluginState() api.PluginState {
Expand All @@ -29,13 +32,19 @@ func NewPluginState() api.PluginState {
}

func (p *pluginState) Get(namespace string, key string) any {
p.lock.Lock()
defer p.lock.Unlock()

if pluginStore, ok := p.store[namespace]; ok {
return pluginStore[key]
}
return nil
}

func (p *pluginState) Set(namespace string, key string, value any) {
p.lock.Lock()
defer p.lock.Unlock()

pluginStore, ok := p.store[namespace]
if !ok {
pluginStore = make(map[string]any)
Expand Down
7 changes: 6 additions & 1 deletion api/pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package consumer

import (
"errors"
"fmt"
"reflect"

xds "github.com/cncf/xds/go/xds/type/v3"
capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
Expand All @@ -35,7 +37,10 @@ type consumerManager struct {
}

func ConsumerManagerFactory(c interface{}) capi.StreamFilterFactory {
conf := c.(*consumerManagerConfig)
conf, ok := c.(*consumerManagerConfig)
if !ok {
panic(fmt.Sprintf("wrong config type: %s", reflect.TypeOf(c)))
}
return func(callbacks capi.FilterCallbackHandler) capi.StreamFilter {
return &consumerManager{
callbacks: callbacks,
Expand Down
6 changes: 6 additions & 0 deletions api/pkg/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ func TestParse(t *testing.T) {
})
}
}

func TestConsumerManagerFactory(t *testing.T) {
assert.PanicsWithValuef(t, "wrong config type: *struct {}", func() {
ConsumerManagerFactory(&struct{}{})
}, "check if the panic message contains the wrong type")
}
202 changes: 202 additions & 0 deletions api/pkg/filtermanager/api_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filtermanager

import (
"fmt"
"net"
"net/http"
"net/url"
"runtime/debug"
"strconv"
"strings"
"sync"

capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"

"mosn.io/htnn/api/internal/consumer"
"mosn.io/htnn/api/internal/cookie"
"mosn.io/htnn/api/internal/plugin_state"
"mosn.io/htnn/api/pkg/filtermanager/api"
)

type filterManagerRequestHeaderMap struct {
capi.RequestHeaderMap

cacheLock sync.Mutex

u *url.URL
cookies map[string]*http.Cookie
}

func (headers *filterManagerRequestHeaderMap) expire(key string) {
switch key {
case ":path":
headers.u = nil
case "cookie":
headers.cookies = nil
}
}

func (headers *filterManagerRequestHeaderMap) Set(key, value string) {
headers.cacheLock.Lock()
key = strings.ToLower(key)
headers.expire(key)
headers.RequestHeaderMap.Set(key, value)
headers.cacheLock.Unlock()
}

func (headers *filterManagerRequestHeaderMap) Add(key, value string) {
headers.cacheLock.Lock()
key = strings.ToLower(key)
headers.expire(key)
headers.RequestHeaderMap.Add(key, value)
headers.cacheLock.Unlock()
}

func (headers *filterManagerRequestHeaderMap) Del(key string) {
headers.cacheLock.Lock()
key = strings.ToLower(key)
headers.expire(key)
headers.RequestHeaderMap.Del(key)
headers.cacheLock.Unlock()
}

func (headers *filterManagerRequestHeaderMap) Url() *url.URL {
headers.cacheLock.Lock()
defer headers.cacheLock.Unlock()

if headers.u == nil {
path := headers.Path()
u, err := url.ParseRequestURI(path)
if err != nil {
panic(fmt.Sprintf("unexpected bad request uri given by envoy: %v", err))
}
headers.u = u
}
return headers.u
}

// If multiple cookies match the given name, only one cookie will be returned.
func (headers *filterManagerRequestHeaderMap) Cookie(name string) *http.Cookie {
headers.cacheLock.Lock()
defer headers.cacheLock.Unlock()

if headers.cookies == nil {
cookieList := headers.Cookies()
headers.cookies = make(map[string]*http.Cookie, len(cookieList))
for _, c := range cookieList {
headers.cookies[c.Name] = c
}
}
return headers.cookies[name]
}

func (headers *filterManagerRequestHeaderMap) Cookies() []*http.Cookie {
// same-name cookies may be overridden in the headers.cookies
return cookie.ParseCookies(headers)
}

type filterManagerStreamInfo struct {
capi.StreamInfo

cacheLock sync.Mutex

ipAddress *api.IPAddress
}

func (s *filterManagerStreamInfo) DownstreamRemoteParsedAddress() *api.IPAddress {
s.cacheLock.Lock()
if s.ipAddress == nil {
ipport := s.StreamInfo.DownstreamRemoteAddress()
// the IPPort given by Envoy must be valid
ip, port, _ := net.SplitHostPort(ipport)
p, _ := strconv.Atoi(port)
s.ipAddress = &api.IPAddress{
Address: ipport,
IP: ip,
Port: p,
}
}
s.cacheLock.Unlock()
return s.ipAddress
}

func (s *filterManagerStreamInfo) DownstreamRemoteAddress() string {
return s.DownstreamRemoteParsedAddress().Address
}

type filterManagerCallbackHandler struct {
capi.FilterCallbackHandler

cacheLock sync.Mutex

namespace string
consumer api.Consumer
pluginState api.PluginState

streamInfo *filterManagerStreamInfo
}

func (cb *filterManagerCallbackHandler) Reset() {
cb.cacheLock.Lock()

cb.FilterCallbackHandler = nil
// We don't reset namespace, as filterManager will only be reused in the same route,
// which must have the same namespace.
cb.consumer = nil
cb.streamInfo = nil

cb.cacheLock.Unlock()
}

func (cb *filterManagerCallbackHandler) StreamInfo() api.StreamInfo {
cb.cacheLock.Lock()
if cb.streamInfo == nil {
cb.streamInfo = &filterManagerStreamInfo{
StreamInfo: cb.FilterCallbackHandler.StreamInfo(),
}
}
cb.cacheLock.Unlock()
return cb.streamInfo
}

// Consumer getter/setter should only be called in DecodeHeaders

func (cb *filterManagerCallbackHandler) LookupConsumer(pluginName, key string) (api.Consumer, bool) {
return consumer.LookupConsumer(cb.namespace, pluginName, key)
}

func (cb *filterManagerCallbackHandler) GetConsumer() api.Consumer {
return cb.consumer
}

func (cb *filterManagerCallbackHandler) SetConsumer(c api.Consumer) {
if c == nil {
api.LogErrorf("set consumer with nil consumer: %s", debug.Stack())
return
}
api.LogInfof("set consumer, namespace: %s, name: %s", cb.namespace, c.Name())
cb.consumer = c
}

func (cb *filterManagerCallbackHandler) PluginState() api.PluginState {
cb.cacheLock.Lock()
if cb.pluginState == nil {
cb.pluginState = plugin_state.NewPluginState()
}
cb.cacheLock.Unlock()
return cb.pluginState
}
Loading

0 comments on commit db6bb02

Please sign in to comment.