Merge pull request #24360 from freehan/svcflakefix1

Automatic merge from submit-queue

only close new ports upon iptables-restore failure

fixes: #24357
This commit is contained in:
k8s-merge-robot 2016-04-27 04:42:13 -07:00
commit 8dfb6ebc31
2 changed files with 121 additions and 12 deletions

View File

@ -767,8 +767,8 @@ func (proxier *Proxier) syncProxyRules() {
// Accumulate NAT chains to keep. // Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate new local ports that we have opened. // Accumulate the set of local ports that we will be holding open once this update is complete
newLocalPorts := map[localPort]closeable{} replacementPortsMap := map[localPort]closeable{}
// Build rules for each service. // Build rules for each service.
for svcName, svcInfo := range proxier.serviceMap { for svcName, svcInfo := range proxier.serviceMap {
@ -814,14 +814,15 @@ func (proxier *Proxier) syncProxyRules() {
protocol: protocol, protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
newLocalPorts[lp] = proxier.portsMap[lp] glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else { } else {
socket, err := openLocalPort(&lp) socket, err := openLocalPort(&lp)
if err != nil { if err != nil {
glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err) glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err)
continue continue
} }
newLocalPorts[lp] = socket replacementPortsMap[lp] = socket
} }
} // We're holding the port, so it's OK to install iptables rules. } // We're holding the port, so it's OK to install iptables rules.
args := []string{ args := []string{
@ -877,14 +878,15 @@ func (proxier *Proxier) syncProxyRules() {
protocol: protocol, protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
newLocalPorts[lp] = proxier.portsMap[lp] glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else { } else {
socket, err := openLocalPort(&lp) socket, err := openLocalPort(&lp)
if err != nil { if err != nil {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue continue
} }
newLocalPorts[lp] = socket replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install iptables rules. } // We're holding the port, so it's OK to install iptables rules.
args := []string{ args := []string{
@ -1024,20 +1026,17 @@ func (proxier *Proxier) syncProxyRules() {
if err != nil { if err != nil {
glog.Errorf("Failed to execute iptables-restore: %v", err) glog.Errorf("Failed to execute iptables-restore: %v", err)
// Revert new local ports. // Revert new local ports.
for k, v := range newLocalPorts { revertPorts(replacementPortsMap, proxier.portsMap)
glog.Errorf("Closing local port %s", k.String())
v.Close()
}
return return
} }
// Close old local ports and save new ones. // Close old local ports and save new ones.
for k, v := range proxier.portsMap { for k, v := range proxier.portsMap {
if newLocalPorts[k] == nil { if replacementPortsMap[k] == nil {
v.Close() v.Close()
} }
} }
proxier.portsMap = newLocalPorts proxier.portsMap = replacementPortsMap
// Clean up the older SNAT rule which was directly in POSTROUTING. // Clean up the older SNAT rule which was directly in POSTROUTING.
// TODO(thockin): Remove this for v1.3 or v1.4. // TODO(thockin): Remove this for v1.3 or v1.4.
@ -1198,3 +1197,15 @@ func openLocalPort(lp *localPort) (closeable, error) {
glog.V(2).Infof("Opened local port %s", lp.String()) glog.V(2).Infof("Opened local port %s", lp.String())
return socket, nil return socket, nil
} }
// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) {
for k, v := range replacementPortsMap {
// Only close newly opened local ports - leave ones that were open before this update
if originalPortsMap[k] == nil {
glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String())
v.Close()
}
}
}

View File

@ -374,4 +374,102 @@ func TestDeleteServiceConnections(t *testing.T) {
} }
} }
type fakeClosable struct {
closed bool
}
func (c *fakeClosable) Close() error {
c.closed = true
return nil
}
func TestRevertPorts(t *testing.T) {
testCases := []struct {
replacementPorts []localPort
existingPorts []localPort
expectToBeClose []bool
}{
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{},
expectToBeClose: []bool{true, true, true},
},
{
replacementPorts: []localPort{},
existingPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
expectToBeClose: []bool{},
},
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
expectToBeClose: []bool{false, false, false},
},
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{
{port: 5001},
{port: 5003},
},
expectToBeClose: []bool{false, true, false},
},
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
{port: 5004},
},
expectToBeClose: []bool{false, false, false},
},
}
for i, tc := range testCases {
replacementPortsMap := make(map[localPort]closeable)
for _, lp := range tc.replacementPorts {
replacementPortsMap[lp] = &fakeClosable{}
}
existingPortsMap := make(map[localPort]closeable)
for _, lp := range tc.existingPorts {
existingPortsMap[lp] = &fakeClosable{}
}
revertPorts(replacementPortsMap, existingPortsMap)
for j, expectation := range tc.expectToBeClose {
if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation {
t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i)
}
}
for _, lp := range tc.existingPorts {
if existingPortsMap[lp].(*fakeClosable).closed == true {
t.Errorf("Expect existing localport %v to be false in test case %v", lp, i)
}
}
}
}
// TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces. // TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces.