From 5b3f63dd280e12d6ebe880a286b5a6d8a7fdaa1c Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sat, 21 Aug 2021 10:15:21 +0300 Subject: [PATCH] Bring in the files related AMQP into `amqp` directory --- tap/extensions/amqp/read.go | 456 +++++ tap/extensions/amqp/spec091.go | 3306 ++++++++++++++++++++++++++++++++ tap/extensions/amqp/types.go | 428 +++++ tap/extensions/amqp/write.go | 416 ++++ 4 files changed, 4606 insertions(+) create mode 100644 tap/extensions/amqp/read.go create mode 100644 tap/extensions/amqp/spec091.go create mode 100644 tap/extensions/amqp/types.go create mode 100644 tap/extensions/amqp/write.go diff --git a/tap/extensions/amqp/read.go b/tap/extensions/amqp/read.go new file mode 100644 index 000000000..5457a2c5d --- /dev/null +++ b/tap/extensions/amqp/read.go @@ -0,0 +1,456 @@ +// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// Source code and contact info at http://github.com/streadway/amqp + +package main + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "time" +) + +/* +Reads a frame from an input stream and returns an interface that can be cast into +one of the following: + + MethodFrame + PropertiesFrame + BodyFrame + HeartbeatFrame + +2.3.5 frame Details + +All frames consist of a header (7 octets), a payload of arbitrary size, and a +'frame-end' octet that detects malformed frames: + + 0 1 3 7 size+7 size+8 + +------+---------+-------------+ +------------+ +-----------+ + | type | channel | size | | payload | | frame-end | + +------+---------+-------------+ +------------+ +-----------+ + octet short long size octets octet + +To read a frame, we: + 1. Read the header and check the frame type and channel. + 2. Depending on the frame type, we read the payload and process it. + 3. Read the frame end octet. + +In realistic implementations where performance is a concern, we would use +“read-ahead buffering” or + +“gathering reads” to avoid doing three separate system calls to read a frame. +*/ +func (r *AmqpReader) ReadFrame() (frame frame, err error) { + var scratch [7]byte + + if _, err = io.ReadFull(r.R, scratch[:7]); err != nil { + return + } + + typ := uint8(scratch[0]) + channel := binary.BigEndian.Uint16(scratch[1:3]) + size := binary.BigEndian.Uint32(scratch[3:7]) + + switch typ { + case frameMethod: + if frame, err = r.parseMethodFrame(channel, size); err != nil { + return + } + + case frameHeader: + if frame, err = r.parseHeaderFrame(channel, size); err != nil { + return + } + + case frameBody: + if frame, err = r.parseBodyFrame(channel, size); err != nil { + return nil, err + } + + case frameHeartbeat: + if frame, err = r.parseHeartbeatFrame(channel, size); err != nil { + return + } + + default: + return nil, ErrFrame + } + + if _, err = io.ReadFull(r.R, scratch[:1]); err != nil { + return nil, err + } + + if scratch[0] != frameEnd { + return nil, ErrFrame + } + + return +} + +func readShortstr(r io.Reader) (v string, err error) { + var length uint8 + if err = binary.Read(r, binary.BigEndian, &length); err != nil { + return + } + + bytes := make([]byte, length) + if _, err = io.ReadFull(r, bytes); err != nil { + return + } + return string(bytes), nil +} + +func readLongstr(r io.Reader) (v string, err error) { + var length uint32 + if err = binary.Read(r, binary.BigEndian, &length); err != nil { + return + } + + // slices can't be longer than max int32 value + if length > (^uint32(0) >> 1) { + return + } + + bytes := make([]byte, length) + if _, err = io.ReadFull(r, bytes); err != nil { + return + } + return string(bytes), nil +} + +func readDecimal(r io.Reader) (v Decimal, err error) { + if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil { + return + } + if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil { + return + } + return +} + +func readFloat32(r io.Reader) (v float32, err error) { + if err = binary.Read(r, binary.BigEndian, &v); err != nil { + return + } + return +} + +func readFloat64(r io.Reader) (v float64, err error) { + if err = binary.Read(r, binary.BigEndian, &v); err != nil { + return + } + return +} + +func readTimestamp(r io.Reader) (v time.Time, err error) { + var sec int64 + if err = binary.Read(r, binary.BigEndian, &sec); err != nil { + return + } + return time.Unix(sec, 0), nil +} + +/* +'A': []interface{} +'D': Decimal +'F': Table +'I': int32 +'S': string +'T': time.Time +'V': nil +'b': byte +'d': float64 +'f': float32 +'l': int64 +'s': int16 +'t': bool +'x': []byte +*/ +func readField(r io.Reader) (v interface{}, err error) { + var typ byte + if err = binary.Read(r, binary.BigEndian, &typ); err != nil { + return + } + + switch typ { + case 't': + var value uint8 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return (value != 0), nil + + case 'b': + var value [1]byte + if _, err = io.ReadFull(r, value[0:1]); err != nil { + return + } + return value[0], nil + + case 's': + var value int16 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + + case 'I': + var value int32 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + + case 'l': + var value int64 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + + case 'f': + var value float32 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + + case 'd': + var value float64 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + + case 'D': + return readDecimal(r) + + case 'S': + return readLongstr(r) + + case 'A': + return readArray(r) + + case 'T': + return readTimestamp(r) + + case 'F': + return readTable(r) + + case 'x': + var len int32 + if err = binary.Read(r, binary.BigEndian, &len); err != nil { + return nil, err + } + + value := make([]byte, len) + if _, err = io.ReadFull(r, value); err != nil { + return nil, err + } + return value, err + + case 'V': + return nil, nil + } + + return nil, ErrSyntax +} + +/* + Field tables are long strings that contain packed name-value pairs. The + name-value pairs are encoded as short string defining the name, and octet + defining the values type and then the value itself. The valid field types for + tables are an extension of the native integer, bit, string, and timestamp + types, and are shown in the grammar. Multi-octet integer fields are always + held in network byte order. +*/ +func readTable(r io.Reader) (table Table, err error) { + var nested bytes.Buffer + var str string + + if str, err = readLongstr(r); err != nil { + return + } + + nested.Write([]byte(str)) + + table = make(Table) + + for nested.Len() > 0 { + var key string + var value interface{} + + if key, err = readShortstr(&nested); err != nil { + return + } + + if value, err = readField(&nested); err != nil { + return + } + + table[key] = value + } + + return +} + +func readArray(r io.Reader) ([]interface{}, error) { + var ( + size uint32 + err error + ) + + if err = binary.Read(r, binary.BigEndian, &size); err != nil { + return nil, err + } + + var ( + lim = &io.LimitedReader{R: r, N: int64(size)} + arr = []interface{}{} + field interface{} + ) + + for { + if field, err = readField(lim); err != nil { + if err == io.EOF { + break + } + return nil, err + } + arr = append(arr, field) + } + + return arr, nil +} + +// Checks if this bit mask matches the flags bitset +func hasProperty(mask uint16, prop int) bool { + return int(mask)&prop > 0 +} + +func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) { + hf := &HeaderFrame{ + ChannelId: channel, + } + + if err = binary.Read(r.R, binary.BigEndian, &hf.ClassId); err != nil { + return + } + + if err = binary.Read(r.R, binary.BigEndian, &hf.weight); err != nil { + return + } + + if err = binary.Read(r.R, binary.BigEndian, &hf.Size); err != nil { + return + } + + var flags uint16 + + if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil { + return + } + + if hasProperty(flags, flagContentType) { + if hf.Properties.ContentType, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagContentEncoding) { + if hf.Properties.ContentEncoding, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagHeaders) { + if hf.Properties.Headers, err = readTable(r.R); err != nil { + return + } + } + if hasProperty(flags, flagDeliveryMode) { + if err = binary.Read(r.R, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil { + return + } + } + if hasProperty(flags, flagPriority) { + if err = binary.Read(r.R, binary.BigEndian, &hf.Properties.Priority); err != nil { + return + } + } + if hasProperty(flags, flagCorrelationId) { + if hf.Properties.CorrelationId, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagReplyTo) { + if hf.Properties.ReplyTo, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagExpiration) { + if hf.Properties.Expiration, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagMessageId) { + if hf.Properties.MessageId, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagTimestamp) { + if hf.Properties.Timestamp, err = readTimestamp(r.R); err != nil { + return + } + } + if hasProperty(flags, flagType) { + if hf.Properties.Type, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagUserId) { + if hf.Properties.UserId, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagAppId) { + if hf.Properties.AppId, err = readShortstr(r.R); err != nil { + return + } + } + if hasProperty(flags, flagReserved1) { + if hf.Properties.reserved1, err = readShortstr(r.R); err != nil { + return + } + } + + return hf, nil +} + +func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) { + bf := &BodyFrame{ + ChannelId: channel, + Body: make([]byte, size), + } + + if _, err = io.ReadFull(r.R, bf.Body); err != nil { + return nil, err + } + + return bf, nil +} + +var errHeartbeatPayload = errors.New("Heartbeats should not have a payload") + +func (r *AmqpReader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) { + hf := &HeartbeatFrame{ + ChannelId: channel, + } + + if size > 0 { + return nil, errHeartbeatPayload + } + + return hf, nil +} diff --git a/tap/extensions/amqp/spec091.go b/tap/extensions/amqp/spec091.go new file mode 100644 index 000000000..e9e764081 --- /dev/null +++ b/tap/extensions/amqp/spec091.go @@ -0,0 +1,3306 @@ +// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// Source code and contact info at http://github.com/streadway/amqp + +/* GENERATED FILE - DO NOT EDIT */ +/* Rebuild from the spec/gen.go tool */ + +package main + +import ( + "encoding/binary" + "fmt" + "io" +) + +// Error codes that can be sent from the server during a connection or +// channel exception or used by the client to indicate a class of error like +// ErrCredentials. The text of the error is likely more interesting than +// these constants. +const ( + frameMethod = 1 + frameHeader = 2 + frameBody = 3 + frameHeartbeat = 8 + frameMinSize = 4096 + frameEnd = 206 + replySuccess = 200 + ContentTooLarge = 311 + NoRoute = 312 + NoConsumers = 313 + ConnectionForced = 320 + InvalidPath = 402 + AccessRefused = 403 + NotFound = 404 + ResourceLocked = 405 + PreconditionFailed = 406 + FrameError = 501 + SyntaxError = 502 + CommandInvalid = 503 + ChannelError = 504 + UnexpectedFrame = 505 + ResourceError = 506 + NotAllowed = 530 + NotImplemented = 540 + InternalError = 541 +) + +func isSoftExceptionCode(code int) bool { + switch code { + case 311: + return true + case 312: + return true + case 313: + return true + case 403: + return true + case 404: + return true + case 405: + return true + case 406: + return true + + } + return false +} + +type ConnectionStart struct { + VersionMajor byte + VersionMinor byte + ServerProperties Table + Mechanisms string + Locales string +} + +func (msg *ConnectionStart) id() (uint16, uint16) { + return 10, 10 +} + +func (msg *ConnectionStart) wait() bool { + return true +} + +func (msg *ConnectionStart) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.VersionMajor); err != nil { + return + } + if err = binary.Write(w, binary.BigEndian, msg.VersionMinor); err != nil { + return + } + + if err = writeTable(w, msg.ServerProperties); err != nil { + return + } + + if err = writeLongstr(w, msg.Mechanisms); err != nil { + return + } + if err = writeLongstr(w, msg.Locales); err != nil { + return + } + + return +} + +func (msg *ConnectionStart) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.VersionMajor); err != nil { + return + } + if err = binary.Read(r, binary.BigEndian, &msg.VersionMinor); err != nil { + return + } + + if msg.ServerProperties, err = readTable(r); err != nil { + return + } + + if msg.Mechanisms, err = readLongstr(r); err != nil { + return + } + if msg.Locales, err = readLongstr(r); err != nil { + return + } + + return +} + +type ConnectionStartOk struct { + ClientProperties Table + Mechanism string + Response string + Locale string +} + +func (msg *ConnectionStartOk) id() (uint16, uint16) { + return 10, 11 +} + +func (msg *ConnectionStartOk) wait() bool { + return true +} + +func (msg *ConnectionStartOk) write(w io.Writer) (err error) { + + if err = writeTable(w, msg.ClientProperties); err != nil { + return + } + + if err = writeShortstr(w, msg.Mechanism); err != nil { + return + } + + if err = writeLongstr(w, msg.Response); err != nil { + return + } + + if err = writeShortstr(w, msg.Locale); err != nil { + return + } + + return +} + +func (msg *ConnectionStartOk) read(r io.Reader) (err error) { + + if msg.ClientProperties, err = readTable(r); err != nil { + return + } + + if msg.Mechanism, err = readShortstr(r); err != nil { + return + } + + if msg.Response, err = readLongstr(r); err != nil { + return + } + + if msg.Locale, err = readShortstr(r); err != nil { + return + } + + return +} + +type connectionSecure struct { + Challenge string +} + +func (msg *connectionSecure) id() (uint16, uint16) { + return 10, 20 +} + +func (msg *connectionSecure) wait() bool { + return true +} + +func (msg *connectionSecure) write(w io.Writer) (err error) { + + if err = writeLongstr(w, msg.Challenge); err != nil { + return + } + + return +} + +func (msg *connectionSecure) read(r io.Reader) (err error) { + + if msg.Challenge, err = readLongstr(r); err != nil { + return + } + + return +} + +type connectionSecureOk struct { + Response string +} + +func (msg *connectionSecureOk) id() (uint16, uint16) { + return 10, 21 +} + +func (msg *connectionSecureOk) wait() bool { + return true +} + +func (msg *connectionSecureOk) write(w io.Writer) (err error) { + + if err = writeLongstr(w, msg.Response); err != nil { + return + } + + return +} + +func (msg *connectionSecureOk) read(r io.Reader) (err error) { + + if msg.Response, err = readLongstr(r); err != nil { + return + } + + return +} + +type connectionTune struct { + ChannelMax uint16 + FrameMax uint32 + Heartbeat uint16 +} + +func (msg *connectionTune) id() (uint16, uint16) { + return 10, 30 +} + +func (msg *connectionTune) wait() bool { + return true +} + +func (msg *connectionTune) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.ChannelMax); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.FrameMax); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.Heartbeat); err != nil { + return + } + + return +} + +func (msg *connectionTune) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.ChannelMax); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.FrameMax); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.Heartbeat); err != nil { + return + } + + return +} + +type connectionTuneOk struct { + ChannelMax uint16 + FrameMax uint32 + Heartbeat uint16 +} + +func (msg *connectionTuneOk) id() (uint16, uint16) { + return 10, 31 +} + +func (msg *connectionTuneOk) wait() bool { + return true +} + +func (msg *connectionTuneOk) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.ChannelMax); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.FrameMax); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.Heartbeat); err != nil { + return + } + + return +} + +func (msg *connectionTuneOk) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.ChannelMax); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.FrameMax); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.Heartbeat); err != nil { + return + } + + return +} + +type connectionOpen struct { + VirtualHost string + reserved1 string + reserved2 bool +} + +func (msg *connectionOpen) id() (uint16, uint16) { + return 10, 40 +} + +func (msg *connectionOpen) wait() bool { + return true +} + +func (msg *connectionOpen) write(w io.Writer) (err error) { + var bits byte + + if err = writeShortstr(w, msg.VirtualHost); err != nil { + return + } + if err = writeShortstr(w, msg.reserved1); err != nil { + return + } + + if msg.reserved2 { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *connectionOpen) read(r io.Reader) (err error) { + var bits byte + + if msg.VirtualHost, err = readShortstr(r); err != nil { + return + } + if msg.reserved1, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.reserved2 = (bits&(1<<0) > 0) + + return +} + +type connectionOpenOk struct { + reserved1 string +} + +func (msg *connectionOpenOk) id() (uint16, uint16) { + return 10, 41 +} + +func (msg *connectionOpenOk) wait() bool { + return true +} + +func (msg *connectionOpenOk) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.reserved1); err != nil { + return + } + + return +} + +func (msg *connectionOpenOk) read(r io.Reader) (err error) { + + if msg.reserved1, err = readShortstr(r); err != nil { + return + } + + return +} + +type ConnectionClose struct { + ReplyCode uint16 + ReplyText string + ClassId uint16 + MethodId uint16 +} + +func (msg *ConnectionClose) id() (uint16, uint16) { + return 10, 50 +} + +func (msg *ConnectionClose) wait() bool { + return true +} + +func (msg *ConnectionClose) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { + return + } + + if err = writeShortstr(w, msg.ReplyText); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.ClassId); err != nil { + return + } + if err = binary.Write(w, binary.BigEndian, msg.MethodId); err != nil { + return + } + + return +} + +func (msg *ConnectionClose) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { + return + } + + if msg.ReplyText, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.ClassId); err != nil { + return + } + if err = binary.Read(r, binary.BigEndian, &msg.MethodId); err != nil { + return + } + + return +} + +type ConnectionCloseOk struct { +} + +func (msg *ConnectionCloseOk) id() (uint16, uint16) { + return 10, 51 +} + +func (msg *ConnectionCloseOk) wait() bool { + return true +} + +func (msg *ConnectionCloseOk) write(w io.Writer) (err error) { + + return +} + +func (msg *ConnectionCloseOk) read(r io.Reader) (err error) { + + return +} + +type connectionBlocked struct { + Reason string +} + +func (msg *connectionBlocked) id() (uint16, uint16) { + return 10, 60 +} + +func (msg *connectionBlocked) wait() bool { + return false +} + +func (msg *connectionBlocked) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.Reason); err != nil { + return + } + + return +} + +func (msg *connectionBlocked) read(r io.Reader) (err error) { + + if msg.Reason, err = readShortstr(r); err != nil { + return + } + + return +} + +type connectionUnblocked struct { +} + +func (msg *connectionUnblocked) id() (uint16, uint16) { + return 10, 61 +} + +func (msg *connectionUnblocked) wait() bool { + return false +} + +func (msg *connectionUnblocked) write(w io.Writer) (err error) { + + return +} + +func (msg *connectionUnblocked) read(r io.Reader) (err error) { + + return +} + +type channelOpen struct { + reserved1 string +} + +func (msg *channelOpen) id() (uint16, uint16) { + return 20, 10 +} + +func (msg *channelOpen) wait() bool { + return true +} + +func (msg *channelOpen) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.reserved1); err != nil { + return + } + + return +} + +func (msg *channelOpen) read(r io.Reader) (err error) { + + if msg.reserved1, err = readShortstr(r); err != nil { + return + } + + return +} + +type channelOpenOk struct { + reserved1 string +} + +func (msg *channelOpenOk) id() (uint16, uint16) { + return 20, 11 +} + +func (msg *channelOpenOk) wait() bool { + return true +} + +func (msg *channelOpenOk) write(w io.Writer) (err error) { + + if err = writeLongstr(w, msg.reserved1); err != nil { + return + } + + return +} + +func (msg *channelOpenOk) read(r io.Reader) (err error) { + + if msg.reserved1, err = readLongstr(r); err != nil { + return + } + + return +} + +type channelFlow struct { + Active bool +} + +func (msg *channelFlow) id() (uint16, uint16) { + return 20, 20 +} + +func (msg *channelFlow) wait() bool { + return true +} + +func (msg *channelFlow) write(w io.Writer) (err error) { + var bits byte + + if msg.Active { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *channelFlow) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Active = (bits&(1<<0) > 0) + + return +} + +type channelFlowOk struct { + Active bool +} + +func (msg *channelFlowOk) id() (uint16, uint16) { + return 20, 21 +} + +func (msg *channelFlowOk) wait() bool { + return false +} + +func (msg *channelFlowOk) write(w io.Writer) (err error) { + var bits byte + + if msg.Active { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *channelFlowOk) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Active = (bits&(1<<0) > 0) + + return +} + +type channelClose struct { + ReplyCode uint16 + ReplyText string + ClassId uint16 + MethodId uint16 +} + +func (msg *channelClose) id() (uint16, uint16) { + return 20, 40 +} + +func (msg *channelClose) wait() bool { + return true +} + +func (msg *channelClose) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { + return + } + + if err = writeShortstr(w, msg.ReplyText); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.ClassId); err != nil { + return + } + if err = binary.Write(w, binary.BigEndian, msg.MethodId); err != nil { + return + } + + return +} + +func (msg *channelClose) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { + return + } + + if msg.ReplyText, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.ClassId); err != nil { + return + } + if err = binary.Read(r, binary.BigEndian, &msg.MethodId); err != nil { + return + } + + return +} + +type channelCloseOk struct { +} + +func (msg *channelCloseOk) id() (uint16, uint16) { + return 20, 41 +} + +func (msg *channelCloseOk) wait() bool { + return true +} + +func (msg *channelCloseOk) write(w io.Writer) (err error) { + + return +} + +func (msg *channelCloseOk) read(r io.Reader) (err error) { + + return +} + +type ExchangeDeclare struct { + reserved1 uint16 + Exchange string + Type string + Passive bool + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Arguments Table +} + +func (msg *ExchangeDeclare) id() (uint16, uint16) { + return 40, 10 +} + +func (msg *ExchangeDeclare) wait() bool { + return true && !msg.NoWait +} + +func (msg *ExchangeDeclare) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.Type); err != nil { + return + } + + if msg.Passive { + bits |= 1 << 0 + } + + if msg.Durable { + bits |= 1 << 1 + } + + if msg.AutoDelete { + bits |= 1 << 2 + } + + if msg.Internal { + bits |= 1 << 3 + } + + if msg.NoWait { + bits |= 1 << 4 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *ExchangeDeclare) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.Type, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Passive = (bits&(1<<0) > 0) + msg.Durable = (bits&(1<<1) > 0) + msg.AutoDelete = (bits&(1<<2) > 0) + msg.Internal = (bits&(1<<3) > 0) + msg.NoWait = (bits&(1<<4) > 0) + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type ExchangeDeclareOk struct { +} + +func (msg *ExchangeDeclareOk) id() (uint16, uint16) { + return 40, 11 +} + +func (msg *ExchangeDeclareOk) wait() bool { + return true +} + +func (msg *ExchangeDeclareOk) write(w io.Writer) (err error) { + + return +} + +func (msg *ExchangeDeclareOk) read(r io.Reader) (err error) { + + return +} + +type exchangeDelete struct { + reserved1 uint16 + Exchange string + IfUnused bool + NoWait bool +} + +func (msg *exchangeDelete) id() (uint16, uint16) { + return 40, 20 +} + +func (msg *exchangeDelete) wait() bool { + return true && !msg.NoWait +} + +func (msg *exchangeDelete) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + + if msg.IfUnused { + bits |= 1 << 0 + } + + if msg.NoWait { + bits |= 1 << 1 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *exchangeDelete) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.IfUnused = (bits&(1<<0) > 0) + msg.NoWait = (bits&(1<<1) > 0) + + return +} + +type exchangeDeleteOk struct { +} + +func (msg *exchangeDeleteOk) id() (uint16, uint16) { + return 40, 21 +} + +func (msg *exchangeDeleteOk) wait() bool { + return true +} + +func (msg *exchangeDeleteOk) write(w io.Writer) (err error) { + + return +} + +func (msg *exchangeDeleteOk) read(r io.Reader) (err error) { + + return +} + +type exchangeBind struct { + reserved1 uint16 + Destination string + Source string + RoutingKey string + NoWait bool + Arguments Table +} + +func (msg *exchangeBind) id() (uint16, uint16) { + return 40, 30 +} + +func (msg *exchangeBind) wait() bool { + return true && !msg.NoWait +} + +func (msg *exchangeBind) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Destination); err != nil { + return + } + if err = writeShortstr(w, msg.Source); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + if msg.NoWait { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *exchangeBind) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Destination, err = readShortstr(r); err != nil { + return + } + if msg.Source, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoWait = (bits&(1<<0) > 0) + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type exchangeBindOk struct { +} + +func (msg *exchangeBindOk) id() (uint16, uint16) { + return 40, 31 +} + +func (msg *exchangeBindOk) wait() bool { + return true +} + +func (msg *exchangeBindOk) write(w io.Writer) (err error) { + + return +} + +func (msg *exchangeBindOk) read(r io.Reader) (err error) { + + return +} + +type exchangeUnbind struct { + reserved1 uint16 + Destination string + Source string + RoutingKey string + NoWait bool + Arguments Table +} + +func (msg *exchangeUnbind) id() (uint16, uint16) { + return 40, 40 +} + +func (msg *exchangeUnbind) wait() bool { + return true && !msg.NoWait +} + +func (msg *exchangeUnbind) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Destination); err != nil { + return + } + if err = writeShortstr(w, msg.Source); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + if msg.NoWait { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *exchangeUnbind) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Destination, err = readShortstr(r); err != nil { + return + } + if msg.Source, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoWait = (bits&(1<<0) > 0) + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type exchangeUnbindOk struct { +} + +func (msg *exchangeUnbindOk) id() (uint16, uint16) { + return 40, 51 +} + +func (msg *exchangeUnbindOk) wait() bool { + return true +} + +func (msg *exchangeUnbindOk) write(w io.Writer) (err error) { + + return +} + +func (msg *exchangeUnbindOk) read(r io.Reader) (err error) { + + return +} + +type QueueDeclare struct { + reserved1 uint16 + Queue string + Passive bool + Durable bool + Exclusive bool + AutoDelete bool + NoWait bool + Arguments Table +} + +func (msg *QueueDeclare) id() (uint16, uint16) { + return 50, 10 +} + +func (msg *QueueDeclare) wait() bool { + return true && !msg.NoWait +} + +func (msg *QueueDeclare) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + + if msg.Passive { + bits |= 1 << 0 + } + + if msg.Durable { + bits |= 1 << 1 + } + + if msg.Exclusive { + bits |= 1 << 2 + } + + if msg.AutoDelete { + bits |= 1 << 3 + } + + if msg.NoWait { + bits |= 1 << 4 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *QueueDeclare) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Passive = (bits&(1<<0) > 0) + msg.Durable = (bits&(1<<1) > 0) + msg.Exclusive = (bits&(1<<2) > 0) + msg.AutoDelete = (bits&(1<<3) > 0) + msg.NoWait = (bits&(1<<4) > 0) + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type QueueDeclareOk struct { + Queue string + MessageCount uint32 + ConsumerCount uint32 +} + +func (msg *QueueDeclareOk) id() (uint16, uint16) { + return 50, 11 +} + +func (msg *QueueDeclareOk) wait() bool { + return true +} + +func (msg *QueueDeclareOk) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { + return + } + if err = binary.Write(w, binary.BigEndian, msg.ConsumerCount); err != nil { + return + } + + return +} + +func (msg *QueueDeclareOk) read(r io.Reader) (err error) { + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { + return + } + if err = binary.Read(r, binary.BigEndian, &msg.ConsumerCount); err != nil { + return + } + + return +} + +type QueueBind struct { + reserved1 uint16 + Queue string + Exchange string + RoutingKey string + NoWait bool + Arguments Table +} + +func (msg *QueueBind) id() (uint16, uint16) { + return 50, 20 +} + +func (msg *QueueBind) wait() bool { + return true && !msg.NoWait +} + +func (msg *QueueBind) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + if msg.NoWait { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *QueueBind) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoWait = (bits&(1<<0) > 0) + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type QueueBindOk struct { +} + +func (msg *QueueBindOk) id() (uint16, uint16) { + return 50, 21 +} + +func (msg *QueueBindOk) wait() bool { + return true +} + +func (msg *QueueBindOk) write(w io.Writer) (err error) { + + return +} + +func (msg *QueueBindOk) read(r io.Reader) (err error) { + + return +} + +type queueUnbind struct { + reserved1 uint16 + Queue string + Exchange string + RoutingKey string + Arguments Table +} + +func (msg *queueUnbind) id() (uint16, uint16) { + return 50, 50 +} + +func (msg *queueUnbind) wait() bool { + return true +} + +func (msg *queueUnbind) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *queueUnbind) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type queueUnbindOk struct { +} + +func (msg *queueUnbindOk) id() (uint16, uint16) { + return 50, 51 +} + +func (msg *queueUnbindOk) wait() bool { + return true +} + +func (msg *queueUnbindOk) write(w io.Writer) (err error) { + + return +} + +func (msg *queueUnbindOk) read(r io.Reader) (err error) { + + return +} + +type queuePurge struct { + reserved1 uint16 + Queue string + NoWait bool +} + +func (msg *queuePurge) id() (uint16, uint16) { + return 50, 30 +} + +func (msg *queuePurge) wait() bool { + return true && !msg.NoWait +} + +func (msg *queuePurge) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + + if msg.NoWait { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *queuePurge) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoWait = (bits&(1<<0) > 0) + + return +} + +type queuePurgeOk struct { + MessageCount uint32 +} + +func (msg *queuePurgeOk) id() (uint16, uint16) { + return 50, 31 +} + +func (msg *queuePurgeOk) wait() bool { + return true +} + +func (msg *queuePurgeOk) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { + return + } + + return +} + +func (msg *queuePurgeOk) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { + return + } + + return +} + +type queueDelete struct { + reserved1 uint16 + Queue string + IfUnused bool + IfEmpty bool + NoWait bool +} + +func (msg *queueDelete) id() (uint16, uint16) { + return 50, 40 +} + +func (msg *queueDelete) wait() bool { + return true && !msg.NoWait +} + +func (msg *queueDelete) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + + if msg.IfUnused { + bits |= 1 << 0 + } + + if msg.IfEmpty { + bits |= 1 << 1 + } + + if msg.NoWait { + bits |= 1 << 2 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *queueDelete) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.IfUnused = (bits&(1<<0) > 0) + msg.IfEmpty = (bits&(1<<1) > 0) + msg.NoWait = (bits&(1<<2) > 0) + + return +} + +type queueDeleteOk struct { + MessageCount uint32 +} + +func (msg *queueDeleteOk) id() (uint16, uint16) { + return 50, 41 +} + +func (msg *queueDeleteOk) wait() bool { + return true +} + +func (msg *queueDeleteOk) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { + return + } + + return +} + +func (msg *queueDeleteOk) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { + return + } + + return +} + +type basicQos struct { + PrefetchSize uint32 + PrefetchCount uint16 + Global bool +} + +func (msg *basicQos) id() (uint16, uint16) { + return 60, 10 +} + +func (msg *basicQos) wait() bool { + return true +} + +func (msg *basicQos) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.PrefetchSize); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.PrefetchCount); err != nil { + return + } + + if msg.Global { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicQos) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.PrefetchSize); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.PrefetchCount); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Global = (bits&(1<<0) > 0) + + return +} + +type basicQosOk struct { +} + +func (msg *basicQosOk) id() (uint16, uint16) { + return 60, 11 +} + +func (msg *basicQosOk) wait() bool { + return true +} + +func (msg *basicQosOk) write(w io.Writer) (err error) { + + return +} + +func (msg *basicQosOk) read(r io.Reader) (err error) { + + return +} + +type BasicConsume struct { + reserved1 uint16 + Queue string + ConsumerTag string + NoLocal bool + NoAck bool + Exclusive bool + NoWait bool + Arguments Table +} + +func (msg *BasicConsume) id() (uint16, uint16) { + return 60, 20 +} + +func (msg *BasicConsume) wait() bool { + return true && !msg.NoWait +} + +func (msg *BasicConsume) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + if err = writeShortstr(w, msg.ConsumerTag); err != nil { + return + } + + if msg.NoLocal { + bits |= 1 << 0 + } + + if msg.NoAck { + bits |= 1 << 1 + } + + if msg.Exclusive { + bits |= 1 << 2 + } + + if msg.NoWait { + bits |= 1 << 3 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeTable(w, msg.Arguments); err != nil { + return + } + + return +} + +func (msg *BasicConsume) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + if msg.ConsumerTag, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoLocal = (bits&(1<<0) > 0) + msg.NoAck = (bits&(1<<1) > 0) + msg.Exclusive = (bits&(1<<2) > 0) + msg.NoWait = (bits&(1<<3) > 0) + + if msg.Arguments, err = readTable(r); err != nil { + return + } + + return +} + +type BasicConsumeOk struct { + ConsumerTag string +} + +func (msg *BasicConsumeOk) id() (uint16, uint16) { + return 60, 21 +} + +func (msg *BasicConsumeOk) wait() bool { + return true +} + +func (msg *BasicConsumeOk) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.ConsumerTag); err != nil { + return + } + + return +} + +func (msg *BasicConsumeOk) read(r io.Reader) (err error) { + + if msg.ConsumerTag, err = readShortstr(r); err != nil { + return + } + + return +} + +type basicCancel struct { + ConsumerTag string + NoWait bool +} + +func (msg *basicCancel) id() (uint16, uint16) { + return 60, 30 +} + +func (msg *basicCancel) wait() bool { + return true && !msg.NoWait +} + +func (msg *basicCancel) write(w io.Writer) (err error) { + var bits byte + + if err = writeShortstr(w, msg.ConsumerTag); err != nil { + return + } + + if msg.NoWait { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicCancel) read(r io.Reader) (err error) { + var bits byte + + if msg.ConsumerTag, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoWait = (bits&(1<<0) > 0) + + return +} + +type basicCancelOk struct { + ConsumerTag string +} + +func (msg *basicCancelOk) id() (uint16, uint16) { + return 60, 31 +} + +func (msg *basicCancelOk) wait() bool { + return true +} + +func (msg *basicCancelOk) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.ConsumerTag); err != nil { + return + } + + return +} + +func (msg *basicCancelOk) read(r io.Reader) (err error) { + + if msg.ConsumerTag, err = readShortstr(r); err != nil { + return + } + + return +} + +type BasicPublish struct { + reserved1 uint16 + Exchange string + RoutingKey string + Mandatory bool + Immediate bool + Properties Properties + Body []byte +} + +func (msg *BasicPublish) id() (uint16, uint16) { + return 60, 40 +} + +func (msg *BasicPublish) wait() bool { + return false +} + +func (msg *BasicPublish) getContent() (Properties, []byte) { + return msg.Properties, msg.Body +} + +func (msg *BasicPublish) setContent(props Properties, body []byte) { + msg.Properties, msg.Body = props, body +} + +func (msg *BasicPublish) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + if msg.Mandatory { + bits |= 1 << 0 + } + + if msg.Immediate { + bits |= 1 << 1 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *BasicPublish) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Mandatory = (bits&(1<<0) > 0) + msg.Immediate = (bits&(1<<1) > 0) + + return +} + +type basicReturn struct { + ReplyCode uint16 + ReplyText string + Exchange string + RoutingKey string + Properties Properties + Body []byte +} + +func (msg *basicReturn) id() (uint16, uint16) { + return 60, 50 +} + +func (msg *basicReturn) wait() bool { + return false +} + +func (msg *basicReturn) getContent() (Properties, []byte) { + return msg.Properties, msg.Body +} + +func (msg *basicReturn) setContent(props Properties, body []byte) { + msg.Properties, msg.Body = props, body +} + +func (msg *basicReturn) write(w io.Writer) (err error) { + + if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { + return + } + + if err = writeShortstr(w, msg.ReplyText); err != nil { + return + } + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + return +} + +func (msg *basicReturn) read(r io.Reader) (err error) { + + if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { + return + } + + if msg.ReplyText, err = readShortstr(r); err != nil { + return + } + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + return +} + +type BasicDeliver struct { + ConsumerTag string + DeliveryTag uint64 + Redelivered bool + Exchange string + RoutingKey string + Properties Properties + Body []byte +} + +func (msg *BasicDeliver) id() (uint16, uint16) { + return 60, 60 +} + +func (msg *BasicDeliver) wait() bool { + return false +} + +func (msg *BasicDeliver) getContent() (Properties, []byte) { + return msg.Properties, msg.Body +} + +func (msg *BasicDeliver) setContent(props Properties, body []byte) { + msg.Properties, msg.Body = props, body +} + +func (msg *BasicDeliver) write(w io.Writer) (err error) { + var bits byte + + if err = writeShortstr(w, msg.ConsumerTag); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { + return + } + + if msg.Redelivered { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + return +} + +func (msg *BasicDeliver) read(r io.Reader) (err error) { + var bits byte + + if msg.ConsumerTag, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.DeliveryTag); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Redelivered = (bits&(1<<0) > 0) + + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + return +} + +type basicGet struct { + reserved1 uint16 + Queue string + NoAck bool +} + +func (msg *basicGet) id() (uint16, uint16) { + return 60, 70 +} + +func (msg *basicGet) wait() bool { + return true +} + +func (msg *basicGet) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { + return + } + + if err = writeShortstr(w, msg.Queue); err != nil { + return + } + + if msg.NoAck { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicGet) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { + return + } + + if msg.Queue, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.NoAck = (bits&(1<<0) > 0) + + return +} + +type basicGetOk struct { + DeliveryTag uint64 + Redelivered bool + Exchange string + RoutingKey string + MessageCount uint32 + Properties Properties + Body []byte +} + +func (msg *basicGetOk) id() (uint16, uint16) { + return 60, 71 +} + +func (msg *basicGetOk) wait() bool { + return true +} + +func (msg *basicGetOk) getContent() (Properties, []byte) { + return msg.Properties, msg.Body +} + +func (msg *basicGetOk) setContent(props Properties, body []byte) { + msg.Properties, msg.Body = props, body +} + +func (msg *basicGetOk) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { + return + } + + if msg.Redelivered { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + if err = writeShortstr(w, msg.Exchange); err != nil { + return + } + if err = writeShortstr(w, msg.RoutingKey); err != nil { + return + } + + if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { + return + } + + return +} + +func (msg *basicGetOk) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.DeliveryTag); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Redelivered = (bits&(1<<0) > 0) + + if msg.Exchange, err = readShortstr(r); err != nil { + return + } + if msg.RoutingKey, err = readShortstr(r); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { + return + } + + return +} + +type basicGetEmpty struct { + reserved1 string +} + +func (msg *basicGetEmpty) id() (uint16, uint16) { + return 60, 72 +} + +func (msg *basicGetEmpty) wait() bool { + return true +} + +func (msg *basicGetEmpty) write(w io.Writer) (err error) { + + if err = writeShortstr(w, msg.reserved1); err != nil { + return + } + + return +} + +func (msg *basicGetEmpty) read(r io.Reader) (err error) { + + if msg.reserved1, err = readShortstr(r); err != nil { + return + } + + return +} + +type basicAck struct { + DeliveryTag uint64 + Multiple bool +} + +func (msg *basicAck) id() (uint16, uint16) { + return 60, 80 +} + +func (msg *basicAck) wait() bool { + return false +} + +func (msg *basicAck) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { + return + } + + if msg.Multiple { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicAck) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.DeliveryTag); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Multiple = (bits&(1<<0) > 0) + + return +} + +type basicReject struct { + DeliveryTag uint64 + Requeue bool +} + +func (msg *basicReject) id() (uint16, uint16) { + return 60, 90 +} + +func (msg *basicReject) wait() bool { + return false +} + +func (msg *basicReject) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { + return + } + + if msg.Requeue { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicReject) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.DeliveryTag); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Requeue = (bits&(1<<0) > 0) + + return +} + +type basicRecoverAsync struct { + Requeue bool +} + +func (msg *basicRecoverAsync) id() (uint16, uint16) { + return 60, 100 +} + +func (msg *basicRecoverAsync) wait() bool { + return false +} + +func (msg *basicRecoverAsync) write(w io.Writer) (err error) { + var bits byte + + if msg.Requeue { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicRecoverAsync) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Requeue = (bits&(1<<0) > 0) + + return +} + +type basicRecover struct { + Requeue bool +} + +func (msg *basicRecover) id() (uint16, uint16) { + return 60, 110 +} + +func (msg *basicRecover) wait() bool { + return true +} + +func (msg *basicRecover) write(w io.Writer) (err error) { + var bits byte + + if msg.Requeue { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicRecover) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Requeue = (bits&(1<<0) > 0) + + return +} + +type basicRecoverOk struct { +} + +func (msg *basicRecoverOk) id() (uint16, uint16) { + return 60, 111 +} + +func (msg *basicRecoverOk) wait() bool { + return true +} + +func (msg *basicRecoverOk) write(w io.Writer) (err error) { + + return +} + +func (msg *basicRecoverOk) read(r io.Reader) (err error) { + + return +} + +type basicNack struct { + DeliveryTag uint64 + Multiple bool + Requeue bool +} + +func (msg *basicNack) id() (uint16, uint16) { + return 60, 120 +} + +func (msg *basicNack) wait() bool { + return false +} + +func (msg *basicNack) write(w io.Writer) (err error) { + var bits byte + + if err = binary.Write(w, binary.BigEndian, msg.DeliveryTag); err != nil { + return + } + + if msg.Multiple { + bits |= 1 << 0 + } + + if msg.Requeue { + bits |= 1 << 1 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *basicNack) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &msg.DeliveryTag); err != nil { + return + } + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Multiple = (bits&(1<<0) > 0) + msg.Requeue = (bits&(1<<1) > 0) + + return +} + +type txSelect struct { +} + +func (msg *txSelect) id() (uint16, uint16) { + return 90, 10 +} + +func (msg *txSelect) wait() bool { + return true +} + +func (msg *txSelect) write(w io.Writer) (err error) { + + return +} + +func (msg *txSelect) read(r io.Reader) (err error) { + + return +} + +type txSelectOk struct { +} + +func (msg *txSelectOk) id() (uint16, uint16) { + return 90, 11 +} + +func (msg *txSelectOk) wait() bool { + return true +} + +func (msg *txSelectOk) write(w io.Writer) (err error) { + + return +} + +func (msg *txSelectOk) read(r io.Reader) (err error) { + + return +} + +type txCommit struct { +} + +func (msg *txCommit) id() (uint16, uint16) { + return 90, 20 +} + +func (msg *txCommit) wait() bool { + return true +} + +func (msg *txCommit) write(w io.Writer) (err error) { + + return +} + +func (msg *txCommit) read(r io.Reader) (err error) { + + return +} + +type txCommitOk struct { +} + +func (msg *txCommitOk) id() (uint16, uint16) { + return 90, 21 +} + +func (msg *txCommitOk) wait() bool { + return true +} + +func (msg *txCommitOk) write(w io.Writer) (err error) { + + return +} + +func (msg *txCommitOk) read(r io.Reader) (err error) { + + return +} + +type txRollback struct { +} + +func (msg *txRollback) id() (uint16, uint16) { + return 90, 30 +} + +func (msg *txRollback) wait() bool { + return true +} + +func (msg *txRollback) write(w io.Writer) (err error) { + + return +} + +func (msg *txRollback) read(r io.Reader) (err error) { + + return +} + +type txRollbackOk struct { +} + +func (msg *txRollbackOk) id() (uint16, uint16) { + return 90, 31 +} + +func (msg *txRollbackOk) wait() bool { + return true +} + +func (msg *txRollbackOk) write(w io.Writer) (err error) { + + return +} + +func (msg *txRollbackOk) read(r io.Reader) (err error) { + + return +} + +type confirmSelect struct { + Nowait bool +} + +func (msg *confirmSelect) id() (uint16, uint16) { + return 85, 10 +} + +func (msg *confirmSelect) wait() bool { + return true +} + +func (msg *confirmSelect) write(w io.Writer) (err error) { + var bits byte + + if msg.Nowait { + bits |= 1 << 0 + } + + if err = binary.Write(w, binary.BigEndian, bits); err != nil { + return + } + + return +} + +func (msg *confirmSelect) read(r io.Reader) (err error) { + var bits byte + + if err = binary.Read(r, binary.BigEndian, &bits); err != nil { + return + } + msg.Nowait = (bits&(1<<0) > 0) + + return +} + +type confirmSelectOk struct { +} + +func (msg *confirmSelectOk) id() (uint16, uint16) { + return 85, 11 +} + +func (msg *confirmSelectOk) wait() bool { + return true +} + +func (msg *confirmSelectOk) write(w io.Writer) (err error) { + + return +} + +func (msg *confirmSelectOk) read(r io.Reader) (err error) { + + return +} + +func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err error) { + mf := &MethodFrame{ + ChannelId: channel, + } + + if err = binary.Read(r.R, binary.BigEndian, &mf.ClassId); err != nil { + return + } + + if err = binary.Read(r.R, binary.BigEndian, &mf.MethodId); err != nil { + return + } + + switch mf.ClassId { + + case 10: // connection + switch mf.MethodId { + + case 10: // connection start + //fmt.Println("NextMethod: class:10 method:10") + method := &ConnectionStart{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // connection start-ok + //fmt.Println("NextMethod: class:10 method:11") + method := &ConnectionStartOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 20: // connection secure + //fmt.Println("NextMethod: class:10 method:20") + method := &connectionSecure{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 21: // connection secure-ok + //fmt.Println("NextMethod: class:10 method:21") + method := &connectionSecureOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 30: // connection tune + //fmt.Println("NextMethod: class:10 method:30") + method := &connectionTune{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 31: // connection tune-ok + //fmt.Println("NextMethod: class:10 method:31") + method := &connectionTuneOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 40: // connection open + //fmt.Println("NextMethod: class:10 method:40") + method := &connectionOpen{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 41: // connection open-ok + //fmt.Println("NextMethod: class:10 method:41") + method := &connectionOpenOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 50: // connection close + //fmt.Println("NextMethod: class:10 method:50") + method := &ConnectionClose{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 51: // connection close-ok + //fmt.Println("NextMethod: class:10 method:51") + method := &ConnectionCloseOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 60: // connection blocked + //fmt.Println("NextMethod: class:10 method:60") + method := &connectionBlocked{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 61: // connection unblocked + //fmt.Println("NextMethod: class:10 method:61") + method := &connectionUnblocked{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + case 20: // channel + switch mf.MethodId { + + case 10: // channel open + //fmt.Println("NextMethod: class:20 method:10") + method := &channelOpen{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // channel open-ok + //fmt.Println("NextMethod: class:20 method:11") + method := &channelOpenOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 20: // channel flow + //fmt.Println("NextMethod: class:20 method:20") + method := &channelFlow{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 21: // channel flow-ok + //fmt.Println("NextMethod: class:20 method:21") + method := &channelFlowOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 40: // channel close + //fmt.Println("NextMethod: class:20 method:40") + method := &channelClose{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 41: // channel close-ok + //fmt.Println("NextMethod: class:20 method:41") + method := &channelCloseOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + case 40: // exchange + switch mf.MethodId { + + case 10: // exchange declare + //fmt.Println("NextMethod: class:40 method:10") + method := &ExchangeDeclare{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // exchange declare-ok + //fmt.Println("NextMethod: class:40 method:11") + method := &ExchangeDeclareOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 20: // exchange delete + //fmt.Println("NextMethod: class:40 method:20") + method := &exchangeDelete{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 21: // exchange delete-ok + //fmt.Println("NextMethod: class:40 method:21") + method := &exchangeDeleteOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 30: // exchange bind + //fmt.Println("NextMethod: class:40 method:30") + method := &exchangeBind{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 31: // exchange bind-ok + //fmt.Println("NextMethod: class:40 method:31") + method := &exchangeBindOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 40: // exchange unbind + //fmt.Println("NextMethod: class:40 method:40") + method := &exchangeUnbind{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 51: // exchange unbind-ok + //fmt.Println("NextMethod: class:40 method:51") + method := &exchangeUnbindOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + case 50: // queue + switch mf.MethodId { + + case 10: // queue declare + //fmt.Println("NextMethod: class:50 method:10") + method := &QueueDeclare{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // queue declare-ok + //fmt.Println("NextMethod: class:50 method:11") + method := &QueueDeclareOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 20: // queue bind + //fmt.Println("NextMethod: class:50 method:20") + method := &QueueBind{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 21: // queue bind-ok + //fmt.Println("NextMethod: class:50 method:21") + method := &QueueBindOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 50: // queue unbind + //fmt.Println("NextMethod: class:50 method:50") + method := &queueUnbind{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 51: // queue unbind-ok + //fmt.Println("NextMethod: class:50 method:51") + method := &queueUnbindOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 30: // queue purge + //fmt.Println("NextMethod: class:50 method:30") + method := &queuePurge{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 31: // queue purge-ok + //fmt.Println("NextMethod: class:50 method:31") + method := &queuePurgeOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 40: // queue delete + //fmt.Println("NextMethod: class:50 method:40") + method := &queueDelete{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 41: // queue delete-ok + //fmt.Println("NextMethod: class:50 method:41") + method := &queueDeleteOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + case 60: // basic + switch mf.MethodId { + + case 10: // basic qos + //fmt.Println("NextMethod: class:60 method:10") + method := &basicQos{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // basic qos-ok + //fmt.Println("NextMethod: class:60 method:11") + method := &basicQosOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 20: // basic consume + //fmt.Println("NextMethod: class:60 method:20") + method := &BasicConsume{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 21: // basic consume-ok + //fmt.Println("NextMethod: class:60 method:21") + method := &BasicConsumeOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 30: // basic cancel + //fmt.Println("NextMethod: class:60 method:30") + method := &basicCancel{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 31: // basic cancel-ok + //fmt.Println("NextMethod: class:60 method:31") + method := &basicCancelOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 40: // basic publish + //fmt.Println("NextMethod: class:60 method:40") + method := &BasicPublish{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 50: // basic return + //fmt.Println("NextMethod: class:60 method:50") + method := &basicReturn{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 60: // basic deliver + //fmt.Println("NextMethod: class:60 method:60") + method := &BasicDeliver{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 70: // basic get + //fmt.Println("NextMethod: class:60 method:70") + method := &basicGet{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 71: // basic get-ok + //fmt.Println("NextMethod: class:60 method:71") + method := &basicGetOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 72: // basic get-empty + //fmt.Println("NextMethod: class:60 method:72") + method := &basicGetEmpty{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 80: // basic ack + //fmt.Println("NextMethod: class:60 method:80") + method := &basicAck{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 90: // basic reject + //fmt.Println("NextMethod: class:60 method:90") + method := &basicReject{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 100: // basic recover-async + //fmt.Println("NextMethod: class:60 method:100") + method := &basicRecoverAsync{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 110: // basic recover + //fmt.Println("NextMethod: class:60 method:110") + method := &basicRecover{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 111: // basic recover-ok + //fmt.Println("NextMethod: class:60 method:111") + method := &basicRecoverOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 120: // basic nack + //fmt.Println("NextMethod: class:60 method:120") + method := &basicNack{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + case 90: // tx + switch mf.MethodId { + + case 10: // tx select + //fmt.Println("NextMethod: class:90 method:10") + method := &txSelect{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // tx select-ok + //fmt.Println("NextMethod: class:90 method:11") + method := &txSelectOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 20: // tx commit + //fmt.Println("NextMethod: class:90 method:20") + method := &txCommit{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 21: // tx commit-ok + //fmt.Println("NextMethod: class:90 method:21") + method := &txCommitOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 30: // tx rollback + //fmt.Println("NextMethod: class:90 method:30") + method := &txRollback{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 31: // tx rollback-ok + //fmt.Println("NextMethod: class:90 method:31") + method := &txRollbackOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + case 85: // confirm + switch mf.MethodId { + + case 10: // confirm select + //fmt.Println("NextMethod: class:85 method:10") + method := &confirmSelect{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + case 11: // confirm select-ok + //fmt.Println("NextMethod: class:85 method:11") + method := &confirmSelectOk{} + if err = method.read(r.R); err != nil { + return + } + mf.Method = method + + default: + return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + } + + default: + return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId) + } + + return mf, nil +} diff --git a/tap/extensions/amqp/types.go b/tap/extensions/amqp/types.go new file mode 100644 index 000000000..335ac4c44 --- /dev/null +++ b/tap/extensions/amqp/types.go @@ -0,0 +1,428 @@ +// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// Source code and contact info at http://github.com/streadway/amqp + +package main + +import ( + "fmt" + "io" + "time" +) + +// Constants for standard AMQP 0-9-1 exchange types. +const ( + ExchangeDirect = "direct" + ExchangeFanout = "fanout" + ExchangeTopic = "topic" + ExchangeHeaders = "headers" +) + +var ( + // ErrClosed is returned when the channel or connection is not open + ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"} + + // ErrChannelMax is returned when Connection.Channel has been called enough + // times that all channel IDs have been exhausted in the client or the + // server. + ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"} + + // ErrSASL is returned from Dial when the authentication mechanism could not + // be negoated. + ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"} + + // ErrCredentials is returned when the authenticated client is not authorized + // to any vhost. + ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"} + + // ErrVhost is returned when the authenticated user is not permitted to + // access the requested Vhost. + ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"} + + // ErrSyntax is hard protocol error, indicating an unsupported protocol, + // implementation or encoding. + ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"} + + // ErrFrame is returned when the protocol frame cannot be read from the + // server, indicating an unsupported protocol or unsupported frame type. + ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"} + + // ErrCommandInvalid is returned when the server sends an unexpected response + // to this requested message type. This indicates a bug in this client. + ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"} + + // ErrUnexpectedFrame is returned when something other than a method or + // heartbeat frame is delivered to the Connection, indicating a bug in the + // client. + ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"} + + // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP. + ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} +) + +// Error captures the code and reason a channel or connection has been closed +// by the server. +type Error struct { + Code int // constant code from the specification + Reason string // description of the error + Server bool // true when initiated from the server, false when from this library + Recover bool // true when this error can be recovered by retrying later or with different parameters +} + +func newError(code uint16, text string) *Error { + return &Error{ + Code: int(code), + Reason: text, + Recover: isSoftExceptionCode(int(code)), + Server: true, + } +} + +func (e Error) Error() string { + return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason) +} + +// Used by header frames to capture routing and header information +type Properties struct { + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + Headers Table // Application or header exchange table + DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user id + AppId string // application use - creating application + reserved1 string // was cluster-id - process for buffer consumption +} + +// DeliveryMode. Transient means higher throughput but messages will not be +// restored on broker restart. The delivery mode of publishings is unrelated +// to the durability of the queues they reside on. Transient messages will +// not be restored to durable queues, persistent messages will be restored to +// durable queues and lost on non-durable queues during server restart. +// +// This remains typed as uint8 to match Publishing.DeliveryMode. Other +// delivery modes specific to custom queue implementations are not enumerated +// here. +const ( + Transient uint8 = 1 + Persistent uint8 = 2 +) + +// The property flags are an array of bits that indicate the presence or +// absence of each property value in sequence. The bits are ordered from most +// high to low - bit 15 indicates the first property. +const ( + flagContentType = 0x8000 + flagContentEncoding = 0x4000 + flagHeaders = 0x2000 + flagDeliveryMode = 0x1000 + flagPriority = 0x0800 + flagCorrelationId = 0x0400 + flagReplyTo = 0x0200 + flagExpiration = 0x0100 + flagMessageId = 0x0080 + flagTimestamp = 0x0040 + flagType = 0x0020 + flagUserId = 0x0010 + flagAppId = 0x0008 + flagReserved1 = 0x0004 +) + +// Queue captures the current server state of the queue on the server returned +// from Channel.QueueDeclare or Channel.QueueInspect. +type Queue struct { + Name string // server confirmed or generated name + Messages int // count of messages not awaiting acknowledgment + Consumers int // number of consumers receiving deliveries +} + +// Publishing captures the client message sent to the server. The fields +// outside of the Headers table included in this struct mirror the underlying +// fields in the content frame. They use native types for convenience and +// efficiency. +type Publishing struct { + // Application or exchange specific fields, + // the headers exchange will inspect this field. + Headers Table + + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) + Priority uint8 // 0 to 9 + CorrelationId string // correlation identifier + ReplyTo string // address to to reply to (ex: RPC) + Expiration string // message expiration spec + MessageId string // message identifier + Timestamp time.Time // message timestamp + Type string // message type name + UserId string // creating user id - ex: "guest" + AppId string // creating application id + + // The application specific payload of the message + Body []byte +} + +// Blocking notifies the server's TCP flow control of the Connection. When a +// server hits a memory or disk alarm it will block all connections until the +// resources are reclaimed. Use NotifyBlock on the Connection to receive these +// events. +type Blocking struct { + Active bool // TCP pushback active/inactive on server + Reason string // Server reason for activation +} + +// Confirmation notifies the acknowledgment or negative acknowledgement of a +// publishing identified by its delivery tag. Use NotifyPublish on the Channel +// to consume these events. +type Confirmation struct { + DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode + Ack bool // True when the server successfully received the publishing +} + +// Decimal matches the AMQP decimal type. Scale is the number of decimal +// digits Scale == 2, Value == 12345, Decimal == 123.45 +type Decimal struct { + Scale uint8 + Value int32 +} + +// Table stores user supplied fields of the following types: +// +// bool +// byte +// float32 +// float64 +// int +// int16 +// int32 +// int64 +// nil +// string +// time.Time +// amqp.Decimal +// amqp.Table +// []byte +// []interface{} - containing above types +// +// Functions taking a table will immediately fail when the table contains a +// value of an unsupported type. +// +// The caller must be specific in which precision of integer it wishes to +// encode. +// +// Use a type assertion when reading values from a table for type conversion. +// +// RabbitMQ expects int32 for integer values. +// +type Table map[string]interface{} + +func validateField(f interface{}) error { + switch fv := f.(type) { + case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: + return nil + + case []interface{}: + for _, v := range fv { + if err := validateField(v); err != nil { + return fmt.Errorf("in array %s", err) + } + } + return nil + + case Table: + for k, v := range fv { + if err := validateField(v); err != nil { + return fmt.Errorf("table field %q %s", k, err) + } + } + return nil + } + + return fmt.Errorf("value %T not supported", f) +} + +// Validate returns and error if any Go types in the table are incompatible with AMQP types. +func (t Table) Validate() error { + return validateField(t) +} + +// Heap interface for maintaining delivery tags +type tagSet []uint64 + +func (set tagSet) Len() int { return len(set) } +func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] } +func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] } +func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) } +func (set *tagSet) Pop() interface{} { + val := (*set)[len(*set)-1] + *set = (*set)[:len(*set)-1] + return val +} + +type Message interface { + id() (uint16, uint16) + wait() bool + read(io.Reader) error + write(io.Writer) error +} + +type messageWithContent interface { + Message + getContent() (Properties, []byte) + setContent(Properties, []byte) +} + +/* +The base interface implemented as: + +2.3.5 frame Details + +All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects +malformed frames: + + 0 1 3 7 size+7 size+8 + +------+---------+-------------+ +------------+ +-----------+ + | type | channel | size | | payload | | frame-end | + +------+---------+-------------+ +------------+ +-----------+ + octet short long size octets octet + +To read a frame, we: + + 1. Read the header and check the frame type and channel. + 2. Depending on the frame type, we read the payload and process it. + 3. Read the frame end octet. + +In realistic implementations where performance is a concern, we would use +“read-ahead buffering” or “gathering reads” to avoid doing three separate +system calls to read a frame. + +*/ +type frame interface { + write(io.Writer) error + channel() uint16 +} + +type AmqpReader struct { + R io.Reader +} + +type writer struct { + w io.Writer +} + +// Implements the frame interface for Connection RPC +type protocolHeader struct{} + +func (protocolHeader) write(w io.Writer) error { + _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}) + return err +} + +func (protocolHeader) channel() uint16 { + panic("only valid as initial handshake") +} + +/* +Method frames carry the high-level protocol commands (which we call "methods"). +One method frame carries one command. The method frame payload has this format: + + 0 2 4 + +----------+-----------+-------------- - - + | class-id | method-id | arguments... + +----------+-----------+-------------- - - + short short ... + +To process a method frame, we: + 1. Read the method frame payload. + 2. Unpack it into a structure. A given method always has the same structure, + so we can unpack the method rapidly. 3. Check that the method is allowed in + the current context. + 4. Check that the method arguments are valid. + 5. Execute the method. + +Method frame bodies are constructed as a list of AMQP data fields (bits, +integers, strings and string tables). The marshalling code is trivially +generated directly from the protocol specifications, and can be very rapid. +*/ +type MethodFrame struct { + ChannelId uint16 + ClassId uint16 + MethodId uint16 + Method Message +} + +func (f *MethodFrame) channel() uint16 { return f.ChannelId } + +/* +Heartbeating is a technique designed to undo one of TCP/IP's features, namely +its ability to recover from a broken physical connection by closing only after +a quite long time-out. In some scenarios we need to know very rapidly if a +peer is disconnected or not responding for other reasons (e.g. it is looping). +Since heartbeating can be done at a low level, we implement this as a special +type of frame that peers exchange at the transport level, rather than as a +class method. +*/ +type HeartbeatFrame struct { + ChannelId uint16 +} + +func (f *HeartbeatFrame) channel() uint16 { return f.ChannelId } + +/* +Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally +defined as carrying content. When a peer sends such a method frame, it always +follows it with a content header and zero or more content body frames. + +A content header frame has this format: + + 0 2 4 12 14 + +----------+--------+-----------+----------------+------------- - - + | class-id | weight | body size | property flags | property list... + +----------+--------+-----------+----------------+------------- - - + short short long long short remainder... + +We place content body in distinct frames (rather than including it in the +method) so that AMQP may support "zero copy" techniques in which content is +never marshalled or encoded. We place the content properties in their own +frame so that recipients can selectively discard contents they do not want to +process +*/ +type HeaderFrame struct { + ChannelId uint16 + ClassId uint16 + weight uint16 + Size uint64 + Properties Properties +} + +func (f *HeaderFrame) channel() uint16 { return f.ChannelId } + +/* +Content is the application data we carry from client-to-client via the AMQP +server. Content is, roughly speaking, a set of properties plus a binary data +part. The set of allowed properties are defined by the Basic class, and these +form the "content header frame". The data can be any size, and MAY be broken +into several (or many) chunks, each forming a "content body frame". + +Looking at the frames for a specific channel, as they pass on the wire, we +might see something like this: + + [method] + [method] [header] [body] [body] + [method] + ... +*/ +type BodyFrame struct { + ChannelId uint16 + Body []byte +} + +func (f *BodyFrame) channel() uint16 { return f.ChannelId } diff --git a/tap/extensions/amqp/write.go b/tap/extensions/amqp/write.go new file mode 100644 index 000000000..89432fdb2 --- /dev/null +++ b/tap/extensions/amqp/write.go @@ -0,0 +1,416 @@ +// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// Source code and contact info at http://github.com/streadway/amqp + +package main + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "io" + "math" + "time" +) + +func (w *writer) WriteFrame(frame frame) (err error) { + if err = frame.write(w.w); err != nil { + return + } + + if buf, ok := w.w.(*bufio.Writer); ok { + err = buf.Flush() + } + + return +} + +func (f *MethodFrame) write(w io.Writer) (err error) { + var payload bytes.Buffer + + if f.Method == nil { + return errors.New("malformed frame: missing method") + } + + class, method := f.Method.id() + + if err = binary.Write(&payload, binary.BigEndian, class); err != nil { + return + } + + if err = binary.Write(&payload, binary.BigEndian, method); err != nil { + return + } + + if err = f.Method.write(&payload); err != nil { + return + } + + return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes()) +} + +// Heartbeat +// +// Payload is empty +func (f *HeartbeatFrame) write(w io.Writer) (err error) { + return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{}) +} + +// CONTENT HEADER +// 0 2 4 12 14 +// +----------+--------+-----------+----------------+------------- - - +// | class-id | weight | body size | property flags | property list... +// +----------+--------+-----------+----------------+------------- - - +// short short long long short remainder... +// +func (f *HeaderFrame) write(w io.Writer) (err error) { + var payload bytes.Buffer + var zeroTime time.Time + + if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { + return + } + + if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil { + return + } + + if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil { + return + } + + // First pass will build the mask to be serialized, second pass will serialize + // each of the fields that appear in the mask. + + var mask uint16 + + if len(f.Properties.ContentType) > 0 { + mask = mask | flagContentType + } + if len(f.Properties.ContentEncoding) > 0 { + mask = mask | flagContentEncoding + } + if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 { + mask = mask | flagHeaders + } + if f.Properties.DeliveryMode > 0 { + mask = mask | flagDeliveryMode + } + if f.Properties.Priority > 0 { + mask = mask | flagPriority + } + if len(f.Properties.CorrelationId) > 0 { + mask = mask | flagCorrelationId + } + if len(f.Properties.ReplyTo) > 0 { + mask = mask | flagReplyTo + } + if len(f.Properties.Expiration) > 0 { + mask = mask | flagExpiration + } + if len(f.Properties.MessageId) > 0 { + mask = mask | flagMessageId + } + if f.Properties.Timestamp != zeroTime { + mask = mask | flagTimestamp + } + if len(f.Properties.Type) > 0 { + mask = mask | flagType + } + if len(f.Properties.UserId) > 0 { + mask = mask | flagUserId + } + if len(f.Properties.AppId) > 0 { + mask = mask | flagAppId + } + + if err = binary.Write(&payload, binary.BigEndian, mask); err != nil { + return + } + + if hasProperty(mask, flagContentType) { + if err = writeShortstr(&payload, f.Properties.ContentType); err != nil { + return + } + } + if hasProperty(mask, flagContentEncoding) { + if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil { + return + } + } + if hasProperty(mask, flagHeaders) { + if err = writeTable(&payload, f.Properties.Headers); err != nil { + return + } + } + if hasProperty(mask, flagDeliveryMode) { + if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil { + return + } + } + if hasProperty(mask, flagPriority) { + if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil { + return + } + } + if hasProperty(mask, flagCorrelationId) { + if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil { + return + } + } + if hasProperty(mask, flagReplyTo) { + if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil { + return + } + } + if hasProperty(mask, flagExpiration) { + if err = writeShortstr(&payload, f.Properties.Expiration); err != nil { + return + } + } + if hasProperty(mask, flagMessageId) { + if err = writeShortstr(&payload, f.Properties.MessageId); err != nil { + return + } + } + if hasProperty(mask, flagTimestamp) { + if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil { + return + } + } + if hasProperty(mask, flagType) { + if err = writeShortstr(&payload, f.Properties.Type); err != nil { + return + } + } + if hasProperty(mask, flagUserId) { + if err = writeShortstr(&payload, f.Properties.UserId); err != nil { + return + } + } + if hasProperty(mask, flagAppId) { + if err = writeShortstr(&payload, f.Properties.AppId); err != nil { + return + } + } + + return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes()) +} + +// Body +// +// Payload is one byterange from the full body who's size is declared in the +// Header frame +func (f *BodyFrame) write(w io.Writer) (err error) { + return writeFrame(w, frameBody, f.ChannelId, f.Body) +} + +func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) { + end := []byte{frameEnd} + size := uint(len(payload)) + + _, err = w.Write([]byte{ + byte(typ), + byte((channel & 0xff00) >> 8), + byte((channel & 0x00ff) >> 0), + byte((size & 0xff000000) >> 24), + byte((size & 0x00ff0000) >> 16), + byte((size & 0x0000ff00) >> 8), + byte((size & 0x000000ff) >> 0), + }) + + if err != nil { + return + } + + if _, err = w.Write(payload); err != nil { + return + } + + if _, err = w.Write(end); err != nil { + return + } + + return +} + +func writeShortstr(w io.Writer, s string) (err error) { + b := []byte(s) + + var length = uint8(len(b)) + + if err = binary.Write(w, binary.BigEndian, length); err != nil { + return + } + + if _, err = w.Write(b[:length]); err != nil { + return + } + + return +} + +func writeLongstr(w io.Writer, s string) (err error) { + b := []byte(s) + + var length = uint32(len(b)) + + if err = binary.Write(w, binary.BigEndian, length); err != nil { + return + } + + if _, err = w.Write(b[:length]); err != nil { + return + } + + return +} + +/* +'A': []interface{} +'D': Decimal +'F': Table +'I': int32 +'S': string +'T': time.Time +'V': nil +'b': byte +'d': float64 +'f': float32 +'l': int64 +'s': int16 +'t': bool +'x': []byte +*/ +func writeField(w io.Writer, value interface{}) (err error) { + var buf [9]byte + var enc []byte + + switch v := value.(type) { + case bool: + buf[0] = 't' + if v { + buf[1] = byte(1) + } else { + buf[1] = byte(0) + } + enc = buf[:2] + + case byte: + buf[0] = 'b' + buf[1] = byte(v) + enc = buf[:2] + + case int16: + buf[0] = 's' + binary.BigEndian.PutUint16(buf[1:3], uint16(v)) + enc = buf[:3] + + case int: + buf[0] = 'I' + binary.BigEndian.PutUint32(buf[1:5], uint32(v)) + enc = buf[:5] + + case int32: + buf[0] = 'I' + binary.BigEndian.PutUint32(buf[1:5], uint32(v)) + enc = buf[:5] + + case int64: + buf[0] = 'l' + binary.BigEndian.PutUint64(buf[1:9], uint64(v)) + enc = buf[:9] + + case float32: + buf[0] = 'f' + binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v)) + enc = buf[:5] + + case float64: + buf[0] = 'd' + binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v)) + enc = buf[:9] + + case Decimal: + buf[0] = 'D' + buf[1] = byte(v.Scale) + binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value)) + enc = buf[:6] + + case string: + buf[0] = 'S' + binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) + enc = append(buf[:5], []byte(v)...) + + case []interface{}: // field-array + buf[0] = 'A' + + sec := new(bytes.Buffer) + for _, val := range v { + if err = writeField(sec, val); err != nil { + return + } + } + + binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len())) + if _, err = w.Write(buf[:5]); err != nil { + return + } + + if _, err = w.Write(sec.Bytes()); err != nil { + return + } + + return + + case time.Time: + buf[0] = 'T' + binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix())) + enc = buf[:9] + + case Table: + if _, err = w.Write([]byte{'F'}); err != nil { + return + } + return writeTable(w, v) + + case []byte: + buf[0] = 'x' + binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) + if _, err = w.Write(buf[0:5]); err != nil { + return + } + if _, err = w.Write(v); err != nil { + return + } + return + + case nil: + buf[0] = 'V' + enc = buf[:1] + + default: + return ErrFieldType + } + + _, err = w.Write(enc) + + return +} + +func writeTable(w io.Writer, table Table) (err error) { + var buf bytes.Buffer + + for key, val := range table { + if err = writeShortstr(&buf, key); err != nil { + return + } + if err = writeField(&buf, val); err != nil { + return + } + } + + return writeLongstr(w, string(buf.Bytes())) +}