Merge branch 'develop'

Conflicts:
	cli/cmd/tapRunner.go
This commit is contained in:
RamiBerm
2021-07-13 16:23:27 +03:00
19 changed files with 137 additions and 44 deletions

View File

@@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"github.com/romana/rlog"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared"
@@ -252,11 +253,12 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
if tappingOptions.Analyze {
url_path := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s", mizuProxiedUrl, tappingOptions.AnalyzeDestination)
url_path := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s", mizuProxiedUrl, url.QueryEscape(tappingOptions.AnalyzeDestination))
u, err := url.ParseRequestURI(url_path)
if err != nil {
log.Fatal(fmt.Sprintf("Failed parsing the URL %v\n", err))
}
rlog.Debugf("Sending get request to %v\n", u.String())
if response, err := http.Get(u.String()); err != nil && response.StatusCode != 200 {
fmt.Printf("error sending upload entries req %v\n", err)
} else {
@@ -324,7 +326,8 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
controlSocket, err := mizu.CreateControlSocket(fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuCollectorProxiedHostAndPath(tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)))
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuCollectorProxiedHostAndPath(tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName))
controlSocket, err := mizu.CreateControlSocket(controlSocketStr)
if err != nil {
fmt.Printf("error establishing control socket connection %s\n", err)
cancel()
@@ -337,7 +340,7 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
default:
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
if err != nil {
fmt.Printf("error Sending message via control socket %s\n", err)
rlog.Debugf("error Sending message via control socket %v, error: %s\n", controlSocketStr, err)
}
time.Sleep(10 * time.Second)
}

View File

@@ -15,6 +15,7 @@ import (
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
resource "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
@@ -76,6 +77,24 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
if err != nil {
return nil, err
}
cpuLimit, err := resource.ParseQuantity("750")
if err != nil {
return nil, errors.New("invalid cpu limit for aggregator container")
}
memLimit, err := resource.ParseQuantity("512Mi")
if err != nil {
return nil, errors.New("invalid memory limit for aggregator container")
}
cpuRequests, err := resource.ParseQuantity("50m")
if err != nil {
return nil, errors.New("invalid cpu request for aggregator container")
}
memRequests, err := resource.ParseQuantity("50Mi")
if err != nil {
return nil, errors.New("invalid memory request for aggregator container")
}
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
@@ -103,6 +122,16 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
Value: strconv.FormatInt(maxEntriesDBSizeBytes, 10),
},
},
Resources: core.ResourceRequirements{
Limits: core.ResourceList{
"cpu": cpuLimit,
"memory": memLimit,
},
Requests: core.ResourceList{
"cpu": cpuRequests,
"memory": memRequests,
},
},
},
},
DNSPolicy: core.DNSClusterFirstWithHostNet,
@@ -341,6 +370,32 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
),
),
)
cpuLimit, err := resource.ParseQuantity("500m")
if err != nil {
return errors.New("invalid cpu limit for tapper container")
}
memLimit, err := resource.ParseQuantity("1Gi")
if err != nil {
return errors.New("invalid memory limit for tapper container")
}
cpuRequests, err := resource.ParseQuantity("50m")
if err != nil {
return errors.New("invalid cpu request for tapper container")
}
memRequests, err := resource.ParseQuantity("50Mi")
if err != nil {
return errors.New("invalid memory request for tapper container")
}
agentResourceLimits := core.ResourceList{
"cpu": cpuLimit,
"memory": memLimit,
}
agentResourceRequests := core.ResourceList{
"cpu": cpuRequests,
"memory": memRequests,
}
agentResources := applyconfcore.ResourceRequirements().WithRequests(agentResourceRequests).WithLimits(agentResourceLimits)
agentContainer.WithResources(agentResources)
nodeNames := make([]string, 0, len(nodeToTappedPodIPMap))
for nodeName := range nodeToTappedPodIPMap {