Compare commits

...

10 Commits

Author SHA1 Message Date
M. Mert Yıldıran
9696ad9bad Show the EntryItem as EntrySummary in EntryDetailed (#506) 2021-11-28 10:59:40 +03:00
M. Mert Yıldıran
a1bda0a6c3 Hide Encoding field if it's undefined or empty in the UI (#511) 2021-11-26 09:40:44 +03:00
M. Mert Yıldıran
a62842ac9f Add HTTP2 Over Cleartext (H2C) support (#510)
* Add HTTP2 Over Cleartext (H2C) support

* Remove a parameter which is a remnant of debugging
2021-11-25 20:36:13 +03:00
M. Mert Yıldıran
e667597e6e Rename URL field to Target URI in the UI to prevent confusion (#509) 2021-11-25 20:15:43 +03:00
Igor Gov
86240e4121 Remove local dev instruction from readme (#507) 2021-11-24 10:46:07 +02:00
David Levanon
b0c8c0c192 Add response body to the error in case of failure (#503)
* add response body to the error in case of failure

* fix typo + make inline condition
2021-11-23 20:16:07 +02:00
Nimrod Gilboa Markevich
1c18eb1b84 Use one channel for events instead of three (#495)
Use one channel for events instead of three separate channels by event type
2021-11-23 15:06:27 +02:00
David Levanon
01d6005a7b minor logging changes (#499)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-23 14:21:53 +02:00
Nimrod Gilboa Markevich
4c97316c02 Remove prevPodPhase (#497)
prevPodPhase does not take into account the fact that there may be more
than one tapper pod. Therefore it is not clear what its value
represents. It is only used in a debug print. It is not worth the effort
to fix for that one debug print.

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-23 10:03:36 +02:00
M. Mert Yıldıran
d66c7445e6 Remove SetHostname method in HTTP extension (#496) 2021-11-22 19:30:06 +03:00
18 changed files with 276 additions and 330 deletions

3
.gitignore vendored
View File

@@ -32,3 +32,6 @@ pprof/*
# Database Files
*.bin
*.gob
# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html
nohup.*

View File

@@ -185,15 +185,3 @@ Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`
indefinitely in the cluster.
For more information please refer to [DAEMON MODE](docs/DAEMON_MODE.md)
## How to Run local UI
- run from mizu/agent `go run main.go --hars-read --hars-dir <folder>`
- copy Har files into the folder from last command
- change `MizuWebsocketURL` and `apiURL` in `api.js` file
- run from mizu/ui - `npm run start`
- open browser on `localhost:3000`

View File

@@ -493,19 +493,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err)
logger.Log.Errorf("error parsing Mizu resource event: %+v", err)
cancel()
}
@@ -514,45 +514,7 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case err, ok := <-errorChan:
if !ok {

View File

@@ -3,11 +3,12 @@ package apiserver
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -61,7 +62,14 @@ func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) {
if response, err := provider.client.Get(healthUrl); err != nil {
return nil, err
} else if response.StatusCode > 299 {
return nil, errors.New(fmt.Sprintf("status code: %d", response.StatusCode))
responseBody := new(strings.Builder)
if _, err := io.Copy(responseBody, response.Body); err != nil {
return nil, fmt.Errorf("status code: %d - (bad response - %v)", response.StatusCode, err)
} else {
singleLineResponse := strings.ReplaceAll(responseBody.String(), "\n", "")
return nil, fmt.Errorf("status code: %d - (response - %v)", response.StatusCode, singleLineResponse)
}
} else {
defer response.Body.Close()

View File

@@ -559,76 +559,73 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
select {
case _, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, added")
case _, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Watching API Server pod loop, added")
case kubernetes.EventDeleted:
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
return
case kubernetes.EventModified:
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
break
continue
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
url := GetApiServerUrl()
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url := GetApiServerUrl()
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
@@ -654,76 +651,53 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
var prevPodPhase core.PodPhase
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
addedPod, err := wEvent.ToPod()
pod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Tapper is created [%s]", pod.Name)
case kubernetes.EventDeleted:
logger.Log.Debugf("Tapper is removed [%s]", pod.Name)
case kubernetes.EventModified:
if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message))
cancel()
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
podStatus := pod.Status
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
continue
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
continue
}
prevPodPhase = podStatus.Phase
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name))
}
}
}
}
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase)))
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
@@ -746,57 +720,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err))
cancel()
}

View File

@@ -70,7 +70,7 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -96,9 +96,9 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
@@ -109,44 +109,28 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
switch wEvent.Type {
case EventAdded:
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventDeleted:
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventModified:
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {

View File

@@ -20,10 +20,8 @@ type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) {
eventChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@@ -42,7 +40,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking
watcher.Stop()
select {
@@ -73,16 +71,14 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
go func() {
<-ctx.Done()
wg.Wait()
close(addedChan)
close(modifiedChan)
close(removedChan)
close(eventChan)
close(errorChan)
}()
return addedChan, modifiedChan, removedChan, errorChan
return eventChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@@ -103,14 +99,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event
continue
}
switch wEvent.Type {
case watch.Added:
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- &wEvent
}
eventChan <- &wEvent
case <-ctx.Done():
return nil
}

View File

@@ -10,6 +10,14 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
const (
EventAdded watch.EventType = watch.Added
EventModified watch.EventType = watch.Modified
EventDeleted watch.EventType = watch.Deleted
EventBookmark watch.EventType = watch.Bookmark
EventError watch.EventType = watch.Error
)
type InvalidObjectType struct {
RequestedType reflect.Type
}

View File

@@ -9,7 +9,7 @@ import (
var Log = logging.MustGetLogger("mizu")
var format = logging.MustStringFormatter(
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
`[%{time:2006-01-02T15:04:05.000-0700}] %{level:-5s} ▶ %{message} ▶ [%{pid} %{shortfile} %{shortfunc}]`,
)
func InitLogger(logPath string) {

View File

@@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/up9inc/mizu/tap/api"
)
@@ -34,12 +35,13 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
streamID,
"HTTP2",
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
@@ -53,12 +55,13 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
}
case http.Response:
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
streamID,
"HTTP2",
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
@@ -84,23 +87,30 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
req, err := http.ReadRequest(b)
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return err
return
}
counterPair.Request++
body, err := ioutil.ReadAll(req.Body)
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
switchingProtocolsHTTP2 = true
}
var body []byte
body, err = ioutil.ReadAll(req.Body)
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
@@ -113,26 +123,34 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
}
filterAndEmit(item, emitter, options)
}
return nil
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
res, err := http.ReadResponse(b, nil)
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
return err
return
}
counterPair.Response++
body, err := ioutil.ReadAll(res.Body)
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
switchingProtocolsHTTP2 = true
}
var body []byte
body, err = ioutil.ReadAll(res.Body)
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
if item != nil {
@@ -145,5 +163,5 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
}
filterAndEmit(item, emitter, options)
}
return nil
return
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log"
"net/http"
"net/url"
"time"
@@ -85,7 +86,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected := false
switchingProtocolsHTTP2 := false
for {
if switchingProtocolsHTTP2 {
switchingProtocolsHTTP2 = false
isHTTP2, err = checkIsHTTP2Connection(b, isClient)
prepareHTTP2Connection(b, isClient)
http2Assembler = createHTTP2Assembler(b)
}
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
@@ -99,15 +108,39 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
continue
}
dissected = true
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
"HTTP2",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
filterAndEmit(item, emitter, options)
}
}
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -124,15 +157,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil {
return address
}
replacedUrl.Host = newHostname
return replacedUrl.String()
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.MizuEntry {
var host, authority, path, service string
@@ -141,6 +165,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
isRequestUpgradedH2C := false
for _, header := range reqDetails["headers"].([]interface{}) {
h := header.(map[string]interface{})
if h["name"] == "Host" {
@@ -152,13 +178,19 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if h["name"] == ":path" {
path = h["value"].(string)
}
if h["name"] == "Upgrade" {
if h["value"].(string) == "h2c" {
isRequestUpgradedH2C = true
}
}
}
if resDetails["bodySize"].(float64) < 0 {
resDetails["bodySize"] = 0
}
if item.Protocol.Version == "2.0" {
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
service = authority
} else {
service = host
@@ -171,6 +203,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
request["url"] = reqDetails["url"].(string)
reqDetails["targetUri"] = reqDetails["url"]
reqDetails["path"] = path
reqDetails["summary"] = path
@@ -189,9 +222,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryString"].([]interface{}))
if resolvedDestination != "" {
service = SetHostname(service, resolvedDestination)
service = resolvedDestination
} else if resolvedSource != "" {
service = SetHostname(service, resolvedSource)
service = resolvedSource
}
method := reqDetails["method"].(string)
@@ -200,7 +233,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
resDetails["statusText"] = grpcStatusCodes[statusCode]
}
if item.Protocol.Version == "2.0" {
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
reqDetails["url"] = path
request["url"] = path
}
@@ -278,9 +311,9 @@ func representRequest(request map[string]interface{}) (repRequest []interface{})
Selector: `request.method`,
},
{
Name: "URL",
Value: request["url"].(string),
Selector: `request.url`,
Name: "Target URI",
Value: request["targetUri"].(string),
Selector: `request.targetUri`,
},
{
Name: "Path",

View File

@@ -92,6 +92,6 @@ func splitIdent(ident string) []string {
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
return key
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/up9inc/mizu/tap/source"
)
const PACKETS_SEEN_LOG_THRESHOLD = 1000
type tcpAssembler struct {
*reassembly.Assembler
streamPool *reassembly.StreamPool
@@ -63,7 +65,11 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
for packetInfo := range packets {
packetsCount := diagnose.AppStats.IncPacketsCount()
logger.Log.Debugf("PACKET #%d", packetsCount)
if packetsCount % PACKETS_SEEN_LOG_THRESHOLD == 0 {
logger.Log.Debugf("Packets seen: #%d", packetsCount)
}
packet := packetInfo.Packet
data := packet.Data()
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
@@ -85,7 +91,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
CaptureInfo: packet.Metadata().CaptureInfo,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
logger.Log.Debugf("%s:%v -> %s:%v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()

View File

@@ -1,9 +1,8 @@
import React from "react";
import EntryViewer from "./EntryDetailed/EntryViewer";
import {EntryItem} from "./EntryListItem/EntryListItem";
import {makeStyles} from "@material-ui/core";
import Protocol from "./UI/Protocol"
import StatusCode from "./UI/StatusCode";
import {Summary} from "./UI/Summary";
const useStyles = makeStyles(() => ({
entryTitle: {
@@ -12,6 +11,7 @@ const useStyles = makeStyles(() => ({
maxHeight: 46,
alignItems: 'center',
marginBottom: 4,
marginLeft: 6,
padding: 2,
paddingBottom: 0
},
@@ -64,18 +64,17 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime, updat
};
const EntrySummary: React.FC<any> = ({data, updateQuery}) => {
const classes = useStyles();
const entry = data.base;
const response = data.response;
return <div className={classes.entrySummary}>
{response && "status" in response && <div style={{marginRight: 8}}>
<StatusCode statusCode={response.status} updateQuery={updateQuery}/>
</div>}
<div style={{flexGrow: 1, overflow: 'hidden'}}>
<Summary method={data.method} summary={data.summary} updateQuery={updateQuery}/>
</div>
</div>;
return <EntryItem
key={entry.id}
entry={entry}
setFocusedEntryId={null}
style={{}}
updateQuery={updateQuery}
forceSelect={false}
headingMode={true}
/>;
};
export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData, updateQuery}) => {

View File

@@ -130,7 +130,7 @@ export const EntryBodySection: React.FC<EntryBodySectionProps> = ({
<table>
<tbody>
<EntryViewLine label={'Mime type'} value={contentType} updateQuery={updateQuery} selector={selector} overrideQueryValue={`r".*"`}/>
<EntryViewLine label={'Encoding'} value={encoding} updateQuery={updateQuery} selector={selector} overrideQueryValue={`r".*"`}/>
{encoding && <EntryViewLine label={'Encoding'} value={encoding} updateQuery={updateQuery} selector={selector} overrideQueryValue={`r".*"`}/>}
</tbody>
</table>

View File

@@ -4,6 +4,7 @@
font-family: "Source Sans Pro", Lucida Grande, Tahoma, sans-serif
height: calc(100% - 70px)
width: 100%
margin-top: 10px
h3,
h4

View File

@@ -40,11 +40,13 @@ interface EntryProps {
setFocusedEntryId: (id: string) => void;
style: object;
updateQuery: any;
forceSelect: boolean;
headingMode: boolean;
}
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style, updateQuery}) => {
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style, updateQuery, forceSelect, headingMode}) => {
const [isSelected, setIsSelected] = useState(false);
const [isSelected, setIsSelected] = useState(!forceSelect ? false : true);
const classification = getClassification(entry.statusCode)
const numberOfRules = entry.rules.numberOfRules
@@ -122,22 +124,23 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style
className={`${styles.row}
${isSelected && !rule && !contractEnabled ? styles.rowSelected : additionalRulesProperties}`}
onClick={() => {
if (!setFocusedEntryId) return;
setIsSelected(!isSelected);
setFocusedEntryId(entry.id.toString());
}}
style={{
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
position: "absolute",
position: !headingMode ? "absolute" : "unset",
top: style['top'],
marginTop: style['marginTop'],
width: "calc(100% - 25px)",
width: !headingMode ? "calc(100% - 25px)" : "calc(100% - 18px)",
}}
>
<Protocol
{!headingMode ? <Protocol
protocol={entry.protocol}
horizontal={false}
updateQuery={updateQuery}
/>
/> : null}
{((entry.protocol.name === "http" && "statusCode" in entry) || entry.statusCode !== 0) && <div>
<StatusCode statusCode={entry.statusCode} updateQuery={updateQuery}/>
</div>}

View File

@@ -119,7 +119,11 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
switch (message.messageType) {
case "entry":
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
var forceSelect = false;
if (!focusedEntryId) {
setFocusedEntryId(entry.id.toString());
forceSelect = true;
}
setEntriesBuffer([
...entriesBuffer,
<EntryItem
@@ -128,6 +132,8 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
setFocusedEntryId={setFocusedEntryId}
style={{}}
updateQuery={updateQuery}
forceSelect={forceSelect}
headingMode={false}
/>
]);
break
@@ -190,16 +196,18 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const entryData = await api.getEntry(focusedEntryId);
setSelectedEntryData(entryData);
} catch (error) {
toast[error.response.data.type](`Entry[${focusedEntryId}]: ${error.response.data.msg}`, {
position: "bottom-right",
theme: "colored",
autoClose: error.response.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
if (error.response) {
toast[error.response.data.type](`Entry[${focusedEntryId}]: ${error.response.data.msg}`, {
position: "bottom-right",
theme: "colored",
autoClose: error.response.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
}
console.error(error);
}
})()