diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b97a29ed8c7..5c7bd230082 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -767,8 +767,8 @@ func (proxier *Proxier) syncProxyRules() { // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - // Accumulate new local ports that we have opened. - newLocalPorts := map[localPort]closeable{} + // Accumulate the set of local ports that we will be holding open once this update is complete + replacementPortsMap := map[localPort]closeable{} // Build rules for each service. for svcName, svcInfo := range proxier.serviceMap { @@ -814,14 +814,15 @@ func (proxier *Proxier) syncProxyRules() { protocol: protocol, } if proxier.portsMap[lp] != nil { - newLocalPorts[lp] = proxier.portsMap[lp] + glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := openLocalPort(&lp) if err != nil { glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err) continue } - newLocalPorts[lp] = socket + replacementPortsMap[lp] = socket } } // We're holding the port, so it's OK to install iptables rules. args := []string{ @@ -877,14 +878,15 @@ func (proxier *Proxier) syncProxyRules() { protocol: protocol, } if proxier.portsMap[lp] != nil { - newLocalPorts[lp] = proxier.portsMap[lp] + glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := openLocalPort(&lp) if err != nil { glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } - newLocalPorts[lp] = socket + replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install iptables rules. args := []string{ @@ -1024,20 +1026,17 @@ func (proxier *Proxier) syncProxyRules() { if err != nil { glog.Errorf("Failed to execute iptables-restore: %v", err) // Revert new local ports. - for k, v := range newLocalPorts { - glog.Errorf("Closing local port %s", k.String()) - v.Close() - } + revertPorts(replacementPortsMap, proxier.portsMap) return } // Close old local ports and save new ones. for k, v := range proxier.portsMap { - if newLocalPorts[k] == nil { + if replacementPortsMap[k] == nil { v.Close() } } - proxier.portsMap = newLocalPorts + proxier.portsMap = replacementPortsMap // Clean up the older SNAT rule which was directly in POSTROUTING. // TODO(thockin): Remove this for v1.3 or v1.4. @@ -1198,3 +1197,15 @@ func openLocalPort(lp *localPort) (closeable, error) { glog.V(2).Infof("Opened local port %s", lp.String()) return socket, nil } + +// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only +// closes the ports opened in this sync. +func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) { + for k, v := range replacementPortsMap { + // Only close newly opened local ports - leave ones that were open before this update + if originalPortsMap[k] == nil { + glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String()) + v.Close() + } + } +} diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index f511925ba64..21e59590b2a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -374,4 +374,102 @@ func TestDeleteServiceConnections(t *testing.T) { } } +type fakeClosable struct { + closed bool +} + +func (c *fakeClosable) Close() error { + c.closed = true + return nil +} + +func TestRevertPorts(t *testing.T) { + testCases := []struct { + replacementPorts []localPort + existingPorts []localPort + expectToBeClose []bool + }{ + { + replacementPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + }, + existingPorts: []localPort{}, + expectToBeClose: []bool{true, true, true}, + }, + { + replacementPorts: []localPort{}, + existingPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + }, + expectToBeClose: []bool{}, + }, + { + replacementPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + }, + existingPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + }, + expectToBeClose: []bool{false, false, false}, + }, + { + replacementPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + }, + existingPorts: []localPort{ + {port: 5001}, + {port: 5003}, + }, + expectToBeClose: []bool{false, true, false}, + }, + { + replacementPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + }, + existingPorts: []localPort{ + {port: 5001}, + {port: 5002}, + {port: 5003}, + {port: 5004}, + }, + expectToBeClose: []bool{false, false, false}, + }, + } + + for i, tc := range testCases { + replacementPortsMap := make(map[localPort]closeable) + for _, lp := range tc.replacementPorts { + replacementPortsMap[lp] = &fakeClosable{} + } + existingPortsMap := make(map[localPort]closeable) + for _, lp := range tc.existingPorts { + existingPortsMap[lp] = &fakeClosable{} + } + revertPorts(replacementPortsMap, existingPortsMap) + for j, expectation := range tc.expectToBeClose { + if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation { + t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i) + } + } + for _, lp := range tc.existingPorts { + if existingPortsMap[lp].(*fakeClosable).closed == true { + t.Errorf("Expect existing localport %v to be false in test case %v", lp, i) + } + } + } + +} + // TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces.