mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-31 10:11:59 +00:00
Add Redis Serialization Protocol support (#290)
* Bring in the files * Add request-response pair matcher for Redis * Implement the `Represent` method * Update `representGeneric` method signature * Don't export `IntToByteArr` * Remove unused `newRedisInputStream` method * Return the errors as string * Adapt to the latest change in the `develop`
This commit is contained in:
14
tap/extensions/redis/errors.go
Normal file
14
tap/extensions/redis/errors.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package main
|
||||
|
||||
//ConnectError redis connection error,such as io timeout
|
||||
type ConnectError struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
func newConnectError(message string) *ConnectError {
|
||||
return &ConnectError{Message: message}
|
||||
}
|
||||
|
||||
func (e *ConnectError) Error() string {
|
||||
return e.Message
|
||||
}
|
9
tap/extensions/redis/go.mod
Normal file
9
tap/extensions/redis/go.mod
Normal file
@@ -0,0 +1,9 @@
|
||||
module github.com/up9inc/mizu/tap/extensions/redis
|
||||
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
|
55
tap/extensions/redis/handlers.go
Normal file
55
tap/extensions/redis/handlers.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
|
||||
counterPair.Request++
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
)
|
||||
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.SrcIP,
|
||||
ClientPort: tcpID.SrcPort,
|
||||
ServerIP: tcpID.DstIP,
|
||||
ServerPort: tcpID.DstPort,
|
||||
IsOutgoing: true,
|
||||
}
|
||||
emitter.Emit(item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
|
||||
counterPair.Response++
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
)
|
||||
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.DstIP,
|
||||
ClientPort: tcpID.DstPort,
|
||||
ServerIP: tcpID.SrcIP,
|
||||
ServerPort: tcpID.SrcPort,
|
||||
IsOutgoing: false,
|
||||
}
|
||||
emitter.Emit(item)
|
||||
}
|
||||
return nil
|
||||
}
|
57
tap/extensions/redis/helpers.go
Normal file
57
tap/extensions/redis/helpers.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type RedisPayload struct {
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
type RedisPayloader interface {
|
||||
MarshalJSON() ([]byte, error)
|
||||
}
|
||||
|
||||
func (h RedisPayload) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(h.Data)
|
||||
}
|
||||
|
||||
type RedisWrapper struct {
|
||||
Method string `json:"method"`
|
||||
Url string `json:"url"`
|
||||
Details interface{} `json:"details"`
|
||||
}
|
||||
|
||||
func representGeneric(generic map[string]string) (representation []interface{}) {
|
||||
details, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Type",
|
||||
"value": generic["type"],
|
||||
},
|
||||
{
|
||||
"name": "Command",
|
||||
"value": generic["command"],
|
||||
},
|
||||
{
|
||||
"name": "Key",
|
||||
"value": generic["key"],
|
||||
},
|
||||
{
|
||||
"name": "Value",
|
||||
"value": generic["value"],
|
||||
},
|
||||
{
|
||||
"name": "Keyword",
|
||||
"value": generic["keyword"],
|
||||
},
|
||||
})
|
||||
representation = append(representation, map[string]string{
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
|
||||
return
|
||||
}
|
154
tap/extensions/redis/main.go
Normal file
154
tap/extensions/redis/main.go
Normal file
@@ -0,0 +1,154 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var protocol api.Protocol = api.Protocol{
|
||||
Name: "redis",
|
||||
LongName: "Redis Serialization Protocol",
|
||||
Abbreviation: "REDIS",
|
||||
Version: "3.x",
|
||||
BackgroundColor: "#a41e11",
|
||||
ForegroundColor: "#ffffff",
|
||||
FontSize: 11,
|
||||
ReferenceLink: "https://redis.io/topics/protocol",
|
||||
Ports: []string{"6379"},
|
||||
Priority: 3,
|
||||
}
|
||||
|
||||
func init() {
|
||||
log.Println("Initializing Redis extension...")
|
||||
}
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
is := &RedisInputStream{
|
||||
Reader: b,
|
||||
Buf: make([]byte, 8192),
|
||||
}
|
||||
proto := NewProtocol(is)
|
||||
for {
|
||||
redisPacket, err := proto.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isClient {
|
||||
handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
|
||||
} else {
|
||||
handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
service := "redis"
|
||||
if resolvedDestination != "" {
|
||||
service = resolvedDestination
|
||||
} else if resolvedSource != "" {
|
||||
service = resolvedSource
|
||||
}
|
||||
|
||||
method := ""
|
||||
if reqDetails["command"] != nil {
|
||||
method = reqDetails["command"].(string)
|
||||
}
|
||||
|
||||
summary := ""
|
||||
if reqDetails["key"] != nil {
|
||||
summary = reqDetails["key"].(string)
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: protocol.Name,
|
||||
ProtocolLongName: protocol.LongName,
|
||||
ProtocolAbbreviation: protocol.Abbreviation,
|
||||
ProtocolVersion: protocol.Version,
|
||||
ProtocolBackgroundColor: protocol.BackgroundColor,
|
||||
ProtocolForegroundColor: protocol.ForegroundColor,
|
||||
ProtocolFontSize: protocol.FontSize,
|
||||
ProtocolReferenceLink: protocol.ReferenceLink,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, summary),
|
||||
Method: method,
|
||||
Status: 0,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: 0,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
return &api.BaseEntryDetails{
|
||||
Id: entry.EntryId,
|
||||
Protocol: protocol,
|
||||
Url: entry.Url,
|
||||
RequestSenderIp: entry.RequestSenderIp,
|
||||
Service: entry.Service,
|
||||
Summary: entry.Path,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
Timestamp: entry.Timestamp,
|
||||
SourceIp: entry.SourceIp,
|
||||
DestinationIp: entry.DestinationIp,
|
||||
SourcePort: entry.SourcePort,
|
||||
DestinationPort: entry.DestinationPort,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: api.ApplicableRules{
|
||||
Latency: 0,
|
||||
Status: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
|
||||
p = protocol
|
||||
bodySize = 0
|
||||
var root map[string]interface{}
|
||||
json.Unmarshal([]byte(entry.Entry), &root)
|
||||
representation := make(map[string]interface{}, 0)
|
||||
request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]string)
|
||||
resDetails := response["details"].(map[string]string)
|
||||
repRequest := representGeneric(reqDetails)
|
||||
repResponse := representGeneric(resDetails)
|
||||
representation["request"] = repRequest
|
||||
representation["response"] = repResponse
|
||||
object, err = json.Marshal(representation)
|
||||
return
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
102
tap/extensions/redis/matcher.go
Normal file
102
tap/extensions/redis/matcher.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var reqResMatcher = createResponseRequestMatcher() // global
|
||||
|
||||
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
|
||||
func createResponseRequestMatcher() requestResponseMatcher {
|
||||
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
return *newMatcher
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
requestRedisMessage := api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
Payload: RedisPayload{
|
||||
Data: &RedisWrapper{
|
||||
Method: string(request.Command),
|
||||
Url: "",
|
||||
Details: request,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
responseRedisMessage := response.(*api.GenericMessage)
|
||||
if responseRedisMessage.IsRequest {
|
||||
return nil
|
||||
}
|
||||
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &requestRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
responseRedisMessage := api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
Payload: RedisPayload{
|
||||
Data: &RedisWrapper{
|
||||
Method: string(response.Command),
|
||||
Url: "",
|
||||
Details: response,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
requestRedisMessage := request.(*api.GenericMessage)
|
||||
if !requestRedisMessage.IsRequest {
|
||||
return nil
|
||||
}
|
||||
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &responseRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.GenericMessage, responseRedisMessage *api.GenericMessage) *api.OutputChannelItem {
|
||||
return &api.OutputChannelItem{
|
||||
Protocol: protocol,
|
||||
Timestamp: requestRedisMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
|
||||
ConnectionInfo: nil,
|
||||
Pair: &api.RequestResponsePair{
|
||||
Request: *requestRedisMessage,
|
||||
Response: *responseRedisMessage,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func splitIdent(ident string) []string {
|
||||
ident = strings.Replace(ident, "->", " ", -1)
|
||||
return strings.Split(ident, " ")
|
||||
}
|
||||
|
||||
func genKey(split []string) string {
|
||||
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
|
||||
return key
|
||||
}
|
474
tap/extensions/redis/read.go
Normal file
474
tap/extensions/redis/read.go
Normal file
@@ -0,0 +1,474 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
askPrefix = "ASK "
|
||||
movedPrefix = "MOVED "
|
||||
clusterDownPrefix = "CLUSTERDOWN "
|
||||
busyPrefix = "BUSY "
|
||||
noscriptPrefix = "NOSCRIPT "
|
||||
|
||||
defaultHost = "localhost"
|
||||
defaultPort = 6379
|
||||
defaultSentinelPort = 26379
|
||||
defaultTimeout = 5 * time.Second
|
||||
defaultDatabase = 2 * time.Second
|
||||
|
||||
dollarByte = '$'
|
||||
asteriskByte = '*'
|
||||
plusByte = '+'
|
||||
minusByte = '-'
|
||||
colonByte = ':'
|
||||
notApplicableByte = '0'
|
||||
|
||||
sentinelMasters = "masters"
|
||||
sentinelGetMasterAddrByName = "get-master-addr-by-name"
|
||||
sentinelReset = "reset"
|
||||
sentinelSlaves = "slaves"
|
||||
sentinelFailOver = "failover"
|
||||
sentinelMonitor = "monitor"
|
||||
sentinelRemove = "remove"
|
||||
sentinelSet = "set"
|
||||
|
||||
clusterNodes = "nodes"
|
||||
clusterMeet = "meet"
|
||||
clusterReset = "reset"
|
||||
clusterAddSlots = "addslots"
|
||||
clusterDelSlots = "delslots"
|
||||
clusterInfo = "info"
|
||||
clusterGetKeysInSlot = "getkeysinslot"
|
||||
clusterSetSlot = "setslot"
|
||||
clusterSetSlotNode = "node"
|
||||
clusterSetSlotMigrating = "migrating"
|
||||
clusterSetSlotImporting = "importing"
|
||||
clusterSetSlotStable = "stable"
|
||||
clusterForget = "forget"
|
||||
clusterFlushSlot = "flushslots"
|
||||
clusterKeySlot = "keyslot"
|
||||
clusterCountKeyInSlot = "countkeysinslot"
|
||||
clusterSaveConfig = "saveconfig"
|
||||
clusterReplicate = "replicate"
|
||||
clusterSlaves = "slaves"
|
||||
clusterFailOver = "failover"
|
||||
clusterSlots = "slots"
|
||||
pubSubChannels = "channels"
|
||||
pubSubNumSub = "numsub"
|
||||
pubSubNumPat = "numpat"
|
||||
)
|
||||
|
||||
//intToByteArr convert int to byte array
|
||||
func intToByteArr(a int) []byte {
|
||||
buf := make([]byte, 0)
|
||||
return strconv.AppendInt(buf, int64(a), 10)
|
||||
}
|
||||
|
||||
var (
|
||||
bytesTrue = intToByteArr(1)
|
||||
bytesFalse = intToByteArr(0)
|
||||
bytesTilde = []byte("~")
|
||||
|
||||
positiveInfinityBytes = []byte("+inf")
|
||||
negativeInfinityBytes = []byte("-inf")
|
||||
)
|
||||
|
||||
var (
|
||||
sizeTable = []int{9, 99, 999, 9999, 99999, 999999, 9999999, 99999999,
|
||||
999999999, math.MaxInt32}
|
||||
|
||||
digitTens = []byte{'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1',
|
||||
'1', '1', '1', '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2',
|
||||
'2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4',
|
||||
'4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6',
|
||||
'6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8',
|
||||
'8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9'}
|
||||
|
||||
digitOnes = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
|
||||
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8',
|
||||
'9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6',
|
||||
'7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4',
|
||||
'5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
|
||||
'3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
|
||||
|
||||
digits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a',
|
||||
'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
|
||||
't', 'u', 'v', 'w', 'x', 'y', 'z'}
|
||||
)
|
||||
|
||||
// receive message from redis
|
||||
type RedisInputStream struct {
|
||||
*bufio.Reader
|
||||
Buf []byte
|
||||
count int
|
||||
limit int
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readByte() (byte, error) {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
ret := r.Buf[r.count]
|
||||
r.count++
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) ensureFill() error {
|
||||
if r.count < r.limit {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
r.limit, err = r.Read(r.Buf)
|
||||
if err != nil {
|
||||
return newConnectError(err.Error())
|
||||
}
|
||||
r.count = 0
|
||||
if r.limit == -1 {
|
||||
return newConnectError("Unexpected end of stream")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readLine() (string, error) {
|
||||
buf := ""
|
||||
for {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
b := r.Buf[r.count]
|
||||
r.count++
|
||||
if b == '\r' {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
c := r.Buf[r.count]
|
||||
r.count++
|
||||
if c == '\n' {
|
||||
break
|
||||
}
|
||||
buf += string(b)
|
||||
buf += string(c)
|
||||
} else {
|
||||
buf += string(b)
|
||||
}
|
||||
}
|
||||
if buf == "" {
|
||||
return "", newConnectError("It seems like server has closed the connection.")
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readLineBytes() ([]byte, error) {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pos := r.count
|
||||
buf := r.Buf
|
||||
for {
|
||||
if pos == r.limit {
|
||||
return r.readLineBytesSlowly()
|
||||
}
|
||||
p := buf[pos]
|
||||
pos++
|
||||
if p == '\r' {
|
||||
if pos == r.limit {
|
||||
return r.readLineBytesSlowly()
|
||||
}
|
||||
p := buf[pos]
|
||||
pos++
|
||||
if p == '\n' {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
N := pos - r.count - 2
|
||||
line := make([]byte, N)
|
||||
j := 0
|
||||
for i := r.count; i <= N; i++ {
|
||||
line[j] = buf[i]
|
||||
j++
|
||||
}
|
||||
r.count = pos
|
||||
return line, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readLineBytesSlowly() ([]byte, error) {
|
||||
buf := make([]byte, 0)
|
||||
for {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := r.Buf[r.count]
|
||||
r.count++
|
||||
if b == 'r' {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := r.Buf[r.count]
|
||||
r.count++
|
||||
if c == '\n' {
|
||||
break
|
||||
}
|
||||
buf = append(buf, b)
|
||||
buf = append(buf, c)
|
||||
} else {
|
||||
buf = append(buf, b)
|
||||
}
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readIntCrLf() (int64, error) {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
buf := r.Buf
|
||||
isNeg := false
|
||||
if buf[r.count] == '-' {
|
||||
isNeg = true
|
||||
}
|
||||
if isNeg {
|
||||
r.count++
|
||||
}
|
||||
value := int64(0)
|
||||
for {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
b := buf[r.count]
|
||||
r.count++
|
||||
if b == '\r' {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
c := buf[r.count]
|
||||
r.count++
|
||||
if c != '\n' {
|
||||
return 0, newConnectError("Unexpected character!")
|
||||
}
|
||||
break
|
||||
} else {
|
||||
value = value*10 + int64(b) - int64('0')
|
||||
}
|
||||
}
|
||||
if isNeg {
|
||||
return -value, nil
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
type RedisProtocol struct {
|
||||
is *RedisInputStream
|
||||
}
|
||||
|
||||
func NewProtocol(is *RedisInputStream) *RedisProtocol {
|
||||
return &RedisProtocol{
|
||||
is: is,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
|
||||
x, r, err := p.process()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
packet = &RedisPacket{}
|
||||
packet.Type = r
|
||||
|
||||
switch x.(type) {
|
||||
case []interface{}:
|
||||
array := x.([]interface{})
|
||||
packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8))))
|
||||
if len(array) > 1 {
|
||||
packet.Key = string(array[1].([]uint8))
|
||||
}
|
||||
if len(array) > 2 {
|
||||
packet.Value = string(array[2].([]uint8))
|
||||
}
|
||||
case []uint8:
|
||||
val := string(x.([]uint8))
|
||||
if packet.Type == types[plusByte] {
|
||||
packet.Keyword = RedisKeyword(strings.ToUpper(val))
|
||||
if !isValidRedisKeyword(keywords, packet.Keyword) {
|
||||
err = errors.New(fmt.Sprintf("Unrecognized keyword: %s", string(packet.Command)))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
packet.Value = val
|
||||
}
|
||||
case string:
|
||||
packet.Value = x.(string)
|
||||
default:
|
||||
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
|
||||
log.Printf(msg)
|
||||
err = errors.New(msg)
|
||||
return
|
||||
}
|
||||
|
||||
if packet.Command != "" {
|
||||
if !isValidRedisCommand(commands, packet.Command) {
|
||||
err = errors.New(fmt.Sprintf("Unrecognized command: %s", string(packet.Command)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) process() (v interface{}, r RedisType, err error) {
|
||||
b, err := p.is.readByte()
|
||||
if err != nil {
|
||||
return nil, types[notApplicableByte], newConnectError(err.Error())
|
||||
}
|
||||
switch b {
|
||||
case plusByte:
|
||||
v, err = p.processSimpleString()
|
||||
r = types[plusByte]
|
||||
return
|
||||
case dollarByte:
|
||||
v, err = p.processBulkString()
|
||||
r = types[dollarByte]
|
||||
return
|
||||
case asteriskByte:
|
||||
v, err = p.processArray()
|
||||
r = types[asteriskByte]
|
||||
return
|
||||
case colonByte:
|
||||
v, err = p.processInteger()
|
||||
r = types[colonByte]
|
||||
return
|
||||
case minusByte:
|
||||
v, err = p.processError()
|
||||
r = types[minusByte]
|
||||
return
|
||||
default:
|
||||
return nil, types[notApplicableByte], newConnectError(fmt.Sprintf("Unknown reply: %b", b))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processSimpleString() ([]byte, error) {
|
||||
return p.is.readLineBytes()
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processBulkString() ([]byte, error) {
|
||||
l, err := p.is.readIntCrLf()
|
||||
if err != nil {
|
||||
return nil, newConnectError(err.Error())
|
||||
}
|
||||
if l == -1 {
|
||||
return nil, nil
|
||||
}
|
||||
line := make([]byte, 0)
|
||||
for {
|
||||
err := p.is.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := p.is.Buf[p.is.count]
|
||||
p.is.count++
|
||||
if b == '\r' {
|
||||
err := p.is.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := p.is.Buf[p.is.count]
|
||||
p.is.count++
|
||||
if c != '\n' {
|
||||
return nil, newConnectError("Unexpected character!")
|
||||
}
|
||||
break
|
||||
} else {
|
||||
line = append(line, b)
|
||||
}
|
||||
}
|
||||
return line, nil
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processArray() ([]interface{}, error) {
|
||||
l, err := p.is.readIntCrLf()
|
||||
if err != nil {
|
||||
return nil, newConnectError(err.Error())
|
||||
}
|
||||
if l == -1 {
|
||||
return nil, nil
|
||||
}
|
||||
ret := make([]interface{}, 0)
|
||||
for i := 0; i < int(l); i++ {
|
||||
if obj, _, err := p.process(); err != nil {
|
||||
ret = append(ret, err)
|
||||
} else {
|
||||
ret = append(ret, obj)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processInteger() (int64, error) {
|
||||
return p.is.readIntCrLf()
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processError() (interface{}, error) {
|
||||
msg, err := p.is.readLine()
|
||||
if err != nil {
|
||||
return nil, newConnectError(err.Error())
|
||||
}
|
||||
if strings.HasPrefix(msg, movedPrefix) {
|
||||
host, port, slot, err := p.parseTargetHostAndSlot(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fmt.Sprintf("MovedDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil
|
||||
} else if strings.HasPrefix(msg, askPrefix) {
|
||||
host, port, slot, err := p.parseTargetHostAndSlot(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fmt.Sprintf("AskDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil
|
||||
} else if strings.HasPrefix(msg, clusterDownPrefix) {
|
||||
return fmt.Sprintf("ClusterError: %s", msg), nil
|
||||
} else if strings.HasPrefix(msg, busyPrefix) {
|
||||
return fmt.Sprintf("BusyError: %s", msg), nil
|
||||
} else if strings.HasPrefix(msg, noscriptPrefix) {
|
||||
return fmt.Sprintf("NoScriptError: %s", msg), nil
|
||||
}
|
||||
return fmt.Sprintf("DataError: %s", msg), nil
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) parseTargetHostAndSlot(clusterRedirectResponse string) (host string, po int, slot int, err error) {
|
||||
arr := strings.Split(clusterRedirectResponse, " ")
|
||||
host, port := p.extractParts(arr[2])
|
||||
slot, err = strconv.Atoi(arr[1])
|
||||
po, err = strconv.Atoi(port)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) extractParts(from string) (string, string) {
|
||||
idx := strings.LastIndex(from, ":")
|
||||
host := from
|
||||
if idx != -1 {
|
||||
host = from[0:idx]
|
||||
}
|
||||
port := ""
|
||||
if idx != -1 {
|
||||
port = from[idx+1:]
|
||||
}
|
||||
return host, port
|
||||
}
|
290
tap/extensions/redis/structs.go
Normal file
290
tap/extensions/redis/structs.go
Normal file
@@ -0,0 +1,290 @@
|
||||
package main
|
||||
|
||||
type RedisType string
|
||||
type RedisCommand string
|
||||
type RedisKeyword string
|
||||
|
||||
var types map[rune]RedisType = map[rune]RedisType{
|
||||
plusByte: "Simple String",
|
||||
dollarByte: "Bulk String",
|
||||
asteriskByte: "Array",
|
||||
colonByte: "Integer",
|
||||
minusByte: "Error",
|
||||
notApplicableByte: "N/A",
|
||||
}
|
||||
|
||||
var commands []RedisCommand = []RedisCommand{
|
||||
"PING",
|
||||
"SET",
|
||||
"GET",
|
||||
"QUIT",
|
||||
"EXISTS",
|
||||
"DEL",
|
||||
"UNLINK",
|
||||
"TYPE",
|
||||
"FLUSHDB",
|
||||
"KEYS",
|
||||
"RANDOMKEY",
|
||||
"RENAME",
|
||||
"RENAMENX",
|
||||
"RENAMEX",
|
||||
"DBSIZE",
|
||||
"EXPIRE",
|
||||
"EXPIREAT",
|
||||
"TTL",
|
||||
"SELECT",
|
||||
"MOVE",
|
||||
"FLUSHALL",
|
||||
"GETSET",
|
||||
"MGET",
|
||||
"SETNX",
|
||||
"SETEX",
|
||||
"MSET",
|
||||
"MSETNX",
|
||||
"DECRBY",
|
||||
"DECR",
|
||||
"INCRBY",
|
||||
"INCR",
|
||||
"APPEND",
|
||||
"SUBSTR",
|
||||
"HSET",
|
||||
"HGET",
|
||||
"HSETNX",
|
||||
"HMSET",
|
||||
"HMGET",
|
||||
"HINCRBY",
|
||||
"HEXISTS",
|
||||
"HDEL",
|
||||
"HLEN",
|
||||
"HKEYS",
|
||||
"HVALS",
|
||||
"HGETALL",
|
||||
"RPUSH",
|
||||
"LPUSH",
|
||||
"LLEN",
|
||||
"LRANGE",
|
||||
"LTRIM",
|
||||
"LINDEX",
|
||||
"LSET",
|
||||
"LREM",
|
||||
"LPOP",
|
||||
"RPOP",
|
||||
"RPOPLPUSH",
|
||||
"SADD",
|
||||
"SMEMBERS",
|
||||
"SREM",
|
||||
"SPOP",
|
||||
"SMOVE",
|
||||
"SCARD",
|
||||
"SISMEMBER",
|
||||
"SINTER",
|
||||
"SINTERSTORE",
|
||||
"SUNION",
|
||||
"SUNIONSTORE",
|
||||
"SDIFF",
|
||||
"SDIFFSTORE",
|
||||
"SRANDMEMBER",
|
||||
"ZADD",
|
||||
"ZRANGE",
|
||||
"ZREM",
|
||||
"ZINCRBY",
|
||||
"ZRANK",
|
||||
"ZREVRANK",
|
||||
"ZREVRANGE",
|
||||
"ZCARD",
|
||||
"ZSCORE",
|
||||
"MULTI",
|
||||
"DISCARD",
|
||||
"EXEC",
|
||||
"WATCH",
|
||||
"UNWATCH",
|
||||
"SORT",
|
||||
"BLPOP",
|
||||
"BRPOP",
|
||||
"AUTH",
|
||||
"SUBSCRIBE",
|
||||
"PUBLISH",
|
||||
"UNSUBSCRIBE",
|
||||
"PSUBSCRIBE",
|
||||
"PUNSUBSCRIBE",
|
||||
"PUBSUB",
|
||||
"ZCOUNT",
|
||||
"ZRANGEBYSCORE",
|
||||
"ZREVRANGEBYSCORE",
|
||||
"ZREMRANGEBYRANK",
|
||||
"ZREMRANGEBYSCORE",
|
||||
"ZUNIONSTORE",
|
||||
"ZINTERSTORE",
|
||||
"ZLEXCOUNT",
|
||||
"ZRANGEBYLEX",
|
||||
"ZREVRANGEBYLEX",
|
||||
"ZREMRANGEBYLEX",
|
||||
"SAVE",
|
||||
"BGSAVE",
|
||||
"BGREWRITEAOF",
|
||||
"LASTSAVE",
|
||||
"SHUTDOWN",
|
||||
"INFO",
|
||||
"MONITOR",
|
||||
"SLAVEOF",
|
||||
"CONFIG",
|
||||
"STRLEN",
|
||||
"SYNC",
|
||||
"LPUSHX",
|
||||
"PERSIST",
|
||||
"RPUSHX",
|
||||
"ECHO",
|
||||
"LINSERT",
|
||||
"DEBUG",
|
||||
"BRPOPLPUSH",
|
||||
"SETBIT",
|
||||
"GETBIT",
|
||||
"BITPOS",
|
||||
"SETRANGE",
|
||||
"GETRANGE",
|
||||
"EVAL",
|
||||
"EVALSHA",
|
||||
"SCRIPT",
|
||||
"SLOWLOG",
|
||||
"OBJECT",
|
||||
"BITCOUNT",
|
||||
"BITOP",
|
||||
"SENTINEL",
|
||||
"DUMP",
|
||||
"RESTORE",
|
||||
"PEXPIRE",
|
||||
"PEXPIREAT",
|
||||
"PTTL",
|
||||
"INCRBYFLOAT",
|
||||
"PSETEX",
|
||||
"CLIENT",
|
||||
"TIME",
|
||||
"MIGRATE",
|
||||
"HINCRBYFLOAT",
|
||||
"SCAN",
|
||||
"HSCAN",
|
||||
"SSCAN",
|
||||
"ZSCAN",
|
||||
"WAIT",
|
||||
"CLUSTER",
|
||||
"ASKING",
|
||||
"PFADD",
|
||||
"PFCOUNT",
|
||||
"PFMERGE",
|
||||
"READONLY",
|
||||
"GEOADD",
|
||||
"GEODIST",
|
||||
"GEOHASH",
|
||||
"GEOPOS",
|
||||
"GEORADIUS",
|
||||
"GEORADIUS_RO",
|
||||
"GEORADIUSBYMEMBER",
|
||||
"GEORADIUSBYMEMBER_RO",
|
||||
"MODULE",
|
||||
"BITFIELD",
|
||||
"HSTRLEN",
|
||||
"TOUCH",
|
||||
"SWAPDB",
|
||||
"MEMORY",
|
||||
"XADD",
|
||||
"XLEN",
|
||||
"XDEL",
|
||||
"XTRIM",
|
||||
"XRANGE",
|
||||
"XREVRANGE",
|
||||
"XREAD",
|
||||
"XACK",
|
||||
"XGROUP",
|
||||
"XREADGROUP",
|
||||
"XPENDING",
|
||||
"XCLAIM",
|
||||
}
|
||||
|
||||
var keywords []RedisKeyword = []RedisKeyword{
|
||||
"AGGREGATE",
|
||||
"ALPHA",
|
||||
"ASC",
|
||||
"BY",
|
||||
"DESC",
|
||||
"GET",
|
||||
"LIMIT",
|
||||
"MESSAGE",
|
||||
"NO",
|
||||
"NOSORT",
|
||||
"PMESSAGE",
|
||||
"PSUBSCRIBE",
|
||||
"PUNSUBSCRIBE",
|
||||
"OK",
|
||||
"ONE",
|
||||
"QUEUED",
|
||||
"SET",
|
||||
"STORE",
|
||||
"SUBSCRIBE",
|
||||
"UNSUBSCRIBE",
|
||||
"WEIGHTS",
|
||||
"WITHSCORES",
|
||||
"RESETSTAT",
|
||||
"REWRITE",
|
||||
"RESET",
|
||||
"FLUSH",
|
||||
"EXISTS",
|
||||
"LOAD",
|
||||
"KILL",
|
||||
"LEN",
|
||||
"REFCOUNT",
|
||||
"ENCODING",
|
||||
"IDLETIME",
|
||||
"GETNAME",
|
||||
"SETNAME",
|
||||
"LIST",
|
||||
"MATCH",
|
||||
"COUNT",
|
||||
"PING",
|
||||
"PONG",
|
||||
"UNLOAD",
|
||||
"REPLACE",
|
||||
"KEYS",
|
||||
"PAUSE",
|
||||
"DOCTOR",
|
||||
"BLOCK",
|
||||
"NOACK",
|
||||
"STREAMS",
|
||||
"KEY",
|
||||
"CREATE",
|
||||
"MKSTREAM",
|
||||
"SETID",
|
||||
"DESTROY",
|
||||
"DELCONSUMER",
|
||||
"MAXLEN",
|
||||
"GROUP",
|
||||
"IDLE",
|
||||
"TIME",
|
||||
"RETRYCOUNT",
|
||||
"FORCE",
|
||||
}
|
||||
|
||||
type RedisPacket struct {
|
||||
Type RedisType `json:"type"`
|
||||
Command RedisCommand `json:"command"`
|
||||
Key string `json:"key"`
|
||||
Value string `json:"value"`
|
||||
Keyword RedisKeyword `json:"keyword"`
|
||||
}
|
||||
|
||||
func isValidRedisCommand(s []RedisCommand, c RedisCommand) bool {
|
||||
for _, v := range s {
|
||||
if v == c {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isValidRedisKeyword(s []RedisKeyword, c RedisKeyword) bool {
|
||||
for _, v := range s {
|
||||
if v == c {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
Reference in New Issue
Block a user