diff --git a/Makefile b/Makefile index 7b6ef392a..9dc129ab8 100644 --- a/Makefile +++ b/Makefile @@ -103,6 +103,7 @@ test-shared: ## Run shared tests test-extensions: ## Run extensions tests @echo "running http tests"; cd tap/extensions/http && $(MAKE) test + @echo "running amqp tests"; cd tap/extensions/amqp && $(MAKE) test acceptance-test: ## Run acceptance tests @echo "running acceptance tests"; cd acceptanceTests && $(MAKE) test diff --git a/tap/extensions/amqp/Makefile b/tap/extensions/amqp/Makefile new file mode 100644 index 000000000..ae0046f7a --- /dev/null +++ b/tap/extensions/amqp/Makefile @@ -0,0 +1,16 @@ +skipbin := $$(find bin -mindepth 1 -maxdepth 1) +skipexpect := $$(find expect -mindepth 1 -maxdepth 1) + +test: test-pull-bin test-pull-expect + @MIZU_TEST=1 go test -v ./... -coverpkg=./... -race -coverprofile=coverage.out -covermode=atomic + +test-update: test-pull-bin + @MIZU_TEST=1 TEST_UPDATE=1 go test -v ./... -coverpkg=./... -coverprofile=coverage.out -covermode=atomic + +test-pull-bin: + @mkdir -p bin + @[ "${skipbin}" ] && echo "Skipping downloading BINs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp gs://static.up9.io/mizu/test-pcap/bin/amqp/\*.bin bin + +test-pull-expect: + @mkdir -p expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/amqp/\* expect diff --git a/tap/extensions/amqp/go.mod b/tap/extensions/amqp/go.mod index f159fffbd..715f96321 100644 --- a/tap/extensions/amqp/go.mod +++ b/tap/extensions/amqp/go.mod @@ -2,8 +2,16 @@ module github.com/up9inc/mizu/tap/extensions/amqp go 1.17 -require github.com/up9inc/mizu/tap/api v0.0.0 +require ( + github.com/stretchr/testify v1.7.0 + github.com/up9inc/mizu/tap/api v0.0.0 +) -require github.com/google/martian v2.1.0+incompatible // indirect +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/google/martian v2.1.0+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api diff --git a/tap/extensions/amqp/go.sum b/tap/extensions/amqp/go.sum index bbcb7d053..53414f82f 100644 --- a/tap/extensions/amqp/go.sum +++ b/tap/extensions/amqp/go.sum @@ -1,2 +1,13 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 3b0a5bd1e..eb5fb0184 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -3,6 +3,7 @@ package amqp import ( "encoding/json" "fmt" + "sort" "strconv" "time" @@ -282,6 +283,9 @@ func representBasicPublish(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, @@ -366,6 +370,9 @@ func representBasicDeliver(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, @@ -438,6 +445,9 @@ func representQueueDeclare(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.arguments["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, @@ -504,6 +514,9 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.arguments["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, @@ -565,6 +578,9 @@ func representConnectionStart(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.serverProperties["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, @@ -656,6 +672,9 @@ func representQueueBind(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.arguments["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, @@ -717,6 +736,9 @@ func representBasicConsume(event map[string]interface{}) []interface{} { Selector: fmt.Sprintf(`request.arguments["%s"]`, name), }) } + sort.Slice(headers, func(i, j int) bool { + return headers[i].Name < headers[j].Name + }) headersMarshaled, _ := json.Marshal(headers) rep = append(rep, api.SectionData{ Type: api.TABLE, diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index f672dba7b..926be7262 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -8,6 +8,7 @@ import ( "io" "log" "strconv" + "time" "github.com/up9inc/mizu/tap/api" ) @@ -85,7 +86,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co frame, err := r.ReadFrame() if err == io.EOF { // We must read until we see an EOF... very important! - return errors.New("AMQP EOF") + return nil } switch f := frame.(type) { @@ -96,6 +97,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co // start content state header = f remaining = int(header.Size) + + // Workaround for `Time.MarshalJSON: year outside of range [0,9999]` error + if header.Properties.Timestamp.Year() > 9999 { + header.Properties.Timestamp = time.Time{}.UTC() + } + switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Properties = header.Properties diff --git a/tap/extensions/amqp/main_test.go b/tap/extensions/amqp/main_test.go new file mode 100644 index 000000000..66897b8e4 --- /dev/null +++ b/tap/extensions/amqp/main_test.go @@ -0,0 +1,289 @@ +package amqp + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/up9inc/mizu/tap/api" +) + +const ( + binDir = "bin" + patternBin = "*_req.bin" + patternDissect = "*.json" + msgDissecting = "Dissecting:" + msgAnalyzing = "Analyzing:" + msgRepresenting = "Representing:" + respSuffix = "_res.bin" + expectDir = "expect" + dissectDir = "dissect" + analyzeDir = "analyze" + representDir = "represent" + testUpdate = "TEST_UPDATE" +) + +func TestRegister(t *testing.T) { + dissector := NewDissector() + extension := &api.Extension{} + dissector.Register(extension) + assert.Equal(t, "amqp", extension.Protocol.Name) +} + +func TestMacros(t *testing.T) { + expectedMacros := map[string]string{ + "amqp": `proto.name == "amqp"`, + } + dissector := NewDissector() + macros := dissector.Macros() + assert.Equal(t, expectedMacros, macros) +} + +func TestPing(t *testing.T) { + dissector := NewDissector() + dissector.Ping() +} + +func TestDissect(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirDissect := path.Join(expectDir, dissectDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirDissect) + err := os.MkdirAll(expectDirDissect, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(binDir, patternBin)) + if err != nil { + log.Fatal(err) + } + + options := &api.TrafficFilteringOptions{ + IgnoredUserAgents: []string{}, + } + + for _, _path := range paths { + basePath := _path[:len(_path)-8] + + // Channel to verify the output + itemChannel := make(chan *api.OutputChannelItem) + var emitter api.Emitter = &api.Emitting{ + AppStats: &api.AppStats{}, + OutputChannel: itemChannel, + } + + var items []*api.OutputChannelItem + stop := make(chan bool) + + go func() { + for { + select { + case <-stop: + return + case item := <-itemChannel: + items = append(items, item) + } + } + }() + + // Stream level + counterPair := &api.CounterPair{ + Request: 0, + Response: 0, + } + superIdentifier := &api.SuperIdentifier{} + + // Request + pathClient := _path + fmt.Printf("%s %s\n", msgDissecting, pathClient) + fileClient, err := os.Open(pathClient) + assert.Nil(t, err) + + bufferClient := bufio.NewReader(fileClient) + tcpIDClient := &api.TcpID{ + SrcIP: "1", + DstIP: "2", + SrcPort: "1", + DstPort: "2", + } + reqResMatcher := dissector.NewResponseRequestMatcher() + err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + panic(err) + } + + // Response + pathServer := basePath + respSuffix + fmt.Printf("%s %s\n", msgDissecting, pathServer) + fileServer, err := os.Open(pathServer) + assert.Nil(t, err) + + bufferServer := bufio.NewReader(fileServer) + tcpIDServer := &api.TcpID{ + SrcIP: "2", + DstIP: "1", + SrcPort: "2", + DstPort: "1", + } + err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + panic(err) + } + + fileClient.Close() + fileServer.Close() + + pathExpect := path.Join(expectDirDissect, fmt.Sprintf("%s.json", basePath[4:])) + + time.Sleep(10 * time.Millisecond) + + stop <- true + + marshaled, err := json.Marshal(items) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(items) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, items, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} + +func TestAnalyze(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirDissect := path.Join(expectDir, dissectDir) + expectDirAnalyze := path.Join(expectDir, analyzeDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirAnalyze) + err := os.MkdirAll(expectDirAnalyze, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect)) + if err != nil { + log.Fatal(err) + } + + for _, _path := range paths { + fmt.Printf("%s %s\n", msgAnalyzing, _path) + + bytes, err := ioutil.ReadFile(_path) + assert.Nil(t, err) + + var items []*api.OutputChannelItem + err = json.Unmarshal(bytes, &items) + assert.Nil(t, err) + + var entries []*api.Entry + for _, item := range items { + entry := dissector.Analyze(item, "", "", "") + entries = append(entries, entry) + } + + pathExpect := path.Join(expectDirAnalyze, filepath.Base(_path)) + + marshaled, err := json.Marshal(entries) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(entries) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, items, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} + +func TestRepresent(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirAnalyze := path.Join(expectDir, analyzeDir) + expectDirRepresent := path.Join(expectDir, representDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirRepresent) + err := os.MkdirAll(expectDirRepresent, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect)) + if err != nil { + log.Fatal(err) + } + + for _, _path := range paths { + fmt.Printf("%s %s\n", msgRepresenting, _path) + + bytes, err := ioutil.ReadFile(_path) + assert.Nil(t, err) + + var entries []*api.Entry + err = json.Unmarshal(bytes, &entries) + assert.Nil(t, err) + + var objects []string + for _, entry := range entries { + object, _, err := dissector.Represent(entry.Request, entry.Response) + assert.Nil(t, err) + objects = append(objects, string(object)) + } + + pathExpect := path.Join(expectDirRepresent, filepath.Base(_path)) + + marshaled, err := json.Marshal(objects) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(objects) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, objects, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} diff --git a/tap/extensions/amqp/read.go b/tap/extensions/amqp/read.go index d85eeb43b..99b0f4e64 100644 --- a/tap/extensions/amqp/read.go +++ b/tap/extensions/amqp/read.go @@ -140,7 +140,7 @@ func readTimestamp(r io.Reader) (v time.Time, err error) { if err = binary.Read(r, binary.BigEndian, &sec); err != nil { return } - return time.Unix(sec, 0), nil + return time.Unix(sec, 0).UTC(), nil } /* diff --git a/tap/extensions/amqp/spec091.go b/tap/extensions/amqp/spec091.go index 4a4af07c6..1dcde581f 100644 --- a/tap/extensions/amqp/spec091.go +++ b/tap/extensions/amqp/spec091.go @@ -49,27 +49,6 @@ const ( BadMethodFrameUnknownClass = 602 ) -func isSoftExceptionCode(code int) bool { - switch code { - case 311: - return true - case 312: - return true - case 313: - return true - case 403: - return true - case 404: - return true - case 405: - return true - case 406: - return true - - } - return false -} - type ConnectionStart struct { VersionMajor byte `json:"versionMajor"` VersionMinor byte `json:"versionMinor"` @@ -78,37 +57,6 @@ type ConnectionStart struct { Locales string `json:"locales"` } -func (msg *ConnectionStart) id() (uint16, uint16) { - return 10, 10 -} - -func (msg *ConnectionStart) wait() bool { - return true -} - -func (msg *ConnectionStart) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.VersionMajor); err != nil { - return - } - if err = binary.Write(w, binary.BigEndian, msg.VersionMinor); err != nil { - return - } - - if err = writeTable(w, msg.ServerProperties); err != nil { - return - } - - if err = writeLongstr(w, msg.Mechanisms); err != nil { - return - } - if err = writeLongstr(w, msg.Locales); err != nil { - return - } - - return -} - func (msg *ConnectionStart) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.VersionMajor); err != nil { @@ -139,35 +87,6 @@ type ConnectionStartOk struct { Locale string } -func (msg *ConnectionStartOk) id() (uint16, uint16) { - return 10, 11 -} - -func (msg *ConnectionStartOk) wait() bool { - return true -} - -func (msg *ConnectionStartOk) write(w io.Writer) (err error) { - - if err = writeTable(w, msg.ClientProperties); err != nil { - return - } - - if err = writeShortstr(w, msg.Mechanism); err != nil { - return - } - - if err = writeLongstr(w, msg.Response); err != nil { - return - } - - if err = writeShortstr(w, msg.Locale); err != nil { - return - } - - return -} - func (msg *ConnectionStartOk) read(r io.Reader) (err error) { if msg.ClientProperties, err = readTable(r); err != nil { @@ -193,23 +112,6 @@ type connectionSecure struct { Challenge string } -func (msg *connectionSecure) id() (uint16, uint16) { - return 10, 20 -} - -func (msg *connectionSecure) wait() bool { - return true -} - -func (msg *connectionSecure) write(w io.Writer) (err error) { - - if err = writeLongstr(w, msg.Challenge); err != nil { - return - } - - return -} - func (msg *connectionSecure) read(r io.Reader) (err error) { if msg.Challenge, err = readLongstr(r); err != nil { @@ -223,23 +125,6 @@ type connectionSecureOk struct { Response string } -func (msg *connectionSecureOk) id() (uint16, uint16) { - return 10, 21 -} - -func (msg *connectionSecureOk) wait() bool { - return true -} - -func (msg *connectionSecureOk) write(w io.Writer) (err error) { - - if err = writeLongstr(w, msg.Response); err != nil { - return - } - - return -} - func (msg *connectionSecureOk) read(r io.Reader) (err error) { if msg.Response, err = readLongstr(r); err != nil { @@ -255,31 +140,6 @@ type connectionTune struct { Heartbeat uint16 } -func (msg *connectionTune) id() (uint16, uint16) { - return 10, 30 -} - -func (msg *connectionTune) wait() bool { - return true -} - -func (msg *connectionTune) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.ChannelMax); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.FrameMax); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.Heartbeat); err != nil { - return - } - - return -} - func (msg *connectionTune) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.ChannelMax); err != nil { @@ -303,31 +163,6 @@ type connectionTuneOk struct { Heartbeat uint16 } -func (msg *connectionTuneOk) id() (uint16, uint16) { - return 10, 31 -} - -func (msg *connectionTuneOk) wait() bool { - return true -} - -func (msg *connectionTuneOk) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.ChannelMax); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.FrameMax); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.Heartbeat); err != nil { - return - } - - return -} - func (msg *connectionTuneOk) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.ChannelMax); err != nil { @@ -351,35 +186,6 @@ type connectionOpen struct { reserved2 bool } -func (msg *connectionOpen) id() (uint16, uint16) { - return 10, 40 -} - -func (msg *connectionOpen) wait() bool { - return true -} - -func (msg *connectionOpen) write(w io.Writer) (err error) { - var bits byte - - if err = writeShortstr(w, msg.VirtualHost); err != nil { - return - } - if err = writeShortstr(w, msg.reserved1); err != nil { - return - } - - if msg.reserved2 { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *connectionOpen) read(r io.Reader) (err error) { var bits byte @@ -402,23 +208,6 @@ type connectionOpenOk struct { reserved1 string } -func (msg *connectionOpenOk) id() (uint16, uint16) { - return 10, 41 -} - -func (msg *connectionOpenOk) wait() bool { - return true -} - -func (msg *connectionOpenOk) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.reserved1); err != nil { - return - } - - return -} - func (msg *connectionOpenOk) read(r io.Reader) (err error) { if msg.reserved1, err = readShortstr(r); err != nil { @@ -435,34 +224,6 @@ type ConnectionClose struct { MethodId uint16 `json:"methodId"` } -func (msg *ConnectionClose) id() (uint16, uint16) { - return 10, 50 -} - -func (msg *ConnectionClose) wait() bool { - return true -} - -func (msg *ConnectionClose) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { - return - } - - if err = writeShortstr(w, msg.ReplyText); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.ClassId); err != nil { - return - } - if err = binary.Write(w, binary.BigEndian, msg.MethodId); err != nil { - return - } - - return -} - func (msg *ConnectionClose) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { @@ -486,19 +247,6 @@ func (msg *ConnectionClose) read(r io.Reader) (err error) { type ConnectionCloseOk struct { } -func (msg *ConnectionCloseOk) id() (uint16, uint16) { - return 10, 51 -} - -func (msg *ConnectionCloseOk) wait() bool { - return true -} - -func (msg *ConnectionCloseOk) write(w io.Writer) (err error) { - - return -} - func (msg *ConnectionCloseOk) read(r io.Reader) (err error) { return @@ -508,23 +256,6 @@ type connectionBlocked struct { Reason string } -func (msg *connectionBlocked) id() (uint16, uint16) { - return 10, 60 -} - -func (msg *connectionBlocked) wait() bool { - return false -} - -func (msg *connectionBlocked) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.Reason); err != nil { - return - } - - return -} - func (msg *connectionBlocked) read(r io.Reader) (err error) { if msg.Reason, err = readShortstr(r); err != nil { @@ -537,19 +268,6 @@ func (msg *connectionBlocked) read(r io.Reader) (err error) { type connectionUnblocked struct { } -func (msg *connectionUnblocked) id() (uint16, uint16) { - return 10, 61 -} - -func (msg *connectionUnblocked) wait() bool { - return false -} - -func (msg *connectionUnblocked) write(w io.Writer) (err error) { - - return -} - func (msg *connectionUnblocked) read(r io.Reader) (err error) { return @@ -559,23 +277,6 @@ type channelOpen struct { reserved1 string } -func (msg *channelOpen) id() (uint16, uint16) { - return 20, 10 -} - -func (msg *channelOpen) wait() bool { - return true -} - -func (msg *channelOpen) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.reserved1); err != nil { - return - } - - return -} - func (msg *channelOpen) read(r io.Reader) (err error) { if msg.reserved1, err = readShortstr(r); err != nil { @@ -589,23 +290,6 @@ type channelOpenOk struct { reserved1 string } -func (msg *channelOpenOk) id() (uint16, uint16) { - return 20, 11 -} - -func (msg *channelOpenOk) wait() bool { - return true -} - -func (msg *channelOpenOk) write(w io.Writer) (err error) { - - if err = writeLongstr(w, msg.reserved1); err != nil { - return - } - - return -} - func (msg *channelOpenOk) read(r io.Reader) (err error) { if msg.reserved1, err = readLongstr(r); err != nil { @@ -619,28 +303,6 @@ type channelFlow struct { Active bool } -func (msg *channelFlow) id() (uint16, uint16) { - return 20, 20 -} - -func (msg *channelFlow) wait() bool { - return true -} - -func (msg *channelFlow) write(w io.Writer) (err error) { - var bits byte - - if msg.Active { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *channelFlow) read(r io.Reader) (err error) { var bits byte @@ -656,28 +318,6 @@ type channelFlowOk struct { Active bool } -func (msg *channelFlowOk) id() (uint16, uint16) { - return 20, 21 -} - -func (msg *channelFlowOk) wait() bool { - return false -} - -func (msg *channelFlowOk) write(w io.Writer) (err error) { - var bits byte - - if msg.Active { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *channelFlowOk) read(r io.Reader) (err error) { var bits byte @@ -696,34 +336,6 @@ type channelClose struct { MethodId uint16 } -func (msg *channelClose) id() (uint16, uint16) { - return 20, 40 -} - -func (msg *channelClose) wait() bool { - return true -} - -func (msg *channelClose) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { - return - } - - if err = writeShortstr(w, msg.ReplyText); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.ClassId); err != nil { - return - } - if err = binary.Write(w, binary.BigEndian, msg.MethodId); err != nil { - return - } - - return -} - func (msg *channelClose) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { @@ -747,19 +359,6 @@ func (msg *channelClose) read(r io.Reader) (err error) { type channelCloseOk struct { } -func (msg *channelCloseOk) id() (uint16, uint16) { - return 20, 41 -} - -func (msg *channelCloseOk) wait() bool { - return true -} - -func (msg *channelCloseOk) write(w io.Writer) (err error) { - - return -} - func (msg *channelCloseOk) read(r io.Reader) (err error) { return @@ -777,59 +376,6 @@ type ExchangeDeclare struct { Arguments Table `json:"arguments"` } -func (msg *ExchangeDeclare) id() (uint16, uint16) { - return 40, 10 -} - -func (msg *ExchangeDeclare) wait() bool { - return true && !msg.NoWait -} - -func (msg *ExchangeDeclare) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.Type); err != nil { - return - } - - if msg.Passive { - bits |= 1 << 0 - } - - if msg.Durable { - bits |= 1 << 1 - } - - if msg.AutoDelete { - bits |= 1 << 2 - } - - if msg.Internal { - bits |= 1 << 3 - } - - if msg.NoWait { - bits |= 1 << 4 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *ExchangeDeclare) read(r io.Reader) (err error) { var bits byte @@ -863,19 +409,6 @@ func (msg *ExchangeDeclare) read(r io.Reader) (err error) { type ExchangeDeclareOk struct { } -func (msg *ExchangeDeclareOk) id() (uint16, uint16) { - return 40, 11 -} - -func (msg *ExchangeDeclareOk) wait() bool { - return true -} - -func (msg *ExchangeDeclareOk) write(w io.Writer) (err error) { - - return -} - func (msg *ExchangeDeclareOk) read(r io.Reader) (err error) { return @@ -888,40 +421,6 @@ type exchangeDelete struct { NoWait bool } -func (msg *exchangeDelete) id() (uint16, uint16) { - return 40, 20 -} - -func (msg *exchangeDelete) wait() bool { - return true && !msg.NoWait -} - -func (msg *exchangeDelete) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - - if msg.IfUnused { - bits |= 1 << 0 - } - - if msg.NoWait { - bits |= 1 << 1 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *exchangeDelete) read(r io.Reader) (err error) { var bits byte @@ -945,19 +444,6 @@ func (msg *exchangeDelete) read(r io.Reader) (err error) { type exchangeDeleteOk struct { } -func (msg *exchangeDeleteOk) id() (uint16, uint16) { - return 40, 21 -} - -func (msg *exchangeDeleteOk) wait() bool { - return true -} - -func (msg *exchangeDeleteOk) write(w io.Writer) (err error) { - - return -} - func (msg *exchangeDeleteOk) read(r io.Reader) (err error) { return @@ -972,46 +458,6 @@ type exchangeBind struct { Arguments Table } -func (msg *exchangeBind) id() (uint16, uint16) { - return 40, 30 -} - -func (msg *exchangeBind) wait() bool { - return true && !msg.NoWait -} - -func (msg *exchangeBind) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Destination); err != nil { - return - } - if err = writeShortstr(w, msg.Source); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - if msg.NoWait { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *exchangeBind) read(r io.Reader) (err error) { var bits byte @@ -1044,19 +490,6 @@ func (msg *exchangeBind) read(r io.Reader) (err error) { type exchangeBindOk struct { } -func (msg *exchangeBindOk) id() (uint16, uint16) { - return 40, 31 -} - -func (msg *exchangeBindOk) wait() bool { - return true -} - -func (msg *exchangeBindOk) write(w io.Writer) (err error) { - - return -} - func (msg *exchangeBindOk) read(r io.Reader) (err error) { return @@ -1071,46 +504,6 @@ type exchangeUnbind struct { Arguments Table } -func (msg *exchangeUnbind) id() (uint16, uint16) { - return 40, 40 -} - -func (msg *exchangeUnbind) wait() bool { - return true && !msg.NoWait -} - -func (msg *exchangeUnbind) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Destination); err != nil { - return - } - if err = writeShortstr(w, msg.Source); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - if msg.NoWait { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *exchangeUnbind) read(r io.Reader) (err error) { var bits byte @@ -1143,19 +536,6 @@ func (msg *exchangeUnbind) read(r io.Reader) (err error) { type exchangeUnbindOk struct { } -func (msg *exchangeUnbindOk) id() (uint16, uint16) { - return 40, 51 -} - -func (msg *exchangeUnbindOk) wait() bool { - return true -} - -func (msg *exchangeUnbindOk) write(w io.Writer) (err error) { - - return -} - func (msg *exchangeUnbindOk) read(r io.Reader) (err error) { return @@ -1172,56 +552,6 @@ type QueueDeclare struct { Arguments Table `json:"arguments"` } -func (msg *QueueDeclare) id() (uint16, uint16) { - return 50, 10 -} - -func (msg *QueueDeclare) wait() bool { - return true && !msg.NoWait -} - -func (msg *QueueDeclare) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - - if msg.Passive { - bits |= 1 << 0 - } - - if msg.Durable { - bits |= 1 << 1 - } - - if msg.Exclusive { - bits |= 1 << 2 - } - - if msg.AutoDelete { - bits |= 1 << 3 - } - - if msg.NoWait { - bits |= 1 << 4 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *QueueDeclare) read(r io.Reader) (err error) { var bits byte @@ -1255,30 +585,6 @@ type QueueDeclareOk struct { ConsumerCount uint32 } -func (msg *QueueDeclareOk) id() (uint16, uint16) { - return 50, 11 -} - -func (msg *QueueDeclareOk) wait() bool { - return true -} - -func (msg *QueueDeclareOk) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { - return - } - if err = binary.Write(w, binary.BigEndian, msg.ConsumerCount); err != nil { - return - } - - return -} - func (msg *QueueDeclareOk) read(r io.Reader) (err error) { if msg.Queue, err = readShortstr(r); err != nil { @@ -1304,46 +610,6 @@ type QueueBind struct { Arguments Table `json:"arguments"` } -func (msg *QueueBind) id() (uint16, uint16) { - return 50, 20 -} - -func (msg *QueueBind) wait() bool { - return true && !msg.NoWait -} - -func (msg *QueueBind) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - if msg.NoWait { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *QueueBind) read(r io.Reader) (err error) { var bits byte @@ -1376,19 +642,6 @@ func (msg *QueueBind) read(r io.Reader) (err error) { type QueueBindOk struct { } -func (msg *QueueBindOk) id() (uint16, uint16) { - return 50, 21 -} - -func (msg *QueueBindOk) wait() bool { - return true -} - -func (msg *QueueBindOk) write(w io.Writer) (err error) { - - return -} - func (msg *QueueBindOk) read(r io.Reader) (err error) { return @@ -1402,37 +655,6 @@ type queueUnbind struct { Arguments Table } -func (msg *queueUnbind) id() (uint16, uint16) { - return 50, 50 -} - -func (msg *queueUnbind) wait() bool { - return true -} - -func (msg *queueUnbind) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *queueUnbind) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { @@ -1459,19 +681,6 @@ func (msg *queueUnbind) read(r io.Reader) (err error) { type queueUnbindOk struct { } -func (msg *queueUnbindOk) id() (uint16, uint16) { - return 50, 51 -} - -func (msg *queueUnbindOk) wait() bool { - return true -} - -func (msg *queueUnbindOk) write(w io.Writer) (err error) { - - return -} - func (msg *queueUnbindOk) read(r io.Reader) (err error) { return @@ -1483,36 +692,6 @@ type queuePurge struct { NoWait bool } -func (msg *queuePurge) id() (uint16, uint16) { - return 50, 30 -} - -func (msg *queuePurge) wait() bool { - return true && !msg.NoWait -} - -func (msg *queuePurge) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - - if msg.NoWait { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *queuePurge) read(r io.Reader) (err error) { var bits byte @@ -1536,23 +715,6 @@ type queuePurgeOk struct { MessageCount uint32 } -func (msg *queuePurgeOk) id() (uint16, uint16) { - return 50, 31 -} - -func (msg *queuePurgeOk) wait() bool { - return true -} - -func (msg *queuePurgeOk) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { - return - } - - return -} - func (msg *queuePurgeOk) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { @@ -1570,44 +732,6 @@ type queueDelete struct { NoWait bool } -func (msg *queueDelete) id() (uint16, uint16) { - return 50, 40 -} - -func (msg *queueDelete) wait() bool { - return true && !msg.NoWait -} - -func (msg *queueDelete) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - - if msg.IfUnused { - bits |= 1 << 0 - } - - if msg.IfEmpty { - bits |= 1 << 1 - } - - if msg.NoWait { - bits |= 1 << 2 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *queueDelete) read(r io.Reader) (err error) { var bits byte @@ -1633,23 +757,6 @@ type queueDeleteOk struct { MessageCount uint32 } -func (msg *queueDeleteOk) id() (uint16, uint16) { - return 50, 41 -} - -func (msg *queueDeleteOk) wait() bool { - return true -} - -func (msg *queueDeleteOk) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { - return - } - - return -} - func (msg *queueDeleteOk) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { @@ -1665,36 +772,6 @@ type basicQos struct { Global bool } -func (msg *basicQos) id() (uint16, uint16) { - return 60, 10 -} - -func (msg *basicQos) wait() bool { - return true -} - -func (msg *basicQos) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.PrefetchSize); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.PrefetchCount); err != nil { - return - } - - if msg.Global { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicQos) read(r io.Reader) (err error) { var bits byte @@ -1717,19 +794,6 @@ func (msg *basicQos) read(r io.Reader) (err error) { type basicQosOk struct { } -func (msg *basicQosOk) id() (uint16, uint16) { - return 60, 11 -} - -func (msg *basicQosOk) wait() bool { - return true -} - -func (msg *basicQosOk) write(w io.Writer) (err error) { - - return -} - func (msg *basicQosOk) read(r io.Reader) (err error) { return @@ -1746,55 +810,6 @@ type BasicConsume struct { Arguments Table `json:"arguments"` } -func (msg *BasicConsume) id() (uint16, uint16) { - return 60, 20 -} - -func (msg *BasicConsume) wait() bool { - return true && !msg.NoWait -} - -func (msg *BasicConsume) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - if err = writeShortstr(w, msg.ConsumerTag); err != nil { - return - } - - if msg.NoLocal { - bits |= 1 << 0 - } - - if msg.NoAck { - bits |= 1 << 1 - } - - if msg.Exclusive { - bits |= 1 << 2 - } - - if msg.NoWait { - bits |= 1 << 3 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeTable(w, msg.Arguments); err != nil { - return - } - - return -} - func (msg *BasicConsume) read(r io.Reader) (err error) { var bits byte @@ -1828,23 +843,6 @@ type BasicConsumeOk struct { ConsumerTag string } -func (msg *BasicConsumeOk) id() (uint16, uint16) { - return 60, 21 -} - -func (msg *BasicConsumeOk) wait() bool { - return true -} - -func (msg *BasicConsumeOk) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.ConsumerTag); err != nil { - return - } - - return -} - func (msg *BasicConsumeOk) read(r io.Reader) (err error) { if msg.ConsumerTag, err = readShortstr(r); err != nil { @@ -1859,32 +857,6 @@ type basicCancel struct { NoWait bool } -func (msg *basicCancel) id() (uint16, uint16) { - return 60, 30 -} - -func (msg *basicCancel) wait() bool { - return true && !msg.NoWait -} - -func (msg *basicCancel) write(w io.Writer) (err error) { - var bits byte - - if err = writeShortstr(w, msg.ConsumerTag); err != nil { - return - } - - if msg.NoWait { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicCancel) read(r io.Reader) (err error) { var bits byte @@ -1904,23 +876,6 @@ type basicCancelOk struct { ConsumerTag string } -func (msg *basicCancelOk) id() (uint16, uint16) { - return 60, 31 -} - -func (msg *basicCancelOk) wait() bool { - return true -} - -func (msg *basicCancelOk) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.ConsumerTag); err != nil { - return - } - - return -} - func (msg *basicCancelOk) read(r io.Reader) (err error) { if msg.ConsumerTag, err = readShortstr(r); err != nil { @@ -1940,14 +895,6 @@ type BasicPublish struct { Body []byte `json:"body"` } -func (msg *BasicPublish) id() (uint16, uint16) { - return 60, 40 -} - -func (msg *BasicPublish) wait() bool { - return false -} - func (msg *BasicPublish) getContent() (Properties, []byte) { return msg.Properties, msg.Body } @@ -1956,35 +903,6 @@ func (msg *BasicPublish) setContent(props Properties, body []byte) { msg.Properties, msg.Body = props, body } -func (msg *BasicPublish) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - if msg.Mandatory { - bits |= 1 << 0 - } - - if msg.Immediate { - bits |= 1 << 1 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *BasicPublish) read(r io.Reader) (err error) { var bits byte @@ -2017,14 +935,6 @@ type basicReturn struct { Body []byte } -func (msg *basicReturn) id() (uint16, uint16) { - return 60, 50 -} - -func (msg *basicReturn) wait() bool { - return false -} - func (msg *basicReturn) getContent() (Properties, []byte) { return msg.Properties, msg.Body } @@ -2033,25 +943,6 @@ func (msg *basicReturn) setContent(props Properties, body []byte) { msg.Properties, msg.Body = props, body } -func (msg *basicReturn) write(w io.Writer) (err error) { - - if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { - return - } - - if err = writeShortstr(w, msg.ReplyText); err != nil { - return - } - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - return -} - func (msg *basicReturn) read(r io.Reader) (err error) { if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { @@ -2081,14 +972,6 @@ type BasicDeliver struct { Body []byte `json:"body"` } -func (msg *BasicDeliver) id() (uint16, uint16) { - return 60, 60 -} - -func (msg *BasicDeliver) wait() bool { - return false -} - func (msg *BasicDeliver) getContent() (Properties, []byte) { return msg.Properties, msg.Body } @@ -2097,35 +980,6 @@ func (msg *BasicDeliver) setContent(props Properties, body []byte) { msg.Properties, msg.Body = props, body } -func (msg *BasicDeliver) write(w io.Writer) (err error) { - var bits byte - - if err = writeShortstr(w, msg.ConsumerTag); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { - return - } - - if msg.Redelivered { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - return -} - func (msg *BasicDeliver) read(r io.Reader) (err error) { var bits byte @@ -2158,36 +1012,6 @@ type basicGet struct { NoAck bool } -func (msg *basicGet) id() (uint16, uint16) { - return 60, 70 -} - -func (msg *basicGet) wait() bool { - return true -} - -func (msg *basicGet) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { - return - } - - if err = writeShortstr(w, msg.Queue); err != nil { - return - } - - if msg.NoAck { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicGet) read(r io.Reader) (err error) { var bits byte @@ -2217,14 +1041,6 @@ type basicGetOk struct { Body []byte } -func (msg *basicGetOk) id() (uint16, uint16) { - return 60, 71 -} - -func (msg *basicGetOk) wait() bool { - return true -} - func (msg *basicGetOk) getContent() (Properties, []byte) { return msg.Properties, msg.Body } @@ -2233,35 +1049,6 @@ func (msg *basicGetOk) setContent(props Properties, body []byte) { msg.Properties, msg.Body = props, body } -func (msg *basicGetOk) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { - return - } - - if msg.Redelivered { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - if err = writeShortstr(w, msg.Exchange); err != nil { - return - } - if err = writeShortstr(w, msg.RoutingKey); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { - return - } - - return -} - func (msg *basicGetOk) read(r io.Reader) (err error) { var bits byte @@ -2292,23 +1079,6 @@ type basicGetEmpty struct { reserved1 string } -func (msg *basicGetEmpty) id() (uint16, uint16) { - return 60, 72 -} - -func (msg *basicGetEmpty) wait() bool { - return true -} - -func (msg *basicGetEmpty) write(w io.Writer) (err error) { - - if err = writeShortstr(w, msg.reserved1); err != nil { - return - } - - return -} - func (msg *basicGetEmpty) read(r io.Reader) (err error) { if msg.reserved1, err = readShortstr(r); err != nil { @@ -2323,32 +1093,6 @@ type basicAck struct { Multiple bool } -func (msg *basicAck) id() (uint16, uint16) { - return 60, 80 -} - -func (msg *basicAck) wait() bool { - return false -} - -func (msg *basicAck) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { - return - } - - if msg.Multiple { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicAck) read(r io.Reader) (err error) { var bits byte @@ -2369,32 +1113,6 @@ type basicReject struct { Requeue bool } -func (msg *basicReject) id() (uint16, uint16) { - return 60, 90 -} - -func (msg *basicReject) wait() bool { - return false -} - -func (msg *basicReject) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { - return - } - - if msg.Requeue { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicReject) read(r io.Reader) (err error) { var bits byte @@ -2414,28 +1132,6 @@ type basicRecoverAsync struct { Requeue bool } -func (msg *basicRecoverAsync) id() (uint16, uint16) { - return 60, 100 -} - -func (msg *basicRecoverAsync) wait() bool { - return false -} - -func (msg *basicRecoverAsync) write(w io.Writer) (err error) { - var bits byte - - if msg.Requeue { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicRecoverAsync) read(r io.Reader) (err error) { var bits byte @@ -2451,28 +1147,6 @@ type basicRecover struct { Requeue bool } -func (msg *basicRecover) id() (uint16, uint16) { - return 60, 110 -} - -func (msg *basicRecover) wait() bool { - return true -} - -func (msg *basicRecover) write(w io.Writer) (err error) { - var bits byte - - if msg.Requeue { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicRecover) read(r io.Reader) (err error) { var bits byte @@ -2487,19 +1161,6 @@ func (msg *basicRecover) read(r io.Reader) (err error) { type basicRecoverOk struct { } -func (msg *basicRecoverOk) id() (uint16, uint16) { - return 60, 111 -} - -func (msg *basicRecoverOk) wait() bool { - return true -} - -func (msg *basicRecoverOk) write(w io.Writer) (err error) { - - return -} - func (msg *basicRecoverOk) read(r io.Reader) (err error) { return @@ -2511,36 +1172,6 @@ type basicNack struct { Requeue bool } -func (msg *basicNack) id() (uint16, uint16) { - return 60, 120 -} - -func (msg *basicNack) wait() bool { - return false -} - -func (msg *basicNack) write(w io.Writer) (err error) { - var bits byte - - if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { - return - } - - if msg.Multiple { - bits |= 1 << 0 - } - - if msg.Requeue { - bits |= 1 << 1 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *basicNack) read(r io.Reader) (err error) { var bits byte @@ -2560,19 +1191,6 @@ func (msg *basicNack) read(r io.Reader) (err error) { type txSelect struct { } -func (msg *txSelect) id() (uint16, uint16) { - return 90, 10 -} - -func (msg *txSelect) wait() bool { - return true -} - -func (msg *txSelect) write(w io.Writer) (err error) { - - return -} - func (msg *txSelect) read(r io.Reader) (err error) { return @@ -2581,19 +1199,6 @@ func (msg *txSelect) read(r io.Reader) (err error) { type txSelectOk struct { } -func (msg *txSelectOk) id() (uint16, uint16) { - return 90, 11 -} - -func (msg *txSelectOk) wait() bool { - return true -} - -func (msg *txSelectOk) write(w io.Writer) (err error) { - - return -} - func (msg *txSelectOk) read(r io.Reader) (err error) { return @@ -2602,19 +1207,6 @@ func (msg *txSelectOk) read(r io.Reader) (err error) { type txCommit struct { } -func (msg *txCommit) id() (uint16, uint16) { - return 90, 20 -} - -func (msg *txCommit) wait() bool { - return true -} - -func (msg *txCommit) write(w io.Writer) (err error) { - - return -} - func (msg *txCommit) read(r io.Reader) (err error) { return @@ -2623,19 +1215,6 @@ func (msg *txCommit) read(r io.Reader) (err error) { type txCommitOk struct { } -func (msg *txCommitOk) id() (uint16, uint16) { - return 90, 21 -} - -func (msg *txCommitOk) wait() bool { - return true -} - -func (msg *txCommitOk) write(w io.Writer) (err error) { - - return -} - func (msg *txCommitOk) read(r io.Reader) (err error) { return @@ -2644,19 +1223,6 @@ func (msg *txCommitOk) read(r io.Reader) (err error) { type txRollback struct { } -func (msg *txRollback) id() (uint16, uint16) { - return 90, 30 -} - -func (msg *txRollback) wait() bool { - return true -} - -func (msg *txRollback) write(w io.Writer) (err error) { - - return -} - func (msg *txRollback) read(r io.Reader) (err error) { return @@ -2665,19 +1231,6 @@ func (msg *txRollback) read(r io.Reader) (err error) { type txRollbackOk struct { } -func (msg *txRollbackOk) id() (uint16, uint16) { - return 90, 31 -} - -func (msg *txRollbackOk) wait() bool { - return true -} - -func (msg *txRollbackOk) write(w io.Writer) (err error) { - - return -} - func (msg *txRollbackOk) read(r io.Reader) (err error) { return @@ -2687,28 +1240,6 @@ type confirmSelect struct { Nowait bool } -func (msg *confirmSelect) id() (uint16, uint16) { - return 85, 10 -} - -func (msg *confirmSelect) wait() bool { - return true -} - -func (msg *confirmSelect) write(w io.Writer) (err error) { - var bits byte - - if msg.Nowait { - bits |= 1 << 0 - } - - if err = binary.Write(w, binary.BigEndian, bits); err != nil { - return - } - - return -} - func (msg *confirmSelect) read(r io.Reader) (err error) { var bits byte @@ -2723,19 +1254,6 @@ func (msg *confirmSelect) read(r io.Reader) (err error) { type confirmSelectOk struct { } -func (msg *confirmSelectOk) id() (uint16, uint16) { - return 85, 11 -} - -func (msg *confirmSelectOk) wait() bool { - return true -} - -func (msg *confirmSelectOk) write(w io.Writer) (err error) { - - return -} - func (msg *confirmSelectOk) read(r io.Reader) (err error) { return diff --git a/tap/extensions/amqp/types.go b/tap/extensions/amqp/types.go index 5252bc942..bcf321677 100644 --- a/tap/extensions/amqp/types.go +++ b/tap/extensions/amqp/types.go @@ -223,41 +223,8 @@ type Decimal struct { // type Table map[string]interface{} -func validateField(f interface{}) error { - switch fv := f.(type) { - case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: - return nil - - case []interface{}: - for _, v := range fv { - if err := validateField(v); err != nil { - return fmt.Errorf("in array %s", err) - } - } - return nil - - case Table: - for k, v := range fv { - if err := validateField(v); err != nil { - return fmt.Errorf("table field %q %s", k, err) - } - } - return nil - } - - return fmt.Errorf("value %T not supported", f) -} - -// Validate returns and error if any Go types in the table are incompatible with AMQP types. -func (t Table) Validate() error { - return validateField(t) -} - type Message interface { - id() (uint16, uint16) - wait() bool read(io.Reader) error - write(io.Writer) error } /* @@ -286,8 +253,6 @@ system calls to read a frame. */ type frame interface { - write(io.Writer) error - channel() uint16 } type AmqpReader struct { @@ -323,8 +288,6 @@ type MethodFrame struct { Method Message } -func (f *MethodFrame) channel() uint16 { return f.ChannelId } - /* Heartbeating is a technique designed to undo one of TCP/IP's features, namely its ability to recover from a broken physical connection by closing only after @@ -338,8 +301,6 @@ type HeartbeatFrame struct { ChannelId uint16 } -func (f *HeartbeatFrame) channel() uint16 { return f.ChannelId } - /* Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally defined as carrying content. When a peer sends such a method frame, it always @@ -367,8 +328,6 @@ type HeaderFrame struct { Properties Properties } -func (f *HeaderFrame) channel() uint16 { return f.ChannelId } - /* Content is the application data we carry from client-to-client via the AMQP server. Content is, roughly speaking, a set of properties plus a binary data @@ -388,5 +347,3 @@ type BodyFrame struct { ChannelId uint16 Body []byte } - -func (f *BodyFrame) channel() uint16 { return f.ChannelId } diff --git a/tap/extensions/amqp/write.go b/tap/extensions/amqp/write.go deleted file mode 100644 index ff5aac939..000000000 --- a/tap/extensions/amqp/write.go +++ /dev/null @@ -1,403 +0,0 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp - -package amqp - -import ( - "bytes" - "encoding/binary" - "errors" - "io" - "math" - "time" -) - -func (f *MethodFrame) write(w io.Writer) (err error) { - var payload bytes.Buffer - - if f.Method == nil { - return errors.New("malformed frame: missing method") - } - - class, method := f.Method.id() - - if err = binary.Write(&payload, binary.BigEndian, class); err != nil { - return - } - - if err = binary.Write(&payload, binary.BigEndian, method); err != nil { - return - } - - if err = f.Method.write(&payload); err != nil { - return - } - - return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes()) -} - -// Heartbeat -// -// Payload is empty -func (f *HeartbeatFrame) write(w io.Writer) (err error) { - return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{}) -} - -// CONTENT HEADER -// 0 2 4 12 14 -// +----------+--------+-----------+----------------+------------- - - -// | class-id | weight | body size | property flags | property list... -// +----------+--------+-----------+----------------+------------- - - -// short short long long short remainder... -// -func (f *HeaderFrame) write(w io.Writer) (err error) { - var payload bytes.Buffer - var zeroTime time.Time - - if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { - return - } - - if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil { - return - } - - if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil { - return - } - - // First pass will build the mask to be serialized, second pass will serialize - // each of the fields that appear in the mask. - - var mask uint16 - - if len(f.Properties.ContentType) > 0 { - mask = mask | flagContentType - } - if len(f.Properties.ContentEncoding) > 0 { - mask = mask | flagContentEncoding - } - if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 { - mask = mask | flagHeaders - } - if f.Properties.DeliveryMode > 0 { - mask = mask | flagDeliveryMode - } - if f.Properties.Priority > 0 { - mask = mask | flagPriority - } - if len(f.Properties.CorrelationId) > 0 { - mask = mask | flagCorrelationId - } - if len(f.Properties.ReplyTo) > 0 { - mask = mask | flagReplyTo - } - if len(f.Properties.Expiration) > 0 { - mask = mask | flagExpiration - } - if len(f.Properties.MessageId) > 0 { - mask = mask | flagMessageId - } - if f.Properties.Timestamp != zeroTime { - mask = mask | flagTimestamp - } - if len(f.Properties.Type) > 0 { - mask = mask | flagType - } - if len(f.Properties.UserId) > 0 { - mask = mask | flagUserId - } - if len(f.Properties.AppId) > 0 { - mask = mask | flagAppId - } - - if err = binary.Write(&payload, binary.BigEndian, mask); err != nil { - return - } - - if hasProperty(mask, flagContentType) { - if err = writeShortstr(&payload, f.Properties.ContentType); err != nil { - return - } - } - if hasProperty(mask, flagContentEncoding) { - if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil { - return - } - } - if hasProperty(mask, flagHeaders) { - if err = writeTable(&payload, f.Properties.Headers); err != nil { - return - } - } - if hasProperty(mask, flagDeliveryMode) { - if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil { - return - } - } - if hasProperty(mask, flagPriority) { - if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil { - return - } - } - if hasProperty(mask, flagCorrelationId) { - if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil { - return - } - } - if hasProperty(mask, flagReplyTo) { - if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil { - return - } - } - if hasProperty(mask, flagExpiration) { - if err = writeShortstr(&payload, f.Properties.Expiration); err != nil { - return - } - } - if hasProperty(mask, flagMessageId) { - if err = writeShortstr(&payload, f.Properties.MessageId); err != nil { - return - } - } - if hasProperty(mask, flagTimestamp) { - if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil { - return - } - } - if hasProperty(mask, flagType) { - if err = writeShortstr(&payload, f.Properties.Type); err != nil { - return - } - } - if hasProperty(mask, flagUserId) { - if err = writeShortstr(&payload, f.Properties.UserId); err != nil { - return - } - } - if hasProperty(mask, flagAppId) { - if err = writeShortstr(&payload, f.Properties.AppId); err != nil { - return - } - } - - return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes()) -} - -// Body -// -// Payload is one byterange from the full body who's size is declared in the -// Header frame -func (f *BodyFrame) write(w io.Writer) (err error) { - return writeFrame(w, frameBody, f.ChannelId, f.Body) -} - -func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) { - end := []byte{frameEnd} - size := uint(len(payload)) - - _, err = w.Write([]byte{ - byte(typ), - byte((channel & 0xff00) >> 8), - byte((channel & 0x00ff) >> 0), - byte((size & 0xff000000) >> 24), - byte((size & 0x00ff0000) >> 16), - byte((size & 0x0000ff00) >> 8), - byte((size & 0x000000ff) >> 0), - }) - - if err != nil { - return - } - - if _, err = w.Write(payload); err != nil { - return - } - - if _, err = w.Write(end); err != nil { - return - } - - return -} - -func writeShortstr(w io.Writer, s string) (err error) { - b := []byte(s) - - var length = uint8(len(b)) - - if err = binary.Write(w, binary.BigEndian, length); err != nil { - return - } - - if _, err = w.Write(b[:length]); err != nil { - return - } - - return -} - -func writeLongstr(w io.Writer, s string) (err error) { - b := []byte(s) - - var length = uint32(len(b)) - - if err = binary.Write(w, binary.BigEndian, length); err != nil { - return - } - - if _, err = w.Write(b[:length]); err != nil { - return - } - - return -} - -/* -'A': []interface{} -'D': Decimal -'F': Table -'I': int32 -'S': string -'T': time.Time -'V': nil -'b': byte -'d': float64 -'f': float32 -'l': int64 -'s': int16 -'t': bool -'x': []byte -*/ -func writeField(w io.Writer, value interface{}) (err error) { - var buf [9]byte - var enc []byte - - switch v := value.(type) { - case bool: - buf[0] = 't' - if v { - buf[1] = byte(1) - } else { - buf[1] = byte(0) - } - enc = buf[:2] - - case byte: - buf[0] = 'b' - buf[1] = byte(v) - enc = buf[:2] - - case int16: - buf[0] = 's' - binary.BigEndian.PutUint16(buf[1:3], uint16(v)) - enc = buf[:3] - - case int: - buf[0] = 'I' - binary.BigEndian.PutUint32(buf[1:5], uint32(v)) - enc = buf[:5] - - case int32: - buf[0] = 'I' - binary.BigEndian.PutUint32(buf[1:5], uint32(v)) - enc = buf[:5] - - case int64: - buf[0] = 'l' - binary.BigEndian.PutUint64(buf[1:9], uint64(v)) - enc = buf[:9] - - case float32: - buf[0] = 'f' - binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v)) - enc = buf[:5] - - case float64: - buf[0] = 'd' - binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v)) - enc = buf[:9] - - case Decimal: - buf[0] = 'D' - buf[1] = byte(v.Scale) - binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value)) - enc = buf[:6] - - case string: - buf[0] = 'S' - binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) - enc = append(buf[:5], []byte(v)...) - - case []interface{}: // field-array - buf[0] = 'A' - - sec := new(bytes.Buffer) - for _, val := range v { - if err = writeField(sec, val); err != nil { - return - } - } - - binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len())) - if _, err = w.Write(buf[:5]); err != nil { - return - } - - if _, err = w.Write(sec.Bytes()); err != nil { - return - } - - return - - case time.Time: - buf[0] = 'T' - binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix())) - enc = buf[:9] - - case Table: - if _, err = w.Write([]byte{'F'}); err != nil { - return - } - return writeTable(w, v) - - case []byte: - buf[0] = 'x' - binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) - if _, err = w.Write(buf[0:5]); err != nil { - return - } - if _, err = w.Write(v); err != nil { - return - } - return - - case nil: - buf[0] = 'V' - enc = buf[:1] - - default: - return ErrFieldType - } - - _, err = w.Write(enc) - - return -} - -func writeTable(w io.Writer, table Table) (err error) { - var buf bytes.Buffer - - for key, val := range table { - if err = writeShortstr(&buf, key); err != nil { - return - } - if err = writeField(&buf, val); err != nil { - return - } - } - - return writeLongstr(w, buf.String()) -}