diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go index 50f182e73b9..10fe4d55b33 100644 --- a/pkg/kubelet/stats/cri_stats_provider.go +++ b/pkg/kubelet/stats/cri_stats_provider.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/server/stats" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/utils/clock" ) var ( @@ -67,6 +68,11 @@ type criStatsProvider struct { imageService internalapi.ImageManagerService // hostStatsProvider is used to get the status of the host filesystem consumed by pods. hostStatsProvider HostStatsProvider + //lint:ignore U1000 We can't import hcsshim due to Build constraints in hcsshim + // windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows + windowsNetworkStatsProvider interface{} + // clock is used report current time + clock clock.Clock // cpuUsageCache caches the cpu usage for containers. cpuUsageCache map[string]*cpuUsageRecord @@ -95,6 +101,7 @@ func newCRIStatsProvider( cpuUsageCache: make(map[string]*cpuUsageRecord), disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics, podAndContainerStatsFromCRI: podAndContainerStatsFromCRI, + clock: clock.RealClock{}, } } diff --git a/pkg/kubelet/stats/cri_stats_provider_windows.go b/pkg/kubelet/stats/cri_stats_provider_windows.go index 68b2609e694..a32ffe9e981 100644 --- a/pkg/kubelet/stats/cri_stats_provider_windows.go +++ b/pkg/kubelet/stats/cri_stats_provider_windows.go @@ -20,104 +20,98 @@ limitations under the License. package stats import ( - "fmt" "time" - "github.com/Microsoft/hcsshim" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + + "github.com/Microsoft/hcsshim" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" ) +// windowsNetworkStatsProvider creates an interface that allows for testing the logic without needing to create a container +type windowsNetworkStatsProvider interface { + HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) + GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) +} + +// networkStats exposes the required functionality for hcsshim in this scenario +type networkStats struct{} + +func (s networkStats) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) { + return hcsshim.HNSListEndpointRequest() +} + +func (s networkStats) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) { + return hcsshim.GetHNSEndpointStats(endpointName) +} + // listContainerNetworkStats returns the network stats of all the running containers. func (p *criStatsProvider) listContainerNetworkStats() (map[string]*statsapi.NetworkStats, error) { - containers, err := hcsshim.GetContainers(hcsshim.ComputeSystemQuery{ - Types: []string{"Container"}, - }) + networkStatsProvider := newNetworkStatsProvider(p) + + endpoints, err := networkStatsProvider.HNSListEndpointRequest() if err != nil { + klog.ErrorS(err, "Failed to fetch current HNS endpoints") return nil, err } - stats := make(map[string]*statsapi.NetworkStats) - for _, c := range containers { - cstats, err := fetchContainerStats(c) + networkStats := make(map[string]*statsapi.NetworkStats) + for _, endpoint := range endpoints { + endpointStats, err := networkStatsProvider.GetHNSEndpointStats(endpoint.Id) if err != nil { - klog.V(4).InfoS("Failed to fetch statistics for container, continue to get stats for other containers", "containerID", c.ID, "err", err) + klog.V(2).InfoS("Failed to fetch statistics for endpoint, continue to get stats for other endpoints", "endpointId", endpoint.Id, "containers", endpoint.SharedContainers) continue } - if len(cstats.Network) > 0 { - stats[c.ID] = hcsStatsToNetworkStats(cstats.Timestamp, cstats.Network) - } - } - return stats, nil -} - -func fetchContainerStats(c hcsshim.ContainerProperties) (stats hcsshim.Statistics, err error) { - var ( - container hcsshim.Container - ) - container, err = hcsshim.OpenContainer(c.ID) - if err != nil { - return - } - defer func() { - if closeErr := container.Close(); closeErr != nil { - if err != nil { - err = fmt.Errorf("failed to close container after error %v; close error: %v", err, closeErr) - } else { - err = closeErr + // only add the interface for each container if not already in the list + for _, cId := range endpoint.SharedContainers { + networkStat, found := networkStats[cId] + if found && networkStat.Name != endpoint.Name { + iStat := hcsStatToInterfaceStat(endpointStats, endpoint.Name) + networkStat.Interfaces = append(networkStat.Interfaces, iStat) + continue } + networkStats[cId] = hcsStatsToNetworkStats(p.clock.Now(), endpointStats, endpoint.Name) } - }() + } - return container.Statistics() + return networkStats, nil } // hcsStatsToNetworkStats converts hcsshim.Statistics.Network to statsapi.NetworkStats -func hcsStatsToNetworkStats(timestamp time.Time, hcsStats []hcsshim.NetworkStats) *statsapi.NetworkStats { +func hcsStatsToNetworkStats(timestamp time.Time, hcsStats *hcsshim.HNSEndpointStats, endpointName string) *statsapi.NetworkStats { result := &statsapi.NetworkStats{ Time: metav1.NewTime(timestamp), Interfaces: make([]statsapi.InterfaceStats, 0), } - adapters := sets.NewString() - for _, stat := range hcsStats { - iStat, err := hcsStatsToInterfaceStats(stat) - if err != nil { - klog.InfoS("Failed to get HNS endpoint, continue to get stats for other endpoints", "endpointID", stat.EndpointId, "err", err) - continue - } + iStat := hcsStatToInterfaceStat(hcsStats, endpointName) - // Only count each adapter once. - if adapters.Has(iStat.Name) { - continue - } - - result.Interfaces = append(result.Interfaces, *iStat) - adapters.Insert(iStat.Name) - } - - // TODO(feiskyer): add support of multiple interfaces for getting default interface. - if len(result.Interfaces) > 0 { - result.InterfaceStats = result.Interfaces[0] - } + // TODO: add support of multiple interfaces for getting default interface. + result.Interfaces = append(result.Interfaces, iStat) + result.InterfaceStats = iStat return result } -// hcsStatsToInterfaceStats converts hcsshim.NetworkStats to statsapi.InterfaceStats. -func hcsStatsToInterfaceStats(stat hcsshim.NetworkStats) (*statsapi.InterfaceStats, error) { - endpoint, err := hcsshim.GetHNSEndpointByID(stat.EndpointId) - if err != nil { - return nil, err +func hcsStatToInterfaceStat(hcsStats *hcsshim.HNSEndpointStats, endpointName string) statsapi.InterfaceStats { + iStat := statsapi.InterfaceStats{ + Name: endpointName, + RxBytes: &hcsStats.BytesReceived, + TxBytes: &hcsStats.BytesSent, } - - return &statsapi.InterfaceStats{ - Name: endpoint.Name, - RxBytes: &stat.BytesReceived, - TxBytes: &stat.BytesSent, - }, nil + return iStat +} + +// newNetworkStatsProvider uses the real windows hcsshim if not provided otherwise if the interface is provided +// by the cristatsprovider in testing scenarios it uses that one +func newNetworkStatsProvider(p *criStatsProvider) windowsNetworkStatsProvider { + var statsProvider windowsNetworkStatsProvider + if p.windowsNetworkStatsProvider == nil { + statsProvider = networkStats{} + } else { + statsProvider = p.windowsNetworkStatsProvider.(windowsNetworkStatsProvider) + } + return statsProvider } diff --git a/pkg/kubelet/stats/cri_stats_provider_windows_test.go b/pkg/kubelet/stats/cri_stats_provider_windows_test.go new file mode 100644 index 00000000000..6bce71c3bb8 --- /dev/null +++ b/pkg/kubelet/stats/cri_stats_provider_windows_test.go @@ -0,0 +1,422 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stats + +import ( + "reflect" + "testing" + "time" + + "github.com/Microsoft/hcsshim" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + testingclock "k8s.io/utils/clock/testing" +) + +type fakeNetworkStatsProvider struct { + containers []containerStats +} + +type containerStats struct { + container hcsshim.ContainerProperties + hcsStats []hcsshim.NetworkStats +} + +func (s fakeNetworkStatsProvider) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) { + eps := hcsshim.HNSEndpointStats{} + for _, c := range s.containers { + for _, stat := range c.hcsStats { + if endpointName == stat.InstanceId { + eps = hcsshim.HNSEndpointStats{ + EndpointID: stat.EndpointId, + BytesSent: stat.BytesSent, + BytesReceived: stat.BytesReceived, + PacketsReceived: stat.PacketsReceived, + PacketsSent: stat.PacketsSent, + } + } + } + } + + return &eps, nil +} + +func (s fakeNetworkStatsProvider) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) { + uniqueEndpoints := map[string]*hcsshim.HNSEndpoint{} + + for _, c := range s.containers { + for _, stat := range c.hcsStats { + e, found := uniqueEndpoints[stat.EndpointId] + if found { + // add the container + e.SharedContainers = append(e.SharedContainers, c.container.ID) + continue + } + + uniqueEndpoints[stat.EndpointId] = &hcsshim.HNSEndpoint{ + Name: stat.EndpointId, + Id: stat.EndpointId, + SharedContainers: []string{c.container.ID}, + } + } + } + + eps := []hcsshim.HNSEndpoint{} + for _, ep := range uniqueEndpoints { + eps = append(eps, *ep) + } + + return eps, nil +} + +func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Time{}) + tests := []struct { + name string + fields fakeNetworkStatsProvider + want map[string]*statsapi.NetworkStats + wantErr bool + }{ + { + name: "basic example", + fields: fakeNetworkStatsProvider{ + containers: []containerStats{ + { + container: hcsshim.ContainerProperties{ + ID: "c1", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", + }, + }, + }, + { + container: hcsshim.ContainerProperties{ + ID: "c2", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, + }, + }, + }, + want: map[string]*statsapi.NetworkStats{ + "c1": { + Time: v1.NewTime(fakeClock.Now()), + InterfaceStats: statsapi.InterfaceStats{ + Name: "test", + RxBytes: toP(1), + TxBytes: toP(10), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test", + RxBytes: toP(1), + + TxBytes: toP(10), + }, + }, + }, + "c2": { + Time: v1.Time{}, + InterfaceStats: statsapi.InterfaceStats{ + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + }, + }, + }, + wantErr: false, + }, + { + name: "multiple containers same endpoint", + fields: fakeNetworkStatsProvider{ + containers: []containerStats{ + { + container: hcsshim.ContainerProperties{ + ID: "c1", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", + }, + }, + }, + { + container: hcsshim.ContainerProperties{ + ID: "c2", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, + }, + { + container: hcsshim.ContainerProperties{ + ID: "c3", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 3, + BytesSent: 30, + EndpointId: "test2", + InstanceId: "test3", + }, + }, + }, + }, + }, + want: map[string]*statsapi.NetworkStats{ + "c1": { + Time: v1.NewTime(fakeClock.Now()), + InterfaceStats: statsapi.InterfaceStats{ + Name: "test", + RxBytes: toP(1), + TxBytes: toP(10), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test", + RxBytes: toP(1), + + TxBytes: toP(10), + }, + }, + }, + "c2": { + Time: v1.Time{}, + InterfaceStats: statsapi.InterfaceStats{ + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + }, + }, + "c3": { + Time: v1.Time{}, + InterfaceStats: statsapi.InterfaceStats{ + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + }, + }, + }, + wantErr: false, + }, + { + name: "multiple stats instances of same interface only picks up first", + fields: fakeNetworkStatsProvider{ + containers: []containerStats{ + { + container: hcsshim.ContainerProperties{ + ID: "c1", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", + }, + { + BytesReceived: 3, + BytesSent: 30, + EndpointId: "test", + InstanceId: "test3", + }, + }, + }, + { + container: hcsshim.ContainerProperties{ + ID: "c2", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, + }, + }, + }, + want: map[string]*statsapi.NetworkStats{ + "c1": { + Time: v1.NewTime(fakeClock.Now()), + InterfaceStats: statsapi.InterfaceStats{ + Name: "test", + RxBytes: toP(1), + TxBytes: toP(10), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test", + RxBytes: toP(1), + + TxBytes: toP(10), + }, + }, + }, + "c2": { + Time: v1.Time{}, + InterfaceStats: statsapi.InterfaceStats{ + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + }, + }, + }, + wantErr: false, + }, + { + name: "multiple endpoints per container", + fields: fakeNetworkStatsProvider{ + containers: []containerStats{ + { + container: hcsshim.ContainerProperties{ + ID: "c1", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", + }, + { + BytesReceived: 3, + BytesSent: 30, + EndpointId: "test3", + InstanceId: "test3", + }, + }, + }, + { + container: hcsshim.ContainerProperties{ + ID: "c2", + }, hcsStats: []hcsshim.NetworkStats{ + { + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, + }, + }, + }, + want: map[string]*statsapi.NetworkStats{ + "c1": { + Time: v1.NewTime(fakeClock.Now()), + InterfaceStats: statsapi.InterfaceStats{ + Name: "test", + RxBytes: toP(1), + TxBytes: toP(10), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test", + RxBytes: toP(1), + + TxBytes: toP(10), + }, + { + Name: "test3", + RxBytes: toP(3), + + TxBytes: toP(30), + }, + }, + }, + "c2": { + Time: v1.Time{}, + InterfaceStats: statsapi.InterfaceStats{ + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + Interfaces: []statsapi.InterfaceStats{ + { + Name: "test2", + RxBytes: toP(2), + TxBytes: toP(20), + }, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &criStatsProvider{ + windowsNetworkStatsProvider: fakeNetworkStatsProvider{ + containers: tt.fields.containers, + }, + clock: fakeClock, + } + got, err := p.listContainerNetworkStats() + if (err != nil) != tt.wantErr { + t.Errorf("listContainerNetworkStats() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("listContainerNetworkStats() got = %v, want %v", got, tt.want) + } + }) + } +} + +func toP(i uint64) *uint64 { + return &i +} diff --git a/test/e2e/windows/kubelet_stats.go b/test/e2e/windows/kubelet_stats.go index 1bbb506a26c..6e614db17b6 100644 --- a/test/e2e/windows/kubelet_stats.go +++ b/test/e2e/windows/kubelet_stats.go @@ -80,6 +80,11 @@ var _ = SIGDescribe("[Feature:Windows] Kubelet-Stats [Serial]", func() { framework.ExpectEqual(*podStats.CPU.UsageCoreNanoSeconds > 0, true, "Pod stats should not report 0 cpu usage") framework.ExpectEqual(*podStats.Memory.WorkingSetBytes > 0, true, "Pod stats should not report 0 bytes for memory working set ") + framework.ExpectEqual(podStats.Network != nil, true, "Pod stats should report network stats") + framework.ExpectEqual(podStats.Network.Name != "", true, "Pod stats should report network name") + framework.ExpectEqual(*podStats.Network.TxBytes > 0, true, "Pod stats should report network Tx stats") + framework.ExpectEqual(len(podStats.Network.Interfaces) > 0, true, "Pod Stats should report individual interfaces stats") + for _, containerStats := range podStats.Containers { framework.ExpectEqual(containerStats.Logs != nil, true, "Pod stats should have container log stats") framework.ExpectEqual(*containerStats.Logs.AvailableBytes > 0, true, "container log stats should not report 0 bytes for AvailableBytes")