Fix another freezing issue in Kafka

This commit is contained in:
M. Mert Yildiran
2021-08-23 01:15:18 +03:00
parent c1dee83833
commit 3fa7541c00
3 changed files with 6 additions and 18 deletions

View File

@@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
@@ -40,11 +41,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
if isClient { if isClient {
_, _, err := ReadRequest(b, tcpID) _, _, err := ReadRequest(b, tcpID)
if err != nil { if err != nil {
io.ReadAll(b)
break break
} }
} else { } else {
err := ReadResponse(b, tcpID, emitter) err := ReadResponse(b, tcpID, emitter)
if err != nil { if err != nil {
io.ReadAll(b)
break break
} }
} }

View File

@@ -1,10 +1,8 @@
package main package main
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"log"
"reflect" "reflect"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
@@ -34,10 +32,6 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
correlationID := d.readInt32() correlationID := d.readInt32()
clientID := d.readString() clientID := d.readString()
if apiKey == UpdateMetadata {
return
}
if i := int(apiKey); i < 0 || i >= len(apiTypes) { if i := int(apiKey); i < 0 || i >= len(apiTypes) {
err = fmt.Errorf("unsupported api key: %d", i) err = fmt.Errorf("unsupported api key: %d", i)
return apiKey, apiVersion, err return apiKey, apiVersion, err
@@ -202,9 +196,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
mt.(messageType).decode(d, valueOf(deleteTopicsRequest)) mt.(messageType).decode(d, valueOf(deleteTopicsRequest))
payload = deleteTopicsRequest payload = deleteTopicsRequest
default: default:
msg := fmt.Sprintf("[WARNING] (Request) Not implemented: %s\n", apiKey) return apiKey, 0, fmt.Errorf("(Request) Not implemented: %s", apiKey)
log.Printf(msg)
return apiKey, 0, errors.New(msg)
} }
request := &Request{ request := &Request{

View File

@@ -1,10 +1,8 @@
package main package main
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"log"
"reflect" "reflect"
"time" "time"
@@ -45,10 +43,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
) )
reqResPair := reqResMatcher.registerResponse(key, response) reqResPair := reqResMatcher.registerResponse(key, response)
if reqResPair == nil { if reqResPair == nil {
d.discardAll() return fmt.Errorf("Couldn't match a Kafka response to a Kafka request in 3 seconds!")
msg := "Couldn't match a Kafka response to a Kafka request in 3 seconds!"
log.Printf("[WARNING] %s\n", msg)
return errors.New(msg)
} }
apiKey := reqResPair.Request.ApiKey apiKey := reqResPair.Request.ApiKey
apiVersion := reqResPair.Request.ApiVersion apiVersion := reqResPair.Request.ApiVersion
@@ -242,9 +237,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
mt.(messageType).decode(d, valueOf(deleteTopicsResponse)) mt.(messageType).decode(d, valueOf(deleteTopicsResponse))
reqResPair.Response.Payload = deleteTopicsResponse reqResPair.Response.Payload = deleteTopicsResponse
default: default:
msg := fmt.Sprintf("[WARNING] (Response) Not implemented: %s\n", apiKey) return fmt.Errorf("(Response) Not implemented: %s", apiKey)
log.Printf(msg)
return errors.New(msg)
} }
connectionInfo := &api.ConnectionInfo{ connectionInfo := &api.ConnectionInfo{