diff --git a/tap/extensions/redis/errors.go b/tap/extensions/redis/errors.go new file mode 100644 index 000000000..25cb3bf54 --- /dev/null +++ b/tap/extensions/redis/errors.go @@ -0,0 +1,14 @@ +package main + +//ConnectError redis connection error,such as io timeout +type ConnectError struct { + Message string +} + +func newConnectError(message string) *ConnectError { + return &ConnectError{Message: message} +} + +func (e *ConnectError) Error() string { + return e.Message +} diff --git a/tap/extensions/redis/go.mod b/tap/extensions/redis/go.mod new file mode 100644 index 000000000..a69f319af --- /dev/null +++ b/tap/extensions/redis/go.mod @@ -0,0 +1,9 @@ +module github.com/up9inc/mizu/tap/extensions/redis + +go 1.16 + +require ( + github.com/up9inc/mizu/tap/api v0.0.0 +) + +replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go new file mode 100644 index 000000000..d135054b1 --- /dev/null +++ b/tap/extensions/redis/handlers.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + + "github.com/up9inc/mizu/tap/api" +) + +func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error { + counterPair.Request++ + ident := fmt.Sprintf( + "%s->%s %s->%s %d", + tcpID.SrcIP, + tcpID.DstIP, + tcpID.SrcPort, + tcpID.DstPort, + counterPair.Request, + ) + item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime) + if item != nil { + item.ConnectionInfo = &api.ConnectionInfo{ + ClientIP: tcpID.SrcIP, + ClientPort: tcpID.SrcPort, + ServerIP: tcpID.DstIP, + ServerPort: tcpID.DstPort, + IsOutgoing: true, + } + emitter.Emit(item) + } + return nil +} + +func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error { + counterPair.Response++ + ident := fmt.Sprintf( + "%s->%s %s->%s %d", + tcpID.DstIP, + tcpID.SrcIP, + tcpID.DstPort, + tcpID.SrcPort, + counterPair.Response, + ) + item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime) + if item != nil { + item.ConnectionInfo = &api.ConnectionInfo{ + ClientIP: tcpID.DstIP, + ClientPort: tcpID.DstPort, + ServerIP: tcpID.SrcIP, + ServerPort: tcpID.SrcPort, + IsOutgoing: false, + } + emitter.Emit(item) + } + return nil +} diff --git a/tap/extensions/redis/helpers.go b/tap/extensions/redis/helpers.go new file mode 100644 index 000000000..bf8b30876 --- /dev/null +++ b/tap/extensions/redis/helpers.go @@ -0,0 +1,57 @@ +package main + +import ( + "encoding/json" + + "github.com/up9inc/mizu/tap/api" +) + +type RedisPayload struct { + Data interface{} +} + +type RedisPayloader interface { + MarshalJSON() ([]byte, error) +} + +func (h RedisPayload) MarshalJSON() ([]byte, error) { + return json.Marshal(h.Data) +} + +type RedisWrapper struct { + Method string `json:"method"` + Url string `json:"url"` + Details interface{} `json:"details"` +} + +func representGeneric(generic map[string]string) (representation []interface{}) { + details, _ := json.Marshal([]map[string]string{ + { + "name": "Type", + "value": generic["type"], + }, + { + "name": "Command", + "value": generic["command"], + }, + { + "name": "Key", + "value": generic["key"], + }, + { + "name": "Value", + "value": generic["value"], + }, + { + "name": "Keyword", + "value": generic["keyword"], + }, + }) + representation = append(representation, map[string]string{ + "type": api.TABLE, + "title": "Details", + "data": string(details), + }) + + return +} diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go new file mode 100644 index 000000000..3202965b3 --- /dev/null +++ b/tap/extensions/redis/main.go @@ -0,0 +1,154 @@ +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "log" + + "github.com/up9inc/mizu/tap/api" +) + +var protocol api.Protocol = api.Protocol{ + Name: "redis", + LongName: "Redis Serialization Protocol", + Abbreviation: "REDIS", + Version: "3.x", + BackgroundColor: "#a41e11", + ForegroundColor: "#ffffff", + FontSize: 11, + ReferenceLink: "https://redis.io/topics/protocol", + Ports: []string{"6379"}, + Priority: 3, +} + +func init() { + log.Println("Initializing Redis extension...") +} + +type dissecting string + +func (d dissecting) Register(extension *api.Extension) { + extension.Protocol = &protocol + extension.MatcherMap = reqResMatcher.openMessagesMap +} + +func (d dissecting) Ping() { + log.Printf("pong %s\n", protocol.Name) +} + +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { + is := &RedisInputStream{ + Reader: b, + Buf: make([]byte, 8192), + } + proto := NewProtocol(is) + for { + redisPacket, err := proto.Read() + if err != nil { + return err + } + + if isClient { + handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket) + } else { + handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket) + } + } +} + +func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { + request := item.Pair.Request.Payload.(map[string]interface{}) + reqDetails := request["details"].(map[string]interface{}) + service := "redis" + if resolvedDestination != "" { + service = resolvedDestination + } else if resolvedSource != "" { + service = resolvedSource + } + + method := "" + if reqDetails["command"] != nil { + method = reqDetails["command"].(string) + } + + summary := "" + if reqDetails["key"] != nil { + summary = reqDetails["key"].(string) + } + + request["url"] = summary + entryBytes, _ := json.Marshal(item.Pair) + return &api.MizuEntry{ + ProtocolName: protocol.Name, + ProtocolLongName: protocol.LongName, + ProtocolAbbreviation: protocol.Abbreviation, + ProtocolVersion: protocol.Version, + ProtocolBackgroundColor: protocol.BackgroundColor, + ProtocolForegroundColor: protocol.ForegroundColor, + ProtocolFontSize: protocol.FontSize, + ProtocolReferenceLink: protocol.ReferenceLink, + EntryId: entryId, + Entry: string(entryBytes), + Url: fmt.Sprintf("%s%s", service, summary), + Method: method, + Status: 0, + RequestSenderIp: item.ConnectionInfo.ClientIP, + Service: service, + Timestamp: item.Timestamp, + ElapsedTime: 0, + Path: summary, + ResolvedSource: resolvedSource, + ResolvedDestination: resolvedDestination, + SourceIp: item.ConnectionInfo.ClientIP, + DestinationIp: item.ConnectionInfo.ServerIP, + SourcePort: item.ConnectionInfo.ClientPort, + DestinationPort: item.ConnectionInfo.ServerPort, + IsOutgoing: item.ConnectionInfo.IsOutgoing, + } + +} + +func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { + return &api.BaseEntryDetails{ + Id: entry.EntryId, + Protocol: protocol, + Url: entry.Url, + RequestSenderIp: entry.RequestSenderIp, + Service: entry.Service, + Summary: entry.Path, + StatusCode: entry.Status, + Method: entry.Method, + Timestamp: entry.Timestamp, + SourceIp: entry.SourceIp, + DestinationIp: entry.DestinationIp, + SourcePort: entry.SourcePort, + DestinationPort: entry.DestinationPort, + IsOutgoing: entry.IsOutgoing, + Latency: entry.ElapsedTime, + Rules: api.ApplicableRules{ + Latency: 0, + Status: false, + }, + } +} + +func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) { + p = protocol + bodySize = 0 + var root map[string]interface{} + json.Unmarshal([]byte(entry.Entry), &root) + representation := make(map[string]interface{}, 0) + request := root["request"].(map[string]interface{})["payload"].(map[string]interface{}) + response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) + reqDetails := request["details"].(map[string]string) + resDetails := response["details"].(map[string]string) + repRequest := representGeneric(reqDetails) + repResponse := representGeneric(resDetails) + representation["request"] = repRequest + representation["response"] = repResponse + object, err = json.Marshal(representation) + return +} + +var Dissector dissecting diff --git a/tap/extensions/redis/matcher.go b/tap/extensions/redis/matcher.go new file mode 100644 index 000000000..71d5770d4 --- /dev/null +++ b/tap/extensions/redis/matcher.go @@ -0,0 +1,102 @@ +package main + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/up9inc/mizu/tap/api" +) + +var reqResMatcher = createResponseRequestMatcher() // global + +// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter} +type requestResponseMatcher struct { + openMessagesMap *sync.Map +} + +func createResponseRequestMatcher() requestResponseMatcher { + newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} + return *newMatcher +} + +func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { + split := splitIdent(ident) + key := genKey(split) + + requestRedisMessage := api.GenericMessage{ + IsRequest: true, + CaptureTime: captureTime, + Payload: RedisPayload{ + Data: &RedisWrapper{ + Method: string(request.Command), + Url: "", + Details: request, + }, + }, + } + + if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { + // Type assertion always succeeds because all of the map's values are of api.GenericMessage type + responseRedisMessage := response.(*api.GenericMessage) + if responseRedisMessage.IsRequest { + return nil + } + return matcher.preparePair(&requestRedisMessage, responseRedisMessage) + } + + matcher.openMessagesMap.Store(key, &requestRedisMessage) + return nil +} + +func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem { + split := splitIdent(ident) + key := genKey(split) + + responseRedisMessage := api.GenericMessage{ + IsRequest: false, + CaptureTime: captureTime, + Payload: RedisPayload{ + Data: &RedisWrapper{ + Method: string(response.Command), + Url: "", + Details: response, + }, + }, + } + + if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { + // Type assertion always succeeds because all of the map's values are of api.GenericMessage type + requestRedisMessage := request.(*api.GenericMessage) + if !requestRedisMessage.IsRequest { + return nil + } + return matcher.preparePair(requestRedisMessage, &responseRedisMessage) + } + + matcher.openMessagesMap.Store(key, &responseRedisMessage) + return nil +} + +func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.GenericMessage, responseRedisMessage *api.GenericMessage) *api.OutputChannelItem { + return &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: requestRedisMessage.CaptureTime.UnixNano() / int64(time.Millisecond), + ConnectionInfo: nil, + Pair: &api.RequestResponsePair{ + Request: *requestRedisMessage, + Response: *responseRedisMessage, + }, + } +} + +func splitIdent(ident string) []string { + ident = strings.Replace(ident, "->", " ", -1) + return strings.Split(ident, " ") +} + +func genKey(split []string) string { + key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) + return key +} diff --git a/tap/extensions/redis/read.go b/tap/extensions/redis/read.go new file mode 100644 index 000000000..93e147303 --- /dev/null +++ b/tap/extensions/redis/read.go @@ -0,0 +1,474 @@ +package main + +import ( + "bufio" + "errors" + "fmt" + "log" + "math" + "reflect" + "strconv" + "strings" + "time" +) + +const ( + askPrefix = "ASK " + movedPrefix = "MOVED " + clusterDownPrefix = "CLUSTERDOWN " + busyPrefix = "BUSY " + noscriptPrefix = "NOSCRIPT " + + defaultHost = "localhost" + defaultPort = 6379 + defaultSentinelPort = 26379 + defaultTimeout = 5 * time.Second + defaultDatabase = 2 * time.Second + + dollarByte = '$' + asteriskByte = '*' + plusByte = '+' + minusByte = '-' + colonByte = ':' + notApplicableByte = '0' + + sentinelMasters = "masters" + sentinelGetMasterAddrByName = "get-master-addr-by-name" + sentinelReset = "reset" + sentinelSlaves = "slaves" + sentinelFailOver = "failover" + sentinelMonitor = "monitor" + sentinelRemove = "remove" + sentinelSet = "set" + + clusterNodes = "nodes" + clusterMeet = "meet" + clusterReset = "reset" + clusterAddSlots = "addslots" + clusterDelSlots = "delslots" + clusterInfo = "info" + clusterGetKeysInSlot = "getkeysinslot" + clusterSetSlot = "setslot" + clusterSetSlotNode = "node" + clusterSetSlotMigrating = "migrating" + clusterSetSlotImporting = "importing" + clusterSetSlotStable = "stable" + clusterForget = "forget" + clusterFlushSlot = "flushslots" + clusterKeySlot = "keyslot" + clusterCountKeyInSlot = "countkeysinslot" + clusterSaveConfig = "saveconfig" + clusterReplicate = "replicate" + clusterSlaves = "slaves" + clusterFailOver = "failover" + clusterSlots = "slots" + pubSubChannels = "channels" + pubSubNumSub = "numsub" + pubSubNumPat = "numpat" +) + +//intToByteArr convert int to byte array +func intToByteArr(a int) []byte { + buf := make([]byte, 0) + return strconv.AppendInt(buf, int64(a), 10) +} + +var ( + bytesTrue = intToByteArr(1) + bytesFalse = intToByteArr(0) + bytesTilde = []byte("~") + + positiveInfinityBytes = []byte("+inf") + negativeInfinityBytes = []byte("-inf") +) + +var ( + sizeTable = []int{9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, + 999999999, math.MaxInt32} + + digitTens = []byte{'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', + '1', '1', '1', '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', + '2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', + '4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6', + '6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8', + '8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9'} + + digitOnes = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', + '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', + '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', + '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', + '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', + '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'} + + digits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', + 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', + 't', 'u', 'v', 'w', 'x', 'y', 'z'} +) + +// receive message from redis +type RedisInputStream struct { + *bufio.Reader + Buf []byte + count int + limit int +} + +func (r *RedisInputStream) readByte() (byte, error) { + err := r.ensureFill() + if err != nil { + return 0, err + } + ret := r.Buf[r.count] + r.count++ + return ret, nil +} + +func (r *RedisInputStream) ensureFill() error { + if r.count < r.limit { + return nil + } + var err error + r.limit, err = r.Read(r.Buf) + if err != nil { + return newConnectError(err.Error()) + } + r.count = 0 + if r.limit == -1 { + return newConnectError("Unexpected end of stream") + } + return nil +} + +func (r *RedisInputStream) readLine() (string, error) { + buf := "" + for { + err := r.ensureFill() + if err != nil { + return "", err + } + b := r.Buf[r.count] + r.count++ + if b == '\r' { + err := r.ensureFill() + if err != nil { + return "", err + } + c := r.Buf[r.count] + r.count++ + if c == '\n' { + break + } + buf += string(b) + buf += string(c) + } else { + buf += string(b) + } + } + if buf == "" { + return "", newConnectError("It seems like server has closed the connection.") + } + return buf, nil +} + +func (r *RedisInputStream) readLineBytes() ([]byte, error) { + err := r.ensureFill() + if err != nil { + return nil, err + } + pos := r.count + buf := r.Buf + for { + if pos == r.limit { + return r.readLineBytesSlowly() + } + p := buf[pos] + pos++ + if p == '\r' { + if pos == r.limit { + return r.readLineBytesSlowly() + } + p := buf[pos] + pos++ + if p == '\n' { + break + } + } + } + N := pos - r.count - 2 + line := make([]byte, N) + j := 0 + for i := r.count; i <= N; i++ { + line[j] = buf[i] + j++ + } + r.count = pos + return line, nil +} + +func (r *RedisInputStream) readLineBytesSlowly() ([]byte, error) { + buf := make([]byte, 0) + for { + err := r.ensureFill() + if err != nil { + return nil, err + } + b := r.Buf[r.count] + r.count++ + if b == 'r' { + err := r.ensureFill() + if err != nil { + return nil, err + } + c := r.Buf[r.count] + r.count++ + if c == '\n' { + break + } + buf = append(buf, b) + buf = append(buf, c) + } else { + buf = append(buf, b) + } + } + return buf, nil +} + +func (r *RedisInputStream) readIntCrLf() (int64, error) { + err := r.ensureFill() + if err != nil { + return 0, err + } + buf := r.Buf + isNeg := false + if buf[r.count] == '-' { + isNeg = true + } + if isNeg { + r.count++ + } + value := int64(0) + for { + err := r.ensureFill() + if err != nil { + return 0, err + } + b := buf[r.count] + r.count++ + if b == '\r' { + err := r.ensureFill() + if err != nil { + return 0, err + } + c := buf[r.count] + r.count++ + if c != '\n' { + return 0, newConnectError("Unexpected character!") + } + break + } else { + value = value*10 + int64(b) - int64('0') + } + } + if isNeg { + return -value, nil + } + return value, nil +} + +type RedisProtocol struct { + is *RedisInputStream +} + +func NewProtocol(is *RedisInputStream) *RedisProtocol { + return &RedisProtocol{ + is: is, + } +} + +func (p *RedisProtocol) Read() (packet *RedisPacket, err error) { + x, r, err := p.process() + if err != nil { + return + } + packet = &RedisPacket{} + packet.Type = r + + switch x.(type) { + case []interface{}: + array := x.([]interface{}) + packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8)))) + if len(array) > 1 { + packet.Key = string(array[1].([]uint8)) + } + if len(array) > 2 { + packet.Value = string(array[2].([]uint8)) + } + case []uint8: + val := string(x.([]uint8)) + if packet.Type == types[plusByte] { + packet.Keyword = RedisKeyword(strings.ToUpper(val)) + if !isValidRedisKeyword(keywords, packet.Keyword) { + err = errors.New(fmt.Sprintf("Unrecognized keyword: %s", string(packet.Command))) + return + } + } else { + packet.Value = val + } + case string: + packet.Value = x.(string) + default: + msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x)) + log.Printf(msg) + err = errors.New(msg) + return + } + + if packet.Command != "" { + if !isValidRedisCommand(commands, packet.Command) { + err = errors.New(fmt.Sprintf("Unrecognized command: %s", string(packet.Command))) + return + } + } + + return +} + +func (p *RedisProtocol) process() (v interface{}, r RedisType, err error) { + b, err := p.is.readByte() + if err != nil { + return nil, types[notApplicableByte], newConnectError(err.Error()) + } + switch b { + case plusByte: + v, err = p.processSimpleString() + r = types[plusByte] + return + case dollarByte: + v, err = p.processBulkString() + r = types[dollarByte] + return + case asteriskByte: + v, err = p.processArray() + r = types[asteriskByte] + return + case colonByte: + v, err = p.processInteger() + r = types[colonByte] + return + case minusByte: + v, err = p.processError() + r = types[minusByte] + return + default: + return nil, types[notApplicableByte], newConnectError(fmt.Sprintf("Unknown reply: %b", b)) + } +} + +func (p *RedisProtocol) processSimpleString() ([]byte, error) { + return p.is.readLineBytes() +} + +func (p *RedisProtocol) processBulkString() ([]byte, error) { + l, err := p.is.readIntCrLf() + if err != nil { + return nil, newConnectError(err.Error()) + } + if l == -1 { + return nil, nil + } + line := make([]byte, 0) + for { + err := p.is.ensureFill() + if err != nil { + return nil, err + } + b := p.is.Buf[p.is.count] + p.is.count++ + if b == '\r' { + err := p.is.ensureFill() + if err != nil { + return nil, err + } + c := p.is.Buf[p.is.count] + p.is.count++ + if c != '\n' { + return nil, newConnectError("Unexpected character!") + } + break + } else { + line = append(line, b) + } + } + return line, nil +} + +func (p *RedisProtocol) processArray() ([]interface{}, error) { + l, err := p.is.readIntCrLf() + if err != nil { + return nil, newConnectError(err.Error()) + } + if l == -1 { + return nil, nil + } + ret := make([]interface{}, 0) + for i := 0; i < int(l); i++ { + if obj, _, err := p.process(); err != nil { + ret = append(ret, err) + } else { + ret = append(ret, obj) + } + } + return ret, nil +} + +func (p *RedisProtocol) processInteger() (int64, error) { + return p.is.readIntCrLf() +} + +func (p *RedisProtocol) processError() (interface{}, error) { + msg, err := p.is.readLine() + if err != nil { + return nil, newConnectError(err.Error()) + } + if strings.HasPrefix(msg, movedPrefix) { + host, port, slot, err := p.parseTargetHostAndSlot(msg) + if err != nil { + return nil, err + } + return fmt.Sprintf("MovedDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil + } else if strings.HasPrefix(msg, askPrefix) { + host, port, slot, err := p.parseTargetHostAndSlot(msg) + if err != nil { + return nil, err + } + return fmt.Sprintf("AskDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil + } else if strings.HasPrefix(msg, clusterDownPrefix) { + return fmt.Sprintf("ClusterError: %s", msg), nil + } else if strings.HasPrefix(msg, busyPrefix) { + return fmt.Sprintf("BusyError: %s", msg), nil + } else if strings.HasPrefix(msg, noscriptPrefix) { + return fmt.Sprintf("NoScriptError: %s", msg), nil + } + return fmt.Sprintf("DataError: %s", msg), nil +} + +func (p *RedisProtocol) parseTargetHostAndSlot(clusterRedirectResponse string) (host string, po int, slot int, err error) { + arr := strings.Split(clusterRedirectResponse, " ") + host, port := p.extractParts(arr[2]) + slot, err = strconv.Atoi(arr[1]) + po, err = strconv.Atoi(port) + return +} + +func (p *RedisProtocol) extractParts(from string) (string, string) { + idx := strings.LastIndex(from, ":") + host := from + if idx != -1 { + host = from[0:idx] + } + port := "" + if idx != -1 { + port = from[idx+1:] + } + return host, port +} diff --git a/tap/extensions/redis/structs.go b/tap/extensions/redis/structs.go new file mode 100644 index 000000000..921c1f029 --- /dev/null +++ b/tap/extensions/redis/structs.go @@ -0,0 +1,290 @@ +package main + +type RedisType string +type RedisCommand string +type RedisKeyword string + +var types map[rune]RedisType = map[rune]RedisType{ + plusByte: "Simple String", + dollarByte: "Bulk String", + asteriskByte: "Array", + colonByte: "Integer", + minusByte: "Error", + notApplicableByte: "N/A", +} + +var commands []RedisCommand = []RedisCommand{ + "PING", + "SET", + "GET", + "QUIT", + "EXISTS", + "DEL", + "UNLINK", + "TYPE", + "FLUSHDB", + "KEYS", + "RANDOMKEY", + "RENAME", + "RENAMENX", + "RENAMEX", + "DBSIZE", + "EXPIRE", + "EXPIREAT", + "TTL", + "SELECT", + "MOVE", + "FLUSHALL", + "GETSET", + "MGET", + "SETNX", + "SETEX", + "MSET", + "MSETNX", + "DECRBY", + "DECR", + "INCRBY", + "INCR", + "APPEND", + "SUBSTR", + "HSET", + "HGET", + "HSETNX", + "HMSET", + "HMGET", + "HINCRBY", + "HEXISTS", + "HDEL", + "HLEN", + "HKEYS", + "HVALS", + "HGETALL", + "RPUSH", + "LPUSH", + "LLEN", + "LRANGE", + "LTRIM", + "LINDEX", + "LSET", + "LREM", + "LPOP", + "RPOP", + "RPOPLPUSH", + "SADD", + "SMEMBERS", + "SREM", + "SPOP", + "SMOVE", + "SCARD", + "SISMEMBER", + "SINTER", + "SINTERSTORE", + "SUNION", + "SUNIONSTORE", + "SDIFF", + "SDIFFSTORE", + "SRANDMEMBER", + "ZADD", + "ZRANGE", + "ZREM", + "ZINCRBY", + "ZRANK", + "ZREVRANK", + "ZREVRANGE", + "ZCARD", + "ZSCORE", + "MULTI", + "DISCARD", + "EXEC", + "WATCH", + "UNWATCH", + "SORT", + "BLPOP", + "BRPOP", + "AUTH", + "SUBSCRIBE", + "PUBLISH", + "UNSUBSCRIBE", + "PSUBSCRIBE", + "PUNSUBSCRIBE", + "PUBSUB", + "ZCOUNT", + "ZRANGEBYSCORE", + "ZREVRANGEBYSCORE", + "ZREMRANGEBYRANK", + "ZREMRANGEBYSCORE", + "ZUNIONSTORE", + "ZINTERSTORE", + "ZLEXCOUNT", + "ZRANGEBYLEX", + "ZREVRANGEBYLEX", + "ZREMRANGEBYLEX", + "SAVE", + "BGSAVE", + "BGREWRITEAOF", + "LASTSAVE", + "SHUTDOWN", + "INFO", + "MONITOR", + "SLAVEOF", + "CONFIG", + "STRLEN", + "SYNC", + "LPUSHX", + "PERSIST", + "RPUSHX", + "ECHO", + "LINSERT", + "DEBUG", + "BRPOPLPUSH", + "SETBIT", + "GETBIT", + "BITPOS", + "SETRANGE", + "GETRANGE", + "EVAL", + "EVALSHA", + "SCRIPT", + "SLOWLOG", + "OBJECT", + "BITCOUNT", + "BITOP", + "SENTINEL", + "DUMP", + "RESTORE", + "PEXPIRE", + "PEXPIREAT", + "PTTL", + "INCRBYFLOAT", + "PSETEX", + "CLIENT", + "TIME", + "MIGRATE", + "HINCRBYFLOAT", + "SCAN", + "HSCAN", + "SSCAN", + "ZSCAN", + "WAIT", + "CLUSTER", + "ASKING", + "PFADD", + "PFCOUNT", + "PFMERGE", + "READONLY", + "GEOADD", + "GEODIST", + "GEOHASH", + "GEOPOS", + "GEORADIUS", + "GEORADIUS_RO", + "GEORADIUSBYMEMBER", + "GEORADIUSBYMEMBER_RO", + "MODULE", + "BITFIELD", + "HSTRLEN", + "TOUCH", + "SWAPDB", + "MEMORY", + "XADD", + "XLEN", + "XDEL", + "XTRIM", + "XRANGE", + "XREVRANGE", + "XREAD", + "XACK", + "XGROUP", + "XREADGROUP", + "XPENDING", + "XCLAIM", +} + +var keywords []RedisKeyword = []RedisKeyword{ + "AGGREGATE", + "ALPHA", + "ASC", + "BY", + "DESC", + "GET", + "LIMIT", + "MESSAGE", + "NO", + "NOSORT", + "PMESSAGE", + "PSUBSCRIBE", + "PUNSUBSCRIBE", + "OK", + "ONE", + "QUEUED", + "SET", + "STORE", + "SUBSCRIBE", + "UNSUBSCRIBE", + "WEIGHTS", + "WITHSCORES", + "RESETSTAT", + "REWRITE", + "RESET", + "FLUSH", + "EXISTS", + "LOAD", + "KILL", + "LEN", + "REFCOUNT", + "ENCODING", + "IDLETIME", + "GETNAME", + "SETNAME", + "LIST", + "MATCH", + "COUNT", + "PING", + "PONG", + "UNLOAD", + "REPLACE", + "KEYS", + "PAUSE", + "DOCTOR", + "BLOCK", + "NOACK", + "STREAMS", + "KEY", + "CREATE", + "MKSTREAM", + "SETID", + "DESTROY", + "DELCONSUMER", + "MAXLEN", + "GROUP", + "IDLE", + "TIME", + "RETRYCOUNT", + "FORCE", +} + +type RedisPacket struct { + Type RedisType `json:"type"` + Command RedisCommand `json:"command"` + Key string `json:"key"` + Value string `json:"value"` + Keyword RedisKeyword `json:"keyword"` +} + +func isValidRedisCommand(s []RedisCommand, c RedisCommand) bool { + for _, v := range s { + if v == c { + return true + } + } + return false +} + +func isValidRedisKeyword(s []RedisKeyword, c RedisKeyword) bool { + for _, v := range s { + if v == c { + return true + } + } + return false +}