Merge pull request #105744 from jsturtevant/windows-containerd-networkstats

Get Windows network stats directly for Containerd
This commit is contained in:
Kubernetes Prow Robot 2021-11-12 12:36:41 -08:00 committed by GitHub
commit 1f6aa87a93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 495 additions and 67 deletions

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/clock"
) )
var ( var (
@ -67,6 +68,11 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService imageService internalapi.ImageManagerService
// hostStatsProvider is used to get the status of the host filesystem consumed by pods. // hostStatsProvider is used to get the status of the host filesystem consumed by pods.
hostStatsProvider HostStatsProvider hostStatsProvider HostStatsProvider
//lint:ignore U1000 We can't import hcsshim due to Build constraints in hcsshim
// windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows
windowsNetworkStatsProvider interface{}
// clock is used report current time
clock clock.Clock
// cpuUsageCache caches the cpu usage for containers. // cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord cpuUsageCache map[string]*cpuUsageRecord
@ -95,6 +101,7 @@ func newCRIStatsProvider(
cpuUsageCache: make(map[string]*cpuUsageRecord), cpuUsageCache: make(map[string]*cpuUsageRecord),
disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics, disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics,
podAndContainerStatsFromCRI: podAndContainerStatsFromCRI, podAndContainerStatsFromCRI: podAndContainerStatsFromCRI,
clock: clock.RealClock{},
} }
} }

View File

@ -20,104 +20,98 @@ limitations under the License.
package stats package stats
import ( import (
"fmt"
"time" "time"
"github.com/Microsoft/hcsshim"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"github.com/Microsoft/hcsshim"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
) )
// windowsNetworkStatsProvider creates an interface that allows for testing the logic without needing to create a container
type windowsNetworkStatsProvider interface {
HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error)
GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error)
}
// networkStats exposes the required functionality for hcsshim in this scenario
type networkStats struct{}
func (s networkStats) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) {
return hcsshim.HNSListEndpointRequest()
}
func (s networkStats) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) {
return hcsshim.GetHNSEndpointStats(endpointName)
}
// listContainerNetworkStats returns the network stats of all the running containers. // listContainerNetworkStats returns the network stats of all the running containers.
func (p *criStatsProvider) listContainerNetworkStats() (map[string]*statsapi.NetworkStats, error) { func (p *criStatsProvider) listContainerNetworkStats() (map[string]*statsapi.NetworkStats, error) {
containers, err := hcsshim.GetContainers(hcsshim.ComputeSystemQuery{ networkStatsProvider := newNetworkStatsProvider(p)
Types: []string{"Container"},
}) endpoints, err := networkStatsProvider.HNSListEndpointRequest()
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to fetch current HNS endpoints")
return nil, err return nil, err
} }
stats := make(map[string]*statsapi.NetworkStats) networkStats := make(map[string]*statsapi.NetworkStats)
for _, c := range containers { for _, endpoint := range endpoints {
cstats, err := fetchContainerStats(c) endpointStats, err := networkStatsProvider.GetHNSEndpointStats(endpoint.Id)
if err != nil { if err != nil {
klog.V(4).InfoS("Failed to fetch statistics for container, continue to get stats for other containers", "containerID", c.ID, "err", err) klog.V(2).InfoS("Failed to fetch statistics for endpoint, continue to get stats for other endpoints", "endpointId", endpoint.Id, "containers", endpoint.SharedContainers)
continue continue
} }
if len(cstats.Network) > 0 {
stats[c.ID] = hcsStatsToNetworkStats(cstats.Timestamp, cstats.Network)
}
}
return stats, nil // only add the interface for each container if not already in the list
} for _, cId := range endpoint.SharedContainers {
networkStat, found := networkStats[cId]
func fetchContainerStats(c hcsshim.ContainerProperties) (stats hcsshim.Statistics, err error) { if found && networkStat.Name != endpoint.Name {
var ( iStat := hcsStatToInterfaceStat(endpointStats, endpoint.Name)
container hcsshim.Container networkStat.Interfaces = append(networkStat.Interfaces, iStat)
) continue
container, err = hcsshim.OpenContainer(c.ID)
if err != nil {
return
}
defer func() {
if closeErr := container.Close(); closeErr != nil {
if err != nil {
err = fmt.Errorf("failed to close container after error %v; close error: %v", err, closeErr)
} else {
err = closeErr
} }
networkStats[cId] = hcsStatsToNetworkStats(p.clock.Now(), endpointStats, endpoint.Name)
} }
}() }
return container.Statistics() return networkStats, nil
} }
// hcsStatsToNetworkStats converts hcsshim.Statistics.Network to statsapi.NetworkStats // hcsStatsToNetworkStats converts hcsshim.Statistics.Network to statsapi.NetworkStats
func hcsStatsToNetworkStats(timestamp time.Time, hcsStats []hcsshim.NetworkStats) *statsapi.NetworkStats { func hcsStatsToNetworkStats(timestamp time.Time, hcsStats *hcsshim.HNSEndpointStats, endpointName string) *statsapi.NetworkStats {
result := &statsapi.NetworkStats{ result := &statsapi.NetworkStats{
Time: metav1.NewTime(timestamp), Time: metav1.NewTime(timestamp),
Interfaces: make([]statsapi.InterfaceStats, 0), Interfaces: make([]statsapi.InterfaceStats, 0),
} }
adapters := sets.NewString() iStat := hcsStatToInterfaceStat(hcsStats, endpointName)
for _, stat := range hcsStats {
iStat, err := hcsStatsToInterfaceStats(stat)
if err != nil {
klog.InfoS("Failed to get HNS endpoint, continue to get stats for other endpoints", "endpointID", stat.EndpointId, "err", err)
continue
}
// Only count each adapter once. // TODO: add support of multiple interfaces for getting default interface.
if adapters.Has(iStat.Name) { result.Interfaces = append(result.Interfaces, iStat)
continue result.InterfaceStats = iStat
}
result.Interfaces = append(result.Interfaces, *iStat)
adapters.Insert(iStat.Name)
}
// TODO(feiskyer): add support of multiple interfaces for getting default interface.
if len(result.Interfaces) > 0 {
result.InterfaceStats = result.Interfaces[0]
}
return result return result
} }
// hcsStatsToInterfaceStats converts hcsshim.NetworkStats to statsapi.InterfaceStats. func hcsStatToInterfaceStat(hcsStats *hcsshim.HNSEndpointStats, endpointName string) statsapi.InterfaceStats {
func hcsStatsToInterfaceStats(stat hcsshim.NetworkStats) (*statsapi.InterfaceStats, error) { iStat := statsapi.InterfaceStats{
endpoint, err := hcsshim.GetHNSEndpointByID(stat.EndpointId) Name: endpointName,
if err != nil { RxBytes: &hcsStats.BytesReceived,
return nil, err TxBytes: &hcsStats.BytesSent,
} }
return iStat
return &statsapi.InterfaceStats{ }
Name: endpoint.Name,
RxBytes: &stat.BytesReceived, // newNetworkStatsProvider uses the real windows hcsshim if not provided otherwise if the interface is provided
TxBytes: &stat.BytesSent, // by the cristatsprovider in testing scenarios it uses that one
}, nil func newNetworkStatsProvider(p *criStatsProvider) windowsNetworkStatsProvider {
var statsProvider windowsNetworkStatsProvider
if p.windowsNetworkStatsProvider == nil {
statsProvider = networkStats{}
} else {
statsProvider = p.windowsNetworkStatsProvider.(windowsNetworkStatsProvider)
}
return statsProvider
} }

