Compare commits

..

5 Commits

Author SHA1 Message Date
M. Mert Yıldıran
a62842ac9f Add HTTP2 Over Cleartext (H2C) support (#510)
* Add HTTP2 Over Cleartext (H2C) support

* Remove a parameter which is a remnant of debugging
2021-11-25 20:36:13 +03:00
M. Mert Yıldıran
e667597e6e Rename URL field to Target URI in the UI to prevent confusion (#509) 2021-11-25 20:15:43 +03:00
Igor Gov
86240e4121 Remove local dev instruction from readme (#507) 2021-11-24 10:46:07 +02:00
David Levanon
b0c8c0c192 Add response body to the error in case of failure (#503)
* add response body to the error in case of failure

* fix typo + make inline condition
2021-11-23 20:16:07 +02:00
Nimrod Gilboa Markevich
1c18eb1b84 Use one channel for events instead of three (#495)
Use one channel for events instead of three separate channels by event type
2021-11-23 15:06:27 +02:00
10 changed files with 220 additions and 279 deletions

View File

@@ -185,15 +185,3 @@ Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`
indefinitely in the cluster.
For more information please refer to [DAEMON MODE](docs/DAEMON_MODE.md)
## How to Run local UI
- run from mizu/agent `go run main.go --hars-read --hars-dir <folder>`
- copy Har files into the folder from last command
- change `MizuWebsocketURL` and `apiURL` in `api.js` file
- run from mizu/ui - `npm run start`
- open browser on `localhost:3000`

View File

@@ -493,19 +493,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err)
logger.Log.Errorf("error parsing Mizu resource event: %+v", err)
cancel()
}
@@ -514,45 +514,7 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case err, ok := <-errorChan:
if !ok {

View File

@@ -3,11 +3,12 @@ package apiserver
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -61,7 +62,14 @@ func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) {
if response, err := provider.client.Get(healthUrl); err != nil {
return nil, err
} else if response.StatusCode > 299 {
return nil, errors.New(fmt.Sprintf("status code: %d", response.StatusCode))
responseBody := new(strings.Builder)
if _, err := io.Copy(responseBody, response.Body); err != nil {
return nil, fmt.Errorf("status code: %d - (bad response - %v)", response.StatusCode, err)
} else {
singleLineResponse := strings.ReplaceAll(responseBody.String(), "\n", "")
return nil, fmt.Errorf("status code: %d - (response - %v)", response.StatusCode, singleLineResponse)
}
} else {
defer response.Body.Close()

View File

@@ -559,76 +559,73 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
select {
case _, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, added")
case _, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Watching API Server pod loop, added")
case kubernetes.EventDeleted:
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
return
case kubernetes.EventModified:
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
break
continue
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
url := GetApiServerUrl()
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url := GetApiServerUrl()
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
@@ -654,70 +651,53 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
addedPod, err := wEvent.ToPod()
pod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Tapper is created [%s]", pod.Name)
case kubernetes.EventDeleted:
logger.Log.Debugf("Tapper is removed [%s]", pod.Name)
case kubernetes.EventModified:
if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message))
cancel()
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
podStatus := pod.Status
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
continue
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name))
}
}
}
}
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase)))
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
@@ -740,57 +720,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err))
cancel()
}

View File

@@ -70,7 +70,7 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -96,9 +96,9 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
@@ -109,44 +109,28 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
switch wEvent.Type {
case EventAdded:
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventDeleted:
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventModified:
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {

View File

@@ -20,10 +20,8 @@ type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) {
eventChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@@ -42,7 +40,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking
watcher.Stop()
select {
@@ -73,16 +71,14 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
go func() {
<-ctx.Done()
wg.Wait()
close(addedChan)
close(modifiedChan)
close(removedChan)
close(eventChan)
close(errorChan)
}()
return addedChan, modifiedChan, removedChan, errorChan
return eventChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@@ -103,14 +99,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event
continue
}
switch wEvent.Type {
case watch.Added:
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- &wEvent
}
eventChan <- &wEvent
case <-ctx.Done():
return nil
}

View File

@@ -10,6 +10,14 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
const (
EventAdded watch.EventType = watch.Added
EventModified watch.EventType = watch.Modified
EventDeleted watch.EventType = watch.Deleted
EventBookmark watch.EventType = watch.Bookmark
EventError watch.EventType = watch.Error
)
type InvalidObjectType struct {
RequestedType reflect.Type
}

View File

@@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/up9inc/mizu/tap/api"
)
@@ -34,12 +35,13 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
streamID,
"HTTP2",
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
@@ -53,12 +55,13 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
}
case http.Response:
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
streamID,
"HTTP2",
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
@@ -84,23 +87,30 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
req, err := http.ReadRequest(b)
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return err
return
}
counterPair.Request++
body, err := ioutil.ReadAll(req.Body)
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
switchingProtocolsHTTP2 = true
}
var body []byte
body, err = ioutil.ReadAll(req.Body)
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
@@ -113,26 +123,34 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
}
filterAndEmit(item, emitter, options)
}
return nil
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
res, err := http.ReadResponse(b, nil)
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
return err
return
}
counterPair.Response++
body, err := ioutil.ReadAll(res.Body)
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
switchingProtocolsHTTP2 = true
}
var body []byte
body, err = ioutil.ReadAll(res.Body)
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%s->%s %s->%s %d %s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
if item != nil {
@@ -145,5 +163,5 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
}
filterAndEmit(item, emitter, options)
}
return nil
return
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log"
"net/http"
"net/url"
"time"
@@ -85,7 +86,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected := false
switchingProtocolsHTTP2 := false
for {
if switchingProtocolsHTTP2 {
switchingProtocolsHTTP2 = false
isHTTP2, err = checkIsHTTP2Connection(b, isClient)
prepareHTTP2Connection(b, isClient)
http2Assembler = createHTTP2Assembler(b)
}
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
@@ -99,15 +108,39 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
continue
}
dissected = true
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
"HTTP2",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
filterAndEmit(item, emitter, options)
}
}
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -132,6 +165,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
isRequestUpgradedH2C := false
for _, header := range reqDetails["headers"].([]interface{}) {
h := header.(map[string]interface{})
if h["name"] == "Host" {
@@ -143,13 +178,19 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if h["name"] == ":path" {
path = h["value"].(string)
}
if h["name"] == "Upgrade" {
if h["value"].(string) == "h2c" {
isRequestUpgradedH2C = true
}
}
}
if resDetails["bodySize"].(float64) < 0 {
resDetails["bodySize"] = 0
}
if item.Protocol.Version == "2.0" {
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
service = authority
} else {
service = host
@@ -162,6 +203,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
request["url"] = reqDetails["url"].(string)
reqDetails["targetUri"] = reqDetails["url"]
reqDetails["path"] = path
reqDetails["summary"] = path
@@ -191,7 +233,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
resDetails["statusText"] = grpcStatusCodes[statusCode]
}
if item.Protocol.Version == "2.0" {
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
reqDetails["url"] = path
request["url"] = path
}
@@ -269,9 +311,9 @@ func representRequest(request map[string]interface{}) (repRequest []interface{})
Selector: `request.method`,
},
{
Name: "URL",
Value: request["url"].(string),
Selector: `request.url`,
Name: "Target URI",
Value: request["targetUri"].(string),
Selector: `request.targetUri`,
},
{
Name: "Path",

View File

@@ -92,6 +92,6 @@ func splitIdent(ident string) []string {
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
return key
}