mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #72361 from DataDog/lbernail/netlink-mutex
[kube-proxy/ipvs] Protect Netlink calls with a mutex
This commit is contained in:
commit
bb7973a34c
@ -23,6 +23,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
libipvs "github.com/docker/libnetwork/ipvs"
|
libipvs "github.com/docker/libnetwork/ipvs"
|
||||||
@ -34,6 +35,7 @@ import (
|
|||||||
type runner struct {
|
type runner struct {
|
||||||
exec utilexec.Interface
|
exec utilexec.Interface
|
||||||
ipvsHandle *libipvs.Handle
|
ipvsHandle *libipvs.Handle
|
||||||
|
mu sync.Mutex // Protect Netlink calls
|
||||||
}
|
}
|
||||||
|
|
||||||
// Protocol is the IPVS service protocol type
|
// Protocol is the IPVS service protocol type
|
||||||
@ -58,6 +60,8 @@ func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.NewService(svc)
|
return runner.ipvsHandle.NewService(svc)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,6 +71,8 @@ func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.UpdateService(svc)
|
return runner.ipvsHandle.UpdateService(svc)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,6 +82,8 @@ func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.DelService(svc)
|
return runner.ipvsHandle.DelService(svc)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,7 +93,10 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
ipvsSvc, err := runner.ipvsHandle.GetService(svc)
|
ipvsSvc, err := runner.ipvsHandle.GetService(svc)
|
||||||
|
runner.mu.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -98,7 +109,9 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error
|
|||||||
|
|
||||||
// GetVirtualServers is part of ipvs.Interface.
|
// GetVirtualServers is part of ipvs.Interface.
|
||||||
func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
|
func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
|
||||||
|
runner.mu.Lock()
|
||||||
ipvsSvcs, err := runner.ipvsHandle.GetServices()
|
ipvsSvcs, err := runner.ipvsHandle.GetServices()
|
||||||
|
runner.mu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -115,6 +128,8 @@ func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
|
|||||||
|
|
||||||
// Flush is part of ipvs.Interface. Currently we delete IPVS services one by one
|
// Flush is part of ipvs.Interface. Currently we delete IPVS services one by one
|
||||||
func (runner *runner) Flush() error {
|
func (runner *runner) Flush() error {
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.Flush()
|
return runner.ipvsHandle.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,6 +143,8 @@ func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.NewDestination(svc, dst)
|
return runner.ipvsHandle.NewDestination(svc, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,6 +158,8 @@ func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.DelDestination(svc, dst)
|
return runner.ipvsHandle.DelDestination(svc, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,6 +172,8 @@ func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.UpdateDestination(svc, dst)
|
return runner.ipvsHandle.UpdateDestination(svc, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,7 +183,9 @@ func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
dsts, err := runner.ipvsHandle.GetDestinations(svc)
|
dsts, err := runner.ipvsHandle.GetDestinations(svc)
|
||||||
|
runner.mu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user