View File

@ -0,0 +1,422 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"reflect"
"testing"
"time"
"github.com/Microsoft/hcsshim"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
testingclock "k8s.io/utils/clock/testing"
)
type fakeNetworkStatsProvider struct {
containers []containerStats
}
type containerStats struct {
container hcsshim.ContainerProperties
hcsStats []hcsshim.NetworkStats
}
func (s fakeNetworkStatsProvider) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) {
eps := hcsshim.HNSEndpointStats{}
for _, c := range s.containers {
for _, stat := range c.hcsStats {
if endpointName == stat.InstanceId {
eps = hcsshim.HNSEndpointStats{
EndpointID: stat.EndpointId,
BytesSent: stat.BytesSent,
BytesReceived: stat.BytesReceived,
PacketsReceived: stat.PacketsReceived,
PacketsSent: stat.PacketsSent,
}
}
}
}
return &eps, nil
}
func (s fakeNetworkStatsProvider) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) {
uniqueEndpoints := map[string]*hcsshim.HNSEndpoint{}
for _, c := range s.containers {
for _, stat := range c.hcsStats {
e, found := uniqueEndpoints[stat.EndpointId]
if found {
// add the container
e.SharedContainers = append(e.SharedContainers, c.container.ID)
continue
}
uniqueEndpoints[stat.EndpointId] = &hcsshim.HNSEndpoint{
Name: stat.EndpointId,
Id: stat.EndpointId,
SharedContainers: []string{c.container.ID},
}
}
}
eps := []hcsshim.HNSEndpoint{}
for _, ep := range uniqueEndpoints {
eps = append(eps, *ep)
}
return eps, nil
}
func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Time{})
tests := []struct {
name string
fields fakeNetworkStatsProvider
want map[string]*statsapi.NetworkStats
wantErr bool
}{
{
name: "basic example",
fields: fakeNetworkStatsProvider{
containers: []containerStats{
{
container: hcsshim.ContainerProperties{
ID: "c1",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 1,
BytesSent: 10,
EndpointId: "test",
InstanceId: "test",
},
},
},
{
container: hcsshim.ContainerProperties{
ID: "c2",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 2,
BytesSent: 20,
EndpointId: "test2",
InstanceId: "test2",
},
},
},
},
},
want: map[string]*statsapi.NetworkStats{
"c1": {
Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
},
},
"c2": {
Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
},
},
},
wantErr: false,
},
{
name: "multiple containers same endpoint",
fields: fakeNetworkStatsProvider{
containers: []containerStats{
{
container: hcsshim.ContainerProperties{
ID: "c1",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 1,
BytesSent: 10,
EndpointId: "test",
InstanceId: "test",
},
},
},
{
container: hcsshim.ContainerProperties{
ID: "c2",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 2,
BytesSent: 20,
EndpointId: "test2",
InstanceId: "test2",
},
},
},
{
container: hcsshim.ContainerProperties{
ID: "c3",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 3,
BytesSent: 30,
EndpointId: "test2",
InstanceId: "test3",
},
},
},
},
},
want: map[string]*statsapi.NetworkStats{
"c1": {
Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
},
},
"c2": {
Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
},
},
"c3": {
Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
},
},
},
wantErr: false,
},
{
name: "multiple stats instances of same interface only picks up first",
fields: fakeNetworkStatsProvider{
containers: []containerStats{
{
container: hcsshim.ContainerProperties{
ID: "c1",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 1,
BytesSent: 10,
EndpointId: "test",
InstanceId: "test",
},
{
BytesReceived: 3,
BytesSent: 30,
EndpointId: "test",
InstanceId: "test3",
},
},
},
{
container: hcsshim.ContainerProperties{
ID: "c2",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 2,
BytesSent: 20,
EndpointId: "test2",
InstanceId: "test2",
},
},
},
},
},
want: map[string]*statsapi.NetworkStats{
"c1": {
Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
},
},
"c2": {
Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
},
},
},
wantErr: false,
},
{
name: "multiple endpoints per container",
fields: fakeNetworkStatsProvider{
containers: []containerStats{
{
container: hcsshim.ContainerProperties{
ID: "c1",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 1,
BytesSent: 10,
EndpointId: "test",
InstanceId: "test",
},
{
BytesReceived: 3,
BytesSent: 30,
EndpointId: "test3",
InstanceId: "test3",
},
},
},
{
container: hcsshim.ContainerProperties{
ID: "c2",
}, hcsStats: []hcsshim.NetworkStats{
{
BytesReceived: 2,
BytesSent: 20,
EndpointId: "test2",
InstanceId: "test2",
},
},
},
},
},
want: map[string]*statsapi.NetworkStats{
"c1": {
Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test",
RxBytes: toP(1),
TxBytes: toP(10),
},
{
Name: "test3",
RxBytes: toP(3),
TxBytes: toP(30),
},
},
},
"c2": {
Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
Interfaces: []statsapi.InterfaceStats{
{
Name: "test2",
RxBytes: toP(2),
TxBytes: toP(20),
},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &criStatsProvider{
windowsNetworkStatsProvider: fakeNetworkStatsProvider{
containers: tt.fields.containers,
},
clock: fakeClock,
}
got, err := p.listContainerNetworkStats()
if (err != nil) != tt.wantErr {
t.Errorf("listContainerNetworkStats() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("listContainerNetworkStats() got = %v, want %v", got, tt.want)
}
})
}
}
func toP(i uint64) *uint64 {
return &i
}

View File

@ -80,6 +80,11 @@ var _ = SIGDescribe("[Feature:Windows] Kubelet-Stats [Serial]", func() {
framework.ExpectEqual(*podStats.CPU.UsageCoreNanoSeconds > 0, true, "Pod stats should not report 0 cpu usage") framework.ExpectEqual(*podStats.CPU.UsageCoreNanoSeconds > 0, true, "Pod stats should not report 0 cpu usage")
framework.ExpectEqual(*podStats.Memory.WorkingSetBytes > 0, true, "Pod stats should not report 0 bytes for memory working set ") framework.ExpectEqual(*podStats.Memory.WorkingSetBytes > 0, true, "Pod stats should not report 0 bytes for memory working set ")
framework.ExpectEqual(podStats.Network != nil, true, "Pod stats should report network stats")
framework.ExpectEqual(podStats.Network.Name != "", true, "Pod stats should report network name")
framework.ExpectEqual(*podStats.Network.TxBytes > 0, true, "Pod stats should report network Tx stats")
framework.ExpectEqual(len(podStats.Network.Interfaces) > 0, true, "Pod Stats should report individual interfaces stats")
for _, containerStats := range podStats.Containers { for _, containerStats := range podStats.Containers {
framework.ExpectEqual(containerStats.Logs != nil, true, "Pod stats should have container log stats") framework.ExpectEqual(containerStats.Logs != nil, true, "Pod stats should have container log stats")
framework.ExpectEqual(*containerStats.Logs.AvailableBytes > 0, true, "container log stats should not report 0 bytes for AvailableBytes") framework.ExpectEqual(*containerStats.Logs.AvailableBytes > 0, true, "container log stats should not report 0 bytes for AvailableBytes")