From 33eaba6008750e6aff5f845e2d1f5a21fdb539c8 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Mon, 29 Jul 2024 17:18:40 +0800 Subject: [PATCH] add service metrics Signed-off-by: LiZhenCheng9527 --- bpf/kmesh/probes/metrics.h | 29 +++- bpf/kmesh/workload/sockops.c | 24 ++- pkg/auth/rbac.go | 16 ++ pkg/controller/telemetry/metric.go | 212 ++++++++++++++++++----- pkg/controller/telemetry/metric_test.go | 213 +++++++++++++++++------- pkg/controller/telemetry/utils.go | 85 ++++++++-- pkg/controller/telemetry/utils_test.go | 57 +++++-- 7 files changed, 495 insertions(+), 141 deletions(-) diff --git a/bpf/kmesh/probes/metrics.h b/bpf/kmesh/probes/metrics.h index fd8a7114d..4387edb07 100644 --- a/bpf/kmesh/probes/metrics.h +++ b/bpf/kmesh/probes/metrics.h @@ -14,7 +14,6 @@ struct metric_key { }; struct metric_data { - __u32 direction; // update on connect __u32 conn_open; // update on connect __u32 conn_close; // update on close __u32 conn_failed; // update on close @@ -45,21 +44,38 @@ static inline void construct_metric_key(struct bpf_sock *sk, __u8 direction, str if (sk->family == AF_INET) { key->src_ip.ip4 = sk->src_ip4; key->dst_ip.ip4 = sk->dst_ip4; - } else { + } + if (sk->family == AF_INET6) { bpf_memcpy(key->src_ip.ip6, sk->src_ip6, IPV6_ADDR_LEN); bpf_memcpy(key->dst_ip.ip6, sk->dst_ip6, IPV6_ADDR_LEN); } - key->dst_port = bpf_ntohl(sk->dst_port); - } else { + key->dst_port = bpf_ntohs(sk->dst_port); + } + if (direction == INBOUND) { if (sk->family == AF_INET) { key->src_ip.ip4 = sk->dst_ip4; key->dst_ip.ip4 = sk->src_ip4; - } else { + } + if (sk->family == AF_INET6) { bpf_memcpy(key->src_ip.ip6, sk->dst_ip6, IPV6_ADDR_LEN); bpf_memcpy(key->dst_ip.ip6, sk->src_ip6, IPV6_ADDR_LEN); } key->dst_port = sk->src_port; } + + if (is_ipv4_mapped_addr(key->src_ip.ip6)) { + key->src_ip.ip4 = key->src_ip.ip6[3]; + key->dst_ip.ip4 = key->dst_ip.ip6[3]; + key->src_ip.ip6[0] = key->src_ip.ip4; + key->dst_ip.ip6[0] = key->dst_ip.ip4; + key->src_ip.ip6[1] = 0; + 
key->src_ip.ip6[2] = 0; + key->src_ip.ip6[3] = 0; + key->dst_ip.ip6[1] = 0; + key->dst_ip.ip6[2] = 0; + key->dst_ip.ip6[3] = 0; + } + return; } @@ -87,7 +103,6 @@ metric_on_connect(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct soc metric = (struct metric_data *)bpf_map_lookup_elem(&map_of_metrics, &key); if (!metric) { data.conn_open++; - data.direction = storage->direction; int err = bpf_map_update_elem(&map_of_metrics, &key, &data, BPF_NOEXIST); if (err) { BPF_LOG(ERR, PROBE, "metric_on_connect update failed, err is %d\n", err); @@ -97,7 +112,6 @@ metric_on_connect(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct soc } metric->conn_open++; - metric->direction = storage->direction; notify: report_metrics(&key); return; @@ -114,7 +128,6 @@ metric_on_close(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct sock_ metric = (struct metric_data *)bpf_map_lookup_elem(&map_of_metrics, &key); if (!metric) { // connect failed - data.direction = storage->direction; data.conn_failed++; int err = bpf_map_update_elem(&map_of_metrics, &key, &data, BPF_NOEXIST); if (err) { diff --git a/bpf/kmesh/workload/sockops.c b/bpf/kmesh/workload/sockops.c index e540e5097..bc628f881 100644 --- a/bpf/kmesh/workload/sockops.c +++ b/bpf/kmesh/workload/sockops.c @@ -38,8 +38,12 @@ static inline bool is_managed_by_kmesh(struct bpf_sock_ops *skops) struct manager_key key = {0}; if (skops->family == AF_INET) key.addr.ip4 = skops->local_ip4; - if (skops->family == AF_INET6) - IP6_COPY(key.addr.ip6, skops->local_ip6); + if (skops->family == AF_INET6) { + if (is_ipv4_mapped_addr(skops->local_ip6)) + key.addr.ip4 = skops->local_ip4; + else + IP6_COPY(key.addr.ip6, skops->local_ip6); + } int *value = bpf_map_lookup_elem(&map_of_manager, &key); if (!value) @@ -75,7 +79,8 @@ static inline void extract_skops_to_tuple_reverse(struct bpf_sock_ops *skops, st tuple_key->ipv4.sport = GET_SKOPS_REMOTE_PORT(skops); // local_port is host byteorder tuple_key->ipv4.dport = 
bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); - } else { + } + if (skops->family == AF_INET6) { bpf_memcpy(tuple_key->ipv6.saddr, skops->remote_ip6, IPV6_ADDR_LEN); bpf_memcpy(tuple_key->ipv6.daddr, skops->local_ip6, IPV6_ADDR_LEN); // remote_port is network byteorder @@ -83,6 +88,19 @@ static inline void extract_skops_to_tuple_reverse(struct bpf_sock_ops *skops, st // local_port is host byteorder tuple_key->ipv6.dport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); } + + if (is_ipv4_mapped_addr(tuple_key->ipv6.saddr)) { + tuple_key->ipv4.saddr = tuple_key->ipv6.saddr[3]; + tuple_key->ipv4.daddr = tuple_key->ipv6.daddr[3]; + tuple_key->ipv6.saddr[0] = tuple_key->ipv4.saddr; + tuple_key->ipv6.daddr[0] = tuple_key->ipv4.daddr; + tuple_key->ipv6.saddr[1] = 0; + tuple_key->ipv6.saddr[2] = 0; + tuple_key->ipv6.saddr[3] = 0; + tuple_key->ipv6.daddr[1] = 0; + tuple_key->ipv6.daddr[2] = 0; + tuple_key->ipv6.daddr[3] = 0; + } } // clean map_of_auth diff --git a/pkg/auth/rbac.go b/pkg/auth/rbac.go index 40df718ed..f0deef71a 100644 --- a/pkg/auth/rbac.go +++ b/pkg/auth/rbac.go @@ -146,6 +146,11 @@ func (r *Rbac) Run(ctx context.Context, mapOfTuple, mapOfAuth *ebpf.Map) { continue } + if msgType == MSG_TYPE_IPV6 { + conn.dstIp = restoreIPv4(conn.dstIp) + conn.srcIp = restoreIPv4(conn.srcIp) + } + if !r.doRbac(&conn) { log.Infof("Auth denied for connection: %+v", conn) // If conn is denied, write tuples into XDP map, which includes source/destination IP/Port @@ -511,3 +516,14 @@ func (r *Rbac) getIdentityByIp(ip []byte) Identity { serviceAccount: workload.GetServiceAccount(), } } + +// Converting IPv4 data reported in IPv6 form to IPv4 +func restoreIPv4(bytes []byte) []byte { + for i := 4; i < 16; i++ { + if bytes[i] != 0 { + return bytes + } + } + + return bytes[:4] +} diff --git a/pkg/controller/telemetry/metric.go b/pkg/controller/telemetry/metric.go index 4fae02bc1..aef7b53d7 100644 --- a/pkg/controller/telemetry/metric.go +++ b/pkg/controller/telemetry/metric.go @@ -23,6 +23,7 @@ 
import ( "fmt" "net/netip" "reflect" + "strings" "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" @@ -37,12 +38,13 @@ type MetricController struct { } type metricKey struct { - SrcIp [4]uint32 - DstIp [4]uint32 + SrcIp [4]uint32 + DstIp [4]uint32 + Direction uint32 + DstPort uint32 } type metricValue struct { - Direction uint32 ConnectionOpen uint32 ConnectionClose uint32 ConnectionFailed uint32 @@ -53,15 +55,45 @@ type metricValue struct { type requestMetric struct { src [4]uint32 dst [4]uint32 + dstPort uint32 + direction uint32 connectionOpened uint32 connectionClosed uint32 receivedBytes uint32 sentBytes uint32 - success bool } -type commonTrafficLabels struct { - direction string +type workloadMetricLabels struct { + reporter string + + sourceWorkload string + sourceCanonicalService string + sourceCanonicalRevision string + sourceWorkloadNamespace string + sourcePrincipal string + sourceApp string + sourceVersion string + sourceCluster string + + destinationPodAddress string + destinationPodNamespace string + destinationPodName string + destinationWorkload string + destinationCanonicalService string + destinationCanonicalRevision string + destinationWorkloadNamespace string + destinationPrincipal string + destinationApp string + destinationVersion string + destinationCluster string + + requestProtocol string + responseFlags string + connectionSecurityPolicy string +} + +type serviceMetricLabels struct { + reporter string sourceWorkload string sourceCanonicalService string @@ -142,31 +174,34 @@ func (m *MetricController) Run(ctx context.Context, mapOfMetricNotify, mapOfMetr data.src = key.SrcIp data.dst = key.DstIp + data.direction = key.Direction + data.dstPort = key.DstPort data.connectionClosed = value.ConnectionClose data.connectionOpened = value.ConnectionOpen data.sentBytes = value.SentBytes data.receivedBytes = value.ReceivedBytes - data.success = true - commonTrafficLabels, err := m.buildMetric(&data) - if err != nil { - log.Warnf("reporter 
records error") - } + workloadLabels := m.buildWorkloadMetric(&data) + serviceLabels := m.buildServiceMetric(&data) - commonTrafficLabels.direction = "-" - if value.Direction == constants.INBOUND { - commonTrafficLabels.direction = "INBOUND" + workloadLabels.reporter = "-" + serviceLabels.reporter = "-" + if key.Direction == constants.INBOUND { + workloadLabels.reporter = "destination" + serviceLabels.reporter = "destination" } - if value.Direction == constants.OUTBOUND { - commonTrafficLabels.direction = "OUTBOUND" + if key.Direction == constants.OUTBOUND { + workloadLabels.reporter = "source" + serviceLabels.reporter = "source" } - buildMetricsToPrometheus(data, commonTrafficLabels) + buildWorkloadMetricsToPrometheus(data, workloadLabels) + buildServiceMetricsToPrometheus(data, serviceLabels) } } } -func (m *MetricController) buildMetric(data *requestMetric) (commonTrafficLabels, error) { +func (m *MetricController) buildWorkloadMetric(data *requestMetric) workloadMetricLabels { var dstAddr, srcAddr []byte for i := range data.dst { dstAddr = binary.LittleEndian.AppendUint32(dstAddr, data.dst[i]) @@ -176,14 +211,31 @@ func (m *MetricController) buildMetric(data *requestMetric) (commonTrafficLabels dstWorkload, dstIP := m.getWorkloadByAddress(restoreIPv4(dstAddr)) srcWorkload, _ := m.getWorkloadByAddress(restoreIPv4(srcAddr)) - trafficLabels := buildMetricFromWorkload(dstWorkload, srcWorkload) - trafficLabels.destinationService = dstIP + trafficLabels := buildWorkloadMetric(dstWorkload, srcWorkload) + trafficLabels.destinationPodAddress = dstIP + trafficLabels.requestProtocol = "tcp" + trafficLabels.responseFlags = "-" + trafficLabels.connectionSecurityPolicy = "mutual_tls" + + return trafficLabels +} + +func (m *MetricController) buildServiceMetric(data *requestMetric) serviceMetricLabels { + var dstAddr, srcAddr []byte + for i := range data.dst { + dstAddr = binary.LittleEndian.AppendUint32(dstAddr, data.dst[i]) + srcAddr = 
binary.LittleEndian.AppendUint32(srcAddr, data.src[i]) + } + + dstWorkload, _ := m.getWorkloadByAddress(restoreIPv4(dstAddr)) + srcWorkload, _ := m.getWorkloadByAddress(restoreIPv4(srcAddr)) + trafficLabels := buildServiceMetric(dstWorkload, srcWorkload, data.dstPort) trafficLabels.requestProtocol = "tcp" trafficLabels.responseFlags = "-" trafficLabels.connectionSecurityPolicy = "mutual_tls" - return trafficLabels, nil + return trafficLabels } func (m *MetricController) getWorkloadByAddress(address []byte) (*workloadapi.Workload, string) { @@ -197,15 +249,73 @@ func (m *MetricController) getWorkloadByAddress(address []byte) (*workloadapi.Wo return workload, networkAddr.Address.String() } -func buildMetricFromWorkload(dstWorkload, srcWorkload *workloadapi.Workload) commonTrafficLabels { +func buildWorkloadMetric(dstWorkload, srcWorkload *workloadapi.Workload) workloadMetricLabels { if dstWorkload == nil || srcWorkload == nil { - return commonTrafficLabels{} + return workloadMetricLabels{} } - trafficLabels := commonTrafficLabels{} + trafficLabels := workloadMetricLabels{} + + trafficLabels.destinationPodNamespace = dstWorkload.Namespace + trafficLabels.destinationPodName = dstWorkload.Name + trafficLabels.destinationWorkload = dstWorkload.WorkloadName + trafficLabels.destinationCanonicalService = dstWorkload.CanonicalName + trafficLabels.destinationCanonicalRevision = dstWorkload.CanonicalRevision + trafficLabels.destinationWorkloadNamespace = dstWorkload.Namespace + trafficLabels.destinationApp = dstWorkload.CanonicalName + trafficLabels.destinationVersion = dstWorkload.CanonicalRevision + trafficLabels.destinationCluster = dstWorkload.ClusterId + + trafficLabels.sourceWorkload = srcWorkload.WorkloadName + trafficLabels.sourceCanonicalService = srcWorkload.CanonicalName + trafficLabels.sourceCanonicalRevision = srcWorkload.CanonicalRevision + trafficLabels.sourceWorkloadNamespace = srcWorkload.Namespace + trafficLabels.sourceApp = srcWorkload.CanonicalName + 
trafficLabels.sourceVersion = srcWorkload.CanonicalRevision + trafficLabels.sourceCluster = srcWorkload.ClusterId + + trafficLabels.destinationPrincipal = buildPrincipal(dstWorkload) + trafficLabels.sourcePrincipal = buildPrincipal(srcWorkload) + + return trafficLabels +} + +func buildServiceMetric(dstWorkload, srcWorkload *workloadapi.Workload, dstPort uint32) serviceMetricLabels { + if dstWorkload == nil || srcWorkload == nil { + return serviceMetricLabels{} + } + + trafficLabels := serviceMetricLabels{} + + namespacedhost := "" + for k, portList := range dstWorkload.Services { + for _, port := range portList.Ports { + if port.TargetPort == dstPort { + namespacedhost = k + break + } + } + if namespacedhost != "" { + break + } + } + if namespacedhost == "" { + log.Infof("can't find service correspond workload: %s", dstWorkload.Name) + } + + svcHost := "" + svcNamespace := "" + if len(strings.Split(namespacedhost, "/")) != 2 { + log.Info("get destination service host failed") + } else { + svcNamespace = strings.Split(namespacedhost, "/")[0] + svcHost = strings.Split(namespacedhost, "/")[1] + } + + trafficLabels.destinationService = svcHost + trafficLabels.destinationServiceNamespace = svcNamespace + trafficLabels.destinationServiceName = svcHost - trafficLabels.destinationServiceNamespace = dstWorkload.Namespace - trafficLabels.destinationServiceName = dstWorkload.Name trafficLabels.destinationWorkload = dstWorkload.WorkloadName trafficLabels.destinationCanonicalService = dstWorkload.CanonicalName trafficLabels.destinationCanonicalRevision = dstWorkload.CanonicalRevision @@ -235,29 +345,41 @@ func buildPrincipal(workload *workloadapi.Workload) string { return "-" } -func buildMetricsToPrometheus(data requestMetric, labels commonTrafficLabels) { - commonLabels := commonTrafficLabels2map(&labels) - tcpConnectionOpened.With(commonLabels).Set(float64(data.connectionOpened)) - tcpConnectionClosed.With(commonLabels).Set(float64(data.connectionClosed)) - 
tcpReceivedBytes.With(commonLabels).Set(float64(data.receivedBytes)) - tcpSentBytes.With(commonLabels).Set(float64(data.sentBytes)) +func buildWorkloadMetricsToPrometheus(data requestMetric, labels workloadMetricLabels) { + commonLabels := struct2map(labels) + tcpConnectionOpenedInWorkload.With(commonLabels).Set(float64(data.connectionOpened)) + tcpConnectionClosedInWorkload.With(commonLabels).Set(float64(data.connectionClosed)) + tcpReceivedBytesInWorkload.With(commonLabels).Set(float64(data.receivedBytes)) + tcpSentBytesInWorkload.With(commonLabels).Set(float64(data.sentBytes)) } -func commonTrafficLabels2map(labels *commonTrafficLabels) map[string]string { - trafficLabelsMap := make(map[string]string) - - val := reflect.ValueOf(labels).Elem() - num := val.NumField() - for i := 0; i < num; i++ { - fieldInfo := val.Type().Field(i) - if val.Field(i).String() == "" { - trafficLabelsMap[labelsMap[fieldInfo.Name]] = "-" - } else { - trafficLabelsMap[labelsMap[fieldInfo.Name]] = val.Field(i).String() +func buildServiceMetricsToPrometheus(data requestMetric, labels serviceMetricLabels) { + commonLabels := struct2map(labels) + tcpConnectionOpenedInService.With(commonLabels).Add(float64(data.connectionOpened)) + tcpConnectionClosedInService.With(commonLabels).Add(float64(data.connectionClosed)) + tcpReceivedBytesInService.With(commonLabels).Add(float64(data.receivedBytes)) + tcpSentBytesInService.With(commonLabels).Add(float64(data.sentBytes)) +} + +func struct2map(labels interface{}) map[string]string { + if reflect.TypeOf(labels).Kind() == reflect.Struct { + trafficLabelsMap := make(map[string]string) + val := reflect.ValueOf(labels) + num := val.NumField() + for i := 0; i < num; i++ { + fieldInfo := val.Type().Field(i) + if val.Field(i).String() == "" { + trafficLabelsMap[labelsMap[fieldInfo.Name]] = "-" + } else { + trafficLabelsMap[labelsMap[fieldInfo.Name]] = val.Field(i).String() + } } - } - return trafficLabelsMap + return trafficLabelsMap + } else { + 
log.Error("failed to convert struct to map") + } + return nil } // Converting IPv4 data reported in IPv6 form to IPv4 diff --git a/pkg/controller/telemetry/metric_test.go b/pkg/controller/telemetry/metric_test.go index d909eb649..010550633 100644 --- a/pkg/controller/telemetry/metric_test.go +++ b/pkg/controller/telemetry/metric_test.go @@ -31,7 +31,7 @@ import ( func TestCommonTrafficLabels2map(t *testing.T) { type args struct { - labels *commonTrafficLabels + labels interface{} } tests := []struct { name string @@ -41,8 +41,8 @@ func TestCommonTrafficLabels2map(t *testing.T) { { name: "normal commonTrafficLabels to map test", args: args{ - labels: &commonTrafficLabels{ - direction: "INBOUND", + labels: workloadMetricLabels{ + reporter: "destination", sourceWorkload: "sleep", sourceCanonicalService: "sleep", @@ -52,9 +52,9 @@ func TestCommonTrafficLabels2map(t *testing.T) { sourceApp: "sleep", sourceVersion: "latest", sourceCluster: "Kubernetes", - destinationService: "tcp-echo.ambient-demo.svc.cluster.local", - destinationServiceNamespace: "ambient-demo", - destinationServiceName: "tcp-echo", + destinationPodAddress: "192.168.10.24", + destinationPodNamespace: "ambient-demo", + destinationPodName: "tcp-echo", destinationWorkload: "tcp-echo", destinationCanonicalService: "tcp-echo", destinationCanonicalRevision: "v1", @@ -69,7 +69,7 @@ func TestCommonTrafficLabels2map(t *testing.T) { }, }, want: map[string]string{ - "direction": "INBOUND", + "reporter": "destination", "source_workload": "sleep", "source_canonical_service": "sleep", "source_canonical_revision": "latest", @@ -78,9 +78,9 @@ func TestCommonTrafficLabels2map(t *testing.T) { "source_app": "sleep", "source_version": "latest", "source_cluster": "Kubernetes", - "destination_service": "tcp-echo.ambient-demo.svc.cluster.local", - "destination_service_namespace": "ambient-demo", - "destination_service_name": "tcp-echo", + "destination_pod_address": "192.168.10.24", + "destination_pod_namespace": 
"ambient-demo", + "destination_pod_name": "tcp-echo", "destination_workload": "tcp-echo", "destination_canonical_service": "tcp-echo", "destination_canonical_revision": "v1", @@ -97,10 +97,10 @@ func TestCommonTrafficLabels2map(t *testing.T) { { name: "empty commonTrafficLabels to map test", args: args{ - labels: &commonTrafficLabels{}, + labels: workloadMetricLabels{}, }, want: map[string]string{ - "direction": "-", + "reporter": "-", "source_workload": "-", "source_canonical_service": "-", "source_canonical_revision": "-", @@ -109,9 +109,9 @@ func TestCommonTrafficLabels2map(t *testing.T) { "source_app": "-", "source_version": "-", "source_cluster": "-", - "destination_service": "-", - "destination_service_namespace": "-", - "destination_service_name": "-", + "destination_pod_address": "-", + "destination_pod_namespace": "-", + "destination_pod_name": "-", "destination_workload": "-", "destination_canonical_service": "-", "destination_canonical_revision": "-", @@ -128,14 +128,14 @@ func TestCommonTrafficLabels2map(t *testing.T) { { name: "Only some fields in the commonTrafficLabels have values", args: args{ - labels: &commonTrafficLabels{ - direction: "OUTBOUND", + labels: workloadMetricLabels{ + reporter: "source", sourceWorkload: "sleep", destinationWorkload: "tcp-echo", }, }, want: map[string]string{ - "direction": "OUTBOUND", + "reporter": "source", "source_workload": "sleep", "source_canonical_service": "-", "source_canonical_revision": "-", @@ -144,9 +144,9 @@ func TestCommonTrafficLabels2map(t *testing.T) { "source_app": "-", "source_version": "-", "source_cluster": "-", - "destination_service": "-", - "destination_service_namespace": "-", - "destination_service_name": "-", + "destination_pod_address": "-", + "destination_pod_namespace": "-", + "destination_pod_name": "-", "destination_workload": "tcp-echo", "destination_canonical_service": "-", "destination_canonical_revision": "-", @@ -163,8 +163,9 @@ func TestCommonTrafficLabels2map(t *testing.T) { } 
for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := commonTrafficLabels2map(tt.args.labels); !reflect.DeepEqual(got, tt.want) { - t.Errorf("commonTrafficLabels2map() = %v, want %v", got, tt.want) + if got := struct2map(tt.args.labels); !reflect.DeepEqual(got, tt.want) { + assert.Equal(t, tt.want, got) + t.Errorf("struct2map() = %v, want %v", got, tt.want) } }) } @@ -172,15 +173,15 @@ func TestCommonTrafficLabels2map(t *testing.T) { func TestBuildMetricsToPrometheus(t *testing.T) { metrics := []*prometheus.GaugeVec{ - tcpConnectionClosed, - tcpConnectionOpened, - tcpReceivedBytes, - tcpSentBytes, + tcpConnectionClosedInWorkload, + tcpConnectionOpenedInWorkload, + tcpReceivedBytesInWorkload, + tcpSentBytesInWorkload, } type args struct { data requestMetric - labels commonTrafficLabels + labels workloadMetricLabels } tests := []struct { name string @@ -197,10 +198,9 @@ func TestBuildMetricsToPrometheus(t *testing.T) { connectionClosed: 0x0000002, sentBytes: 0x0000003, receivedBytes: 0x0000004, - success: true, }, - labels: commonTrafficLabels{ - direction: "INBOUND", + labels: workloadMetricLabels{ + reporter: "destination", sourceWorkload: "sleep", sourceCanonicalService: "sleep", sourceCanonicalRevision: "latest", @@ -209,9 +209,9 @@ func TestBuildMetricsToPrometheus(t *testing.T) { sourceApp: "sleep", sourceVersion: "latest", sourceCluster: "Kubernetes", - destinationService: "tcp-echo.ambient-demo.svc.cluster.local", - destinationServiceNamespace: "ambient-demo", - destinationServiceName: "tcp-echo", + destinationPodAddress: "192.168.20.25", + destinationPodNamespace: "ambient-demo", + destinationPodName: "tcp-echo", destinationWorkload: "tcp-echo", destinationCanonicalService: "tcp-echo", destinationCanonicalRevision: "v1", @@ -237,8 +237,8 @@ func TestBuildMetricsToPrometheus(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go RunPrometheusClient(ctx) - 
buildMetricsToPrometheus(tt.args.data, tt.args.labels) - commonLabels := commonTrafficLabels2map(&tt.args.labels) + buildWorkloadMetricsToPrometheus(tt.args.data, tt.args.labels) + commonLabels := struct2map(tt.args.labels) for index, metric := range metrics { if gauge, err := metric.GetMetricWith(commonLabels); err != nil { t.Errorf("use labels to get %v failed", metric) @@ -254,7 +254,7 @@ func TestBuildMetricsToPrometheus(t *testing.T) { } } -func TestBuildMetricFromWorkload(t *testing.T) { +func TestBuildWorkloadMetric(t *testing.T) { type args struct { dstWorkload *workloadapi.Workload srcWorkload *workloadapi.Workload @@ -262,7 +262,7 @@ func TestBuildMetricFromWorkload(t *testing.T) { tests := []struct { name string args args - want commonTrafficLabels + want workloadMetricLabels }{ { name: "normal capability test", @@ -288,8 +288,8 @@ func TestBuildMetricFromWorkload(t *testing.T) { ServiceAccount: "default", }, }, - want: commonTrafficLabels{ - direction: "-", + want: workloadMetricLabels{ + reporter: "-", sourceWorkload: "kmesh-daemon", sourceCanonicalService: "srcCanonical", sourceCanonicalRevision: "srcVersion", @@ -298,9 +298,9 @@ func TestBuildMetricFromWorkload(t *testing.T) { sourceApp: "srcCanonical", sourceVersion: "srcVersion", sourceCluster: "Kubernetes", - destinationService: "-", - destinationServiceNamespace: "kmesh-system", - destinationServiceName: "kmesh", + destinationPodAddress: "-", + destinationPodNamespace: "kmesh-system", + destinationPodName: "kmesh", destinationWorkload: "kmesh-daemon", destinationCanonicalService: "dstCanonical", destinationCanonicalRevision: "dstVersion", @@ -317,9 +317,9 @@ func TestBuildMetricFromWorkload(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualLabels := buildMetricFromWorkload(tt.args.dstWorkload, tt.args.srcWorkload) - expectMap := commonTrafficLabels2map(&tt.want) - actualMap := commonTrafficLabels2map(&actualLabels) + actualLabels := 
buildWorkloadMetric(tt.args.dstWorkload, tt.args.srcWorkload) + expectMap := struct2map(tt.want) + actualMap := struct2map(actualLabels) assert.Equal(t, expectMap, actualMap) }) } @@ -362,7 +362,7 @@ func TestMetricGetWorkloadByAddress(t *testing.T) { } } -func TestMetricBuildMetric(t *testing.T) { +func TestBuildworkloadMetric(t *testing.T) { dstWorkload := &workloadapi.Workload{ Namespace: "kmesh-system", Name: "kmesh", @@ -397,7 +397,7 @@ func TestMetricBuildMetric(t *testing.T) { tests := []struct { name string args args - want commonTrafficLabels + want workloadMetricLabels wantErr bool }{ { @@ -410,10 +410,9 @@ func TestMetricBuildMetric(t *testing.T) { connectionClosed: uint32(8), sentBytes: uint32(156), receivedBytes: uint32(1024), - success: true, }, }, - want: commonTrafficLabels{ + want: workloadMetricLabels{ sourceWorkload: "kmesh-daemon", sourceCanonicalService: "srcCanonical", sourceCanonicalRevision: "srcVersion", @@ -422,9 +421,9 @@ func TestMetricBuildMetric(t *testing.T) { sourceApp: "srcCanonical", sourceVersion: "srcVersion", sourceCluster: "Kubernetes", - destinationService: "192.168.224.22", - destinationServiceNamespace: "kmesh-system", - destinationServiceName: "kmesh", + destinationPodAddress: "192.168.224.22", + destinationPodNamespace: "kmesh-system", + destinationPodName: "kmesh", destinationWorkload: "kmesh-daemon", destinationCanonicalService: "dstCanonical", destinationCanonicalRevision: "dstVersion", @@ -447,11 +446,7 @@ func TestMetricBuildMetric(t *testing.T) { } m.workloadCache.AddWorkload(dstWorkload) m.workloadCache.AddWorkload(srcWorkload) - got, err := m.buildMetric(tt.args.data) - if (err != nil) != tt.wantErr { - t.Errorf("Metric.buildMetric() error = %v, wantErr %v", err, tt.wantErr) - return - } + got := m.buildWorkloadMetric(tt.args.data) if !reflect.DeepEqual(got, tt.want) { t.Errorf("Metric.buildMetric() = %v, want %v", got, tt.want) } @@ -491,3 +486,105 @@ func TestByteToIpByte(t *testing.T) { }) } } + +func 
TestBuildServiceMetric(t *testing.T) { + dstWorkload := &workloadapi.Workload{ + Namespace: "kmesh-system", + Name: "kmesh", + WorkloadName: "kmesh-daemon", + CanonicalName: "dstCanonical", + CanonicalRevision: "dstVersion", + ClusterId: "Kubernetes", + TrustDomain: "cluster.local", + ServiceAccount: "default", + Uid: "123456", + Addresses: [][]byte{ + {192, 168, 224, 22}, + }, + Services: map[string]*workloadapi.PortList{ + "kmesh-system/kmesh.kmesh-system.svc.cluster.local": { + Ports: []*workloadapi.Port{ + { + TargetPort: 80, + ServicePort: 8000, + }, + }, + }, + }, + } + srcWorkload := &workloadapi.Workload{ + Namespace: "kmesh-system", + Name: "kmesh", + WorkloadName: "kmesh-daemon", + CanonicalName: "srcCanonical", + CanonicalRevision: "srcVersion", + ClusterId: "Kubernetes", + TrustDomain: "cluster.local", + ServiceAccount: "default", + Uid: "654321", + Addresses: [][]byte{ + {10, 19, 25, 31}, + }, + } + type args struct { + data *requestMetric + } + tests := []struct { + name string + args args + want serviceMetricLabels + wantErr bool + }{ + { + name: "normal capability test", + args: args{ + data: &requestMetric{ + src: [4]uint32{521736970, 0, 0, 0}, + dst: [4]uint32{383822016, 0, 0, 0}, + dstPort: uint32(80), + direction: uint32(2), + connectionOpened: uint32(16), + connectionClosed: uint32(8), + sentBytes: uint32(156), + receivedBytes: uint32(1024), + }, + }, + want: serviceMetricLabels{ + sourceWorkload: "kmesh-daemon", + sourceCanonicalService: "srcCanonical", + sourceCanonicalRevision: "srcVersion", + sourceWorkloadNamespace: "kmesh-system", + sourcePrincipal: "spiffe://cluster.local/ns/kmesh-system/sa/default", + sourceApp: "srcCanonical", + sourceVersion: "srcVersion", + sourceCluster: "Kubernetes", + destinationService: "kmesh.kmesh-system.svc.cluster.local", + destinationServiceNamespace: "kmesh-system", + destinationServiceName: "kmesh.kmesh-system.svc.cluster.local", + destinationWorkload: "kmesh-daemon", + destinationCanonicalService: 
"dstCanonical", + destinationCanonicalRevision: "dstVersion", + destinationWorkloadNamespace: "kmesh-system", + destinationPrincipal: "spiffe://cluster.local/ns/kmesh-system/sa/default", + destinationApp: "dstCanonical", + destinationVersion: "dstVersion", + destinationCluster: "Kubernetes", + requestProtocol: "tcp", + responseFlags: "-", + connectionSecurityPolicy: "mutual_tls", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := MetricController{ + workloadCache: cache.NewWorkloadCache(), + } + m.workloadCache.AddWorkload(dstWorkload) + m.workloadCache.AddWorkload(srcWorkload) + got := m.buildServiceMetric(tt.args.data) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/controller/telemetry/utils.go b/pkg/controller/telemetry/utils.go index f50fa83bc..b66a1a039 100644 --- a/pkg/controller/telemetry/utils.go +++ b/pkg/controller/telemetry/utils.go @@ -31,8 +31,34 @@ var ( log = logger.NewLoggerField("pkg/telemetry") mu sync.Mutex - trafficLabels = []string{ - "direction", + workloadLabels = []string{ + "reporter", + "source_workload", + "source_canonical_service", + "source_canonical_revision", + "source_workload_namespace", + "source_principal", + "source_app", + "source_version", + "source_cluster", + "destination_pod_address", + "destination_pod_namespace", + "destination_pod_name", + "destination_workload", + "destination_canonical_service", + "destination_canonical_revision", + "destination_workload_namespace", + "destination_principal", + "destination_app", + "destination_version", + "destination_cluster", + "request_protocol", + "response_flags", + "connection_security_policy", + } + + serviceLabels = []string{ + "reporter", "source_workload", "source_canonical_service", "source_canonical_revision", @@ -58,7 +84,7 @@ var ( } labelsMap = map[string]string{ - "direction": "direction", + "reporter": "reporter", "sourceWorkload": "source_workload", "sourceCanonicalService": 
"source_canonical_service", "sourceCanonicalRevision": "source_canonical_revision", @@ -70,6 +96,9 @@ var ( "destinationService": "destination_service", "destinationServiceNamespace": "destination_service_namespace", "destinationServiceName": "destination_service_name", + "destinationPodAddress": "destination_pod_address", + "destinationPodNamespace": "destination_pod_namespace", + "destinationPodName": "destination_pod_name", "destinationWorkload": "destination_workload", "destinationCanonicalService": "destination_canonical_service", "destinationCanonicalRevision": "destination_canonical_revision", @@ -85,28 +114,51 @@ var ( ) var ( - tcpConnectionOpened = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + tcpConnectionOpenedInWorkload = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "kmesh_tcp_connections_opened_total", - Help: "The total number of TCP connections opened", - }, trafficLabels) + Help: "The total number of TCP connections opened to a workload", + }, workloadLabels) - tcpConnectionClosed = prometheus.NewGaugeVec( + tcpConnectionClosedInWorkload = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "kmesh_tcp_connections_closed_total", - Help: "The total number of TCP connections closed", - }, trafficLabels) + Help: "The total number of TCP connections closed to a workload", + }, workloadLabels) - tcpReceivedBytes = prometheus.NewGaugeVec( + tcpReceivedBytesInWorkload = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "kmesh_tcp_received_bytes_total", - Help: "The size of total bytes received during request in case of a TCP connection", - }, trafficLabels) + Help: "The size of the total number of bytes reveiced in response to a workload over a TCP connection.", + }, workloadLabels) - tcpSentBytes = prometheus.NewGaugeVec( + tcpSentBytesInWorkload = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "kmesh_tcp_sent_bytes_total", - Help: "The size of total bytes sent during response in case of a TCP connection", - }, trafficLabels) + Help: "The 
size of the total number of bytes sent in response to a workload over a TCP connection.", + }, workloadLabels) + + tcpConnectionOpenedInService = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kmesh_tcp_service_connections_opened_total", + Help: "The total number of TCP connections opened to a service", + }, serviceLabels) + + tcpConnectionClosedInService = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "kmesh_tcp_service_connections_closed_total", + Help: "The total number of TCP connections closed to a service", + }, serviceLabels) + + tcpReceivedBytesInService = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "kmesh_tcp_service_received_bytes_total", + Help: "The size of the total number of bytes received in response to a service over a TCP connection.", + }, serviceLabels) + + tcpSentBytesInService = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "kmesh_tcp_service_sent_bytes_total", + Help: "The size of the total number of bytes sent in response to a service over a TCP connection.", + }, serviceLabels) ) func RunPrometheusClient(ctx context.Context) { @@ -125,7 +177,8 @@ func runPrometheusClient(registry *prometheus.Registry) { // ensure not occur matche the same requests as /status/metric panic mu.Lock() defer mu.Unlock() - registry.MustRegister(tcpConnectionOpened, tcpConnectionClosed, tcpReceivedBytes, tcpSentBytes) + registry.MustRegister(tcpConnectionOpenedInWorkload, tcpConnectionClosedInWorkload, tcpReceivedBytesInWorkload, tcpSentBytesInWorkload) + registry.MustRegister(tcpConnectionOpenedInService, tcpConnectionClosedInService, tcpReceivedBytesInService, tcpSentBytesInService) http.Handle("/status/metric", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ Registry: registry, diff --git a/pkg/controller/telemetry/utils_test.go b/pkg/controller/telemetry/utils_test.go index 7b3989a72..4a4bcd4eb 100644 --- a/pkg/controller/telemetry/utils_test.go +++ b/pkg/controller/telemetry/utils_test.go @@ -38,14 +38,18 @@ func 
TestRegisterMetrics(t *testing.T) { }() exportMetrics := []*prometheus.GaugeVec{ - tcpConnectionClosed, - tcpConnectionOpened, - tcpReceivedBytes, - tcpSentBytes, + tcpConnectionClosedInWorkload, + tcpConnectionOpenedInWorkload, + tcpReceivedBytesInWorkload, + tcpSentBytesInWorkload, + tcpConnectionClosedInService, + tcpConnectionOpenedInService, + tcpReceivedBytesInService, + tcpSentBytesInService, } - testlabels := map[string]string{ - "direction": "INBOUND", + workloadLabels := map[string]string{ + "reporter": "destination", "source_workload": "sleep", "source_canonical_service": "sleep", "source_canonical_revision": "latest", @@ -54,7 +58,33 @@ func TestRegisterMetrics(t *testing.T) { "source_app": "sleep", "source_version": "latest", "source_cluster": "Kubernetes", - "destination_service": "tcp-echo.ambient-demo.svc.cluster.local", + "destination_pod_address": "192.068.10.25", + "destination_pod_namespace": "ambient-demo", + "destination_pod_name": "tcp-echo", + "destination_workload": "tcp-echo", + "destination_canonical_service": "tcp-echo", + "destination_canonical_revision": "v1", + "destination_workload_namespace": "ambient-demo", + "destination_principal": "spiffe://cluster.local/ns/ambient-demo/sa/default", + "destination_app": "tcp-echo", + "destination_version": "v1", + "destination_cluster": "Kubernetes", + "request_protocol": "tcp", + "response_flags": "-", + "connection_security_policy": "mutual_tls", + } + + serviceLabels := map[string]string{ + "reporter": "destination", + "source_workload": "sleep", + "source_canonical_service": "sleep", + "source_canonical_revision": "latest", + "source_workload_namespace": "ambient-demo", + "source_principal": "spiffe://cluster.local/ns/ambient-demo/sa/sleep", + "source_app": "sleep", + "source_version": "latest", + "source_cluster": "Kubernetes", + "destination_service": "sleep.ambient.svc.cluster.local", "destination_service_namespace": "ambient-demo", "destination_service_name": "tcp-echo", 
"destination_workload": "tcp-echo", @@ -70,10 +100,15 @@ func TestRegisterMetrics(t *testing.T) { "connection_security_policy": "mutual_tls", } - tcpConnectionClosed.With(testlabels).Set(2) - tcpConnectionOpened.With(testlabels).Set(4) - tcpReceivedBytes.With(testlabels).Set(12.64) - tcpSentBytes.With(testlabels).Set(11.45) + tcpConnectionClosedInWorkload.With(workloadLabels).Set(2) + tcpConnectionOpenedInWorkload.With(workloadLabels).Set(4) + tcpReceivedBytesInWorkload.With(workloadLabels).Set(12.64) + tcpSentBytesInWorkload.With(workloadLabels).Set(11.45) + + tcpConnectionClosedInService.With(serviceLabels).Set(4) + tcpReceivedBytesInService.With(serviceLabels).Set(8) + tcpSentBytesInService.With(serviceLabels).Set(9) + tcpConnectionOpenedInService.With(serviceLabels).Set(16.25) for _, metric := range exportMetrics { if err := prometheus.Register(metric); err != nil {