mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 23:37:01 +00:00
Part 1 of plural ports: Add protocol to Endpoints
This makes it easier to make the second step, which is moving endpoints to a struct instead of a string.
This commit is contained in:
parent
6bbde837b8
commit
34eaa0dbd6
@ -747,6 +747,9 @@ type Endpoints struct {
|
||||
TypeMeta `json:",inline"`
|
||||
ObjectMeta `json:"metadata,omitempty"`
|
||||
|
||||
// Optional: The IP protocol for these endpoints. Supports "TCP" and
|
||||
// "UDP". Defaults to "TCP".
|
||||
Protocol Protocol `json:"protocol,omitempty"`
|
||||
Endpoints []string `json:"endpoints,omitempty"`
|
||||
}
|
||||
|
||||
|
@ -85,5 +85,10 @@ func init() {
|
||||
obj.Type = SecretTypeOpaque
|
||||
}
|
||||
},
|
||||
func(obj *Endpoints) {
|
||||
if obj.Protocol == "" {
|
||||
obj.Protocol = "TCP"
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) {
|
||||
t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDefaulEndpointsProtocol(t *testing.T) {
|
||||
in := ¤t.Endpoints{}
|
||||
obj := roundTrip(t, runtime.Object(in))
|
||||
out := obj.(*current.Endpoints)
|
||||
|
||||
if out.Protocol != current.ProtocolTCP {
|
||||
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
|
||||
}
|
||||
}
|
||||
|
@ -601,7 +601,10 @@ type Service struct {
|
||||
// Endpoints is a collection of endpoints that implement the actual service, for example:
|
||||
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
|
||||
type Endpoints struct {
|
||||
TypeMeta `json:",inline"`
|
||||
TypeMeta `json:",inline"`
|
||||
// Optional: The IP protocol for these endpoints. Supports "TCP" and
|
||||
// "UDP". Defaults to "TCP".
|
||||
Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"`
|
||||
Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"`
|
||||
}
|
||||
|
||||
|
@ -87,5 +87,10 @@ func init() {
|
||||
obj.Type = SecretTypeOpaque
|
||||
}
|
||||
},
|
||||
func(obj *Endpoints) {
|
||||
if obj.Protocol == "" {
|
||||
obj.Protocol = "TCP"
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) {
|
||||
t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDefaulEndpointsProtocol(t *testing.T) {
|
||||
in := ¤t.Endpoints{}
|
||||
obj := roundTrip(t, runtime.Object(in))
|
||||
out := obj.(*current.Endpoints)
|
||||
|
||||
if out.Protocol != current.ProtocolTCP {
|
||||
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
|
||||
}
|
||||
}
|
||||
|
@ -565,7 +565,10 @@ type Service struct {
|
||||
// Endpoints is a collection of endpoints that implement the actual service, for example:
|
||||
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
|
||||
type Endpoints struct {
|
||||
TypeMeta `json:",inline"`
|
||||
TypeMeta `json:",inline"`
|
||||
// Optional: The IP protocol for these endpoints. Supports "TCP" and
|
||||
// "UDP". Defaults to "TCP".
|
||||
Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"`
|
||||
Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"`
|
||||
}
|
||||
|
||||
|
@ -80,5 +80,10 @@ func init() {
|
||||
obj.Type = SecretTypeOpaque
|
||||
}
|
||||
},
|
||||
func(obj *Endpoints) {
|
||||
if obj.Protocol == "" {
|
||||
obj.Protocol = "TCP"
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) {
|
||||
t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDefaulEndpointsProtocol(t *testing.T) {
|
||||
in := ¤t.Endpoints{}
|
||||
obj := roundTrip(t, runtime.Object(in))
|
||||
out := obj.(*current.Endpoints)
|
||||
|
||||
if out.Protocol != current.ProtocolTCP {
|
||||
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
|
||||
}
|
||||
}
|
||||
|
@ -779,6 +779,10 @@ type Endpoints struct {
|
||||
TypeMeta `json:",inline"`
|
||||
ObjectMeta `json:"metadata"`
|
||||
|
||||
// Optional: The IP protocol for these endpoints. Supports "TCP" and
|
||||
// "UDP". Defaults to "TCP".
|
||||
Protocol Protocol `json:"protocol,omitempty"`
|
||||
|
||||
// Endpoints is the list of host ports that satisfy the service selector
|
||||
Endpoints []string `json:"endpoints"`
|
||||
}
|
||||
|
@ -135,6 +135,7 @@ func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) err
|
||||
Name: serviceName,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
Protocol: "TCP",
|
||||
}
|
||||
}
|
||||
found := false
|
||||
|
@ -598,10 +598,10 @@ func TestEtcdListEndpoints(t *testing.T) {
|
||||
Node: &etcd.Node{
|
||||
Nodes: []*etcd.Node{
|
||||
{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}),
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []string{"127.0.0.1:8345"}}),
|
||||
},
|
||||
{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}),
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}),
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -625,6 +625,7 @@ func TestEtcdGetEndpoints(t *testing.T) {
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
endpoints := &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"127.0.0.1:34855"},
|
||||
}
|
||||
|
||||
@ -648,6 +649,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
endpoints := api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"baz", "bar"},
|
||||
}
|
||||
|
||||
|
@ -98,6 +98,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: service.Name,
|
||||
},
|
||||
Protocol: service.Spec.Protocol,
|
||||
}
|
||||
} else {
|
||||
glog.Errorf("Error getting endpoints: %v", err)
|
||||
@ -113,8 +114,8 @@ func (e *EndpointController) SyncServiceEndpoints() error {
|
||||
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
|
||||
} else {
|
||||
// Pre-existing
|
||||
if endpointsEqual(currentEndpoints, endpoints) {
|
||||
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
|
||||
if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) {
|
||||
glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
|
||||
continue
|
||||
}
|
||||
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
|
||||
|
@ -245,6 +245,71 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"6.7.8.9:1000"},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
endpointsHandler.ValidateRequestCount(t, 0)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Protocol: "TCP",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t,
|
||||
serverResponse{http.StatusOK, newPodList(0)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"6.7.8.9:1000"},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
endpointsHandler.ValidateRequestCount(t, 0)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Protocol: "UDP",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t,
|
||||
serverResponse{http.StatusOK, newPodList(0)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "UDP",
|
||||
Endpoints: []string{"6.7.8.9:1000"},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
@ -275,6 +340,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
@ -288,6 +354,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"1.2.3.4:8080"},
|
||||
})
|
||||
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data)
|
||||
@ -314,6 +381,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"6.7.8.9:1000"},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
@ -327,6 +395,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
||||
Name: "foo",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"1.2.3.4:8080"},
|
||||
})
|
||||
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data)
|
||||
@ -352,6 +421,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"1.2.3.4:8080"},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
@ -390,6 +460,7 @@ func TestSyncEndpointsItems(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
ResourceVersion: "",
|
||||
},
|
||||
Protocol: "TCP",
|
||||
Endpoints: []string{"1.2.3.4:8080"},
|
||||
})
|
||||
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data)
|
||||
|
Loading…
Reference in New Issue
Block a user