mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-23 20:49:26 +00:00
Bring in the files related AMQP into amqp
directory
This commit is contained in:
parent
254068ec39
commit
5b3f63dd28
456
tap/extensions/amqp/read.go
Normal file
456
tap/extensions/amqp/read.go
Normal file
@ -0,0 +1,456 @@
|
|||||||
|
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
// Source code and contact info at http://github.com/streadway/amqp
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
Reads a frame from an input stream and returns an interface that can be cast into
|
||||||
|
one of the following:
|
||||||
|
|
||||||
|
MethodFrame
|
||||||
|
PropertiesFrame
|
||||||
|
BodyFrame
|
||||||
|
HeartbeatFrame
|
||||||
|
|
||||||
|
2.3.5 frame Details
|
||||||
|
|
||||||
|
All frames consist of a header (7 octets), a payload of arbitrary size, and a
|
||||||
|
'frame-end' octet that detects malformed frames:
|
||||||
|
|
||||||
|
0 1 3 7 size+7 size+8
|
||||||
|
+------+---------+-------------+ +------------+ +-----------+
|
||||||
|
| type | channel | size | | payload | | frame-end |
|
||||||
|
+------+---------+-------------+ +------------+ +-----------+
|
||||||
|
octet short long size octets octet
|
||||||
|
|
||||||
|
To read a frame, we:
|
||||||
|
1. Read the header and check the frame type and channel.
|
||||||
|
2. Depending on the frame type, we read the payload and process it.
|
||||||
|
3. Read the frame end octet.
|
||||||
|
|
||||||
|
In realistic implementations where performance is a concern, we would use
|
||||||
|
“read-ahead buffering” or
|
||||||
|
|
||||||
|
“gathering reads” to avoid doing three separate system calls to read a frame.
|
||||||
|
*/
|
||||||
|
func (r *AmqpReader) ReadFrame() (frame frame, err error) {
|
||||||
|
var scratch [7]byte
|
||||||
|
|
||||||
|
if _, err = io.ReadFull(r.R, scratch[:7]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
typ := uint8(scratch[0])
|
||||||
|
channel := binary.BigEndian.Uint16(scratch[1:3])
|
||||||
|
size := binary.BigEndian.Uint32(scratch[3:7])
|
||||||
|
|
||||||
|
switch typ {
|
||||||
|
case frameMethod:
|
||||||
|
if frame, err = r.parseMethodFrame(channel, size); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case frameHeader:
|
||||||
|
if frame, err = r.parseHeaderFrame(channel, size); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case frameBody:
|
||||||
|
if frame, err = r.parseBodyFrame(channel, size); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
case frameHeartbeat:
|
||||||
|
if frame, err = r.parseHeartbeatFrame(channel, size); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, ErrFrame
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = io.ReadFull(r.R, scratch[:1]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if scratch[0] != frameEnd {
|
||||||
|
return nil, ErrFrame
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func readShortstr(r io.Reader) (v string, err error) {
|
||||||
|
var length uint8
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &length); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes := make([]byte, length)
|
||||||
|
if _, err = io.ReadFull(r, bytes); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return string(bytes), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readLongstr(r io.Reader) (v string, err error) {
|
||||||
|
var length uint32
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &length); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// slices can't be longer than max int32 value
|
||||||
|
if length > (^uint32(0) >> 1) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes := make([]byte, length)
|
||||||
|
if _, err = io.ReadFull(r, bytes); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return string(bytes), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDecimal(r io.Reader) (v Decimal, err error) {
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func readFloat32(r io.Reader) (v float32, err error) {
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &v); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func readFloat64(r io.Reader) (v float64, err error) {
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &v); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func readTimestamp(r io.Reader) (v time.Time, err error) {
|
||||||
|
var sec int64
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &sec); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return time.Unix(sec, 0), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
'A': []interface{}
|
||||||
|
'D': Decimal
|
||||||
|
'F': Table
|
||||||
|
'I': int32
|
||||||
|
'S': string
|
||||||
|
'T': time.Time
|
||||||
|
'V': nil
|
||||||
|
'b': byte
|
||||||
|
'd': float64
|
||||||
|
'f': float32
|
||||||
|
'l': int64
|
||||||
|
's': int16
|
||||||
|
't': bool
|
||||||
|
'x': []byte
|
||||||
|
*/
|
||||||
|
func readField(r io.Reader) (v interface{}, err error) {
|
||||||
|
var typ byte
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &typ); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch typ {
|
||||||
|
case 't':
|
||||||
|
var value uint8
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return (value != 0), nil
|
||||||
|
|
||||||
|
case 'b':
|
||||||
|
var value [1]byte
|
||||||
|
if _, err = io.ReadFull(r, value[0:1]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return value[0], nil
|
||||||
|
|
||||||
|
case 's':
|
||||||
|
var value int16
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
|
||||||
|
case 'I':
|
||||||
|
var value int32
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
|
||||||
|
case 'l':
|
||||||
|
var value int64
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
|
||||||
|
case 'f':
|
||||||
|
var value float32
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
|
||||||
|
case 'd':
|
||||||
|
var value float64
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &value); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
|
||||||
|
case 'D':
|
||||||
|
return readDecimal(r)
|
||||||
|
|
||||||
|
case 'S':
|
||||||
|
return readLongstr(r)
|
||||||
|
|
||||||
|
case 'A':
|
||||||
|
return readArray(r)
|
||||||
|
|
||||||
|
case 'T':
|
||||||
|
return readTimestamp(r)
|
||||||
|
|
||||||
|
case 'F':
|
||||||
|
return readTable(r)
|
||||||
|
|
||||||
|
case 'x':
|
||||||
|
var len int32
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &len); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
value := make([]byte, len)
|
||||||
|
if _, err = io.ReadFull(r, value); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return value, err
|
||||||
|
|
||||||
|
case 'V':
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, ErrSyntax
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Field tables are long strings that contain packed name-value pairs. The
|
||||||
|
name-value pairs are encoded as short string defining the name, and octet
|
||||||
|
defining the values type and then the value itself. The valid field types for
|
||||||
|
tables are an extension of the native integer, bit, string, and timestamp
|
||||||
|
types, and are shown in the grammar. Multi-octet integer fields are always
|
||||||
|
held in network byte order.
|
||||||
|
*/
|
||||||
|
func readTable(r io.Reader) (table Table, err error) {
|
||||||
|
var nested bytes.Buffer
|
||||||
|
var str string
|
||||||
|
|
||||||
|
if str, err = readLongstr(r); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nested.Write([]byte(str))
|
||||||
|
|
||||||
|
table = make(Table)
|
||||||
|
|
||||||
|
for nested.Len() > 0 {
|
||||||
|
var key string
|
||||||
|
var value interface{}
|
||||||
|
|
||||||
|
if key, err = readShortstr(&nested); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, err = readField(&nested); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
table[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func readArray(r io.Reader) ([]interface{}, error) {
|
||||||
|
var (
|
||||||
|
size uint32
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if err = binary.Read(r, binary.BigEndian, &size); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
lim = &io.LimitedReader{R: r, N: int64(size)}
|
||||||
|
arr = []interface{}{}
|
||||||
|
field interface{}
|
||||||
|
)
|
||||||
|
|
||||||
|
for {
|
||||||
|
if field, err = readField(lim); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
arr = append(arr, field)
|
||||||
|
}
|
||||||
|
|
||||||
|
return arr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Checks if this bit mask matches the flags bitset
|
||||||
|
func hasProperty(mask uint16, prop int) bool {
|
||||||
|
return int(mask)&prop > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) {
|
||||||
|
hf := &HeaderFrame{
|
||||||
|
ChannelId: channel,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Read(r.R, binary.BigEndian, &hf.ClassId); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Read(r.R, binary.BigEndian, &hf.weight); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Read(r.R, binary.BigEndian, &hf.Size); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var flags uint16
|
||||||
|
|
||||||
|
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasProperty(flags, flagContentType) {
|
||||||
|
if hf.Properties.ContentType, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagContentEncoding) {
|
||||||
|
if hf.Properties.ContentEncoding, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagHeaders) {
|
||||||
|
if hf.Properties.Headers, err = readTable(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagDeliveryMode) {
|
||||||
|
if err = binary.Read(r.R, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagPriority) {
|
||||||
|
if err = binary.Read(r.R, binary.BigEndian, &hf.Properties.Priority); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagCorrelationId) {
|
||||||
|
if hf.Properties.CorrelationId, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagReplyTo) {
|
||||||
|
if hf.Properties.ReplyTo, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagExpiration) {
|
||||||
|
if hf.Properties.Expiration, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagMessageId) {
|
||||||
|
if hf.Properties.MessageId, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagTimestamp) {
|
||||||
|
if hf.Properties.Timestamp, err = readTimestamp(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagType) {
|
||||||
|
if hf.Properties.Type, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagUserId) {
|
||||||
|
if hf.Properties.UserId, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagAppId) {
|
||||||
|
if hf.Properties.AppId, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(flags, flagReserved1) {
|
||||||
|
if hf.Properties.reserved1, err = readShortstr(r.R); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return hf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) {
|
||||||
|
bf := &BodyFrame{
|
||||||
|
ChannelId: channel,
|
||||||
|
Body: make([]byte, size),
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var errHeartbeatPayload = errors.New("Heartbeats should not have a payload")
|
||||||
|
|
||||||
|
func (r *AmqpReader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) {
|
||||||
|
hf := &HeartbeatFrame{
|
||||||
|
ChannelId: channel,
|
||||||
|
}
|
||||||
|
|
||||||
|
if size > 0 {
|
||||||
|
return nil, errHeartbeatPayload
|
||||||
|
}
|
||||||
|
|
||||||
|
return hf, nil
|
||||||
|
}
|
3306
tap/extensions/amqp/spec091.go
Normal file
3306
tap/extensions/amqp/spec091.go
Normal file
File diff suppressed because it is too large
Load Diff
428
tap/extensions/amqp/types.go
Normal file
428
tap/extensions/amqp/types.go
Normal file
@ -0,0 +1,428 @@
|
|||||||
|
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
// Source code and contact info at http://github.com/streadway/amqp
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Constants for standard AMQP 0-9-1 exchange types.
|
||||||
|
const (
|
||||||
|
ExchangeDirect = "direct"
|
||||||
|
ExchangeFanout = "fanout"
|
||||||
|
ExchangeTopic = "topic"
|
||||||
|
ExchangeHeaders = "headers"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrClosed is returned when the channel or connection is not open
|
||||||
|
ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}
|
||||||
|
|
||||||
|
// ErrChannelMax is returned when Connection.Channel has been called enough
|
||||||
|
// times that all channel IDs have been exhausted in the client or the
|
||||||
|
// server.
|
||||||
|
ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}
|
||||||
|
|
||||||
|
// ErrSASL is returned from Dial when the authentication mechanism could not
|
||||||
|
// be negoated.
|
||||||
|
ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}
|
||||||
|
|
||||||
|
// ErrCredentials is returned when the authenticated client is not authorized
|
||||||
|
// to any vhost.
|
||||||
|
ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}
|
||||||
|
|
||||||
|
// ErrVhost is returned when the authenticated user is not permitted to
|
||||||
|
// access the requested Vhost.
|
||||||
|
ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}
|
||||||
|
|
||||||
|
// ErrSyntax is hard protocol error, indicating an unsupported protocol,
|
||||||
|
// implementation or encoding.
|
||||||
|
ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}
|
||||||
|
|
||||||
|
// ErrFrame is returned when the protocol frame cannot be read from the
|
||||||
|
// server, indicating an unsupported protocol or unsupported frame type.
|
||||||
|
ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}
|
||||||
|
|
||||||
|
// ErrCommandInvalid is returned when the server sends an unexpected response
|
||||||
|
// to this requested message type. This indicates a bug in this client.
|
||||||
|
ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}
|
||||||
|
|
||||||
|
// ErrUnexpectedFrame is returned when something other than a method or
|
||||||
|
// heartbeat frame is delivered to the Connection, indicating a bug in the
|
||||||
|
// client.
|
||||||
|
ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}
|
||||||
|
|
||||||
|
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
|
||||||
|
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
|
||||||
|
)
|
||||||
|
|
||||||
|
// Error captures the code and reason a channel or connection has been closed
|
||||||
|
// by the server.
|
||||||
|
type Error struct {
|
||||||
|
Code int // constant code from the specification
|
||||||
|
Reason string // description of the error
|
||||||
|
Server bool // true when initiated from the server, false when from this library
|
||||||
|
Recover bool // true when this error can be recovered by retrying later or with different parameters
|
||||||
|
}
|
||||||
|
|
||||||
|
func newError(code uint16, text string) *Error {
|
||||||
|
return &Error{
|
||||||
|
Code: int(code),
|
||||||
|
Reason: text,
|
||||||
|
Recover: isSoftExceptionCode(int(code)),
|
||||||
|
Server: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Error) Error() string {
|
||||||
|
return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used by header frames to capture routing and header information
|
||||||
|
type Properties struct {
|
||||||
|
ContentType string // MIME content type
|
||||||
|
ContentEncoding string // MIME content encoding
|
||||||
|
Headers Table // Application or header exchange table
|
||||||
|
DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2)
|
||||||
|
Priority uint8 // queue implementation use - 0 to 9
|
||||||
|
CorrelationId string // application use - correlation identifier
|
||||||
|
ReplyTo string // application use - address to to reply to (ex: RPC)
|
||||||
|
Expiration string // implementation use - message expiration spec
|
||||||
|
MessageId string // application use - message identifier
|
||||||
|
Timestamp time.Time // application use - message timestamp
|
||||||
|
Type string // application use - message type name
|
||||||
|
UserId string // application use - creating user id
|
||||||
|
AppId string // application use - creating application
|
||||||
|
reserved1 string // was cluster-id - process for buffer consumption
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeliveryMode. Transient means higher throughput but messages will not be
|
||||||
|
// restored on broker restart. The delivery mode of publishings is unrelated
|
||||||
|
// to the durability of the queues they reside on. Transient messages will
|
||||||
|
// not be restored to durable queues, persistent messages will be restored to
|
||||||
|
// durable queues and lost on non-durable queues during server restart.
|
||||||
|
//
|
||||||
|
// This remains typed as uint8 to match Publishing.DeliveryMode. Other
|
||||||
|
// delivery modes specific to custom queue implementations are not enumerated
|
||||||
|
// here.
|
||||||
|
const (
|
||||||
|
Transient uint8 = 1
|
||||||
|
Persistent uint8 = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
// The property flags are an array of bits that indicate the presence or
|
||||||
|
// absence of each property value in sequence. The bits are ordered from most
|
||||||
|
// high to low - bit 15 indicates the first property.
|
||||||
|
const (
|
||||||
|
flagContentType = 0x8000
|
||||||
|
flagContentEncoding = 0x4000
|
||||||
|
flagHeaders = 0x2000
|
||||||
|
flagDeliveryMode = 0x1000
|
||||||
|
flagPriority = 0x0800
|
||||||
|
flagCorrelationId = 0x0400
|
||||||
|
flagReplyTo = 0x0200
|
||||||
|
flagExpiration = 0x0100
|
||||||
|
flagMessageId = 0x0080
|
||||||
|
flagTimestamp = 0x0040
|
||||||
|
flagType = 0x0020
|
||||||
|
flagUserId = 0x0010
|
||||||
|
flagAppId = 0x0008
|
||||||
|
flagReserved1 = 0x0004
|
||||||
|
)
|
||||||
|
|
||||||
|
// Queue captures the current server state of the queue on the server returned
|
||||||
|
// from Channel.QueueDeclare or Channel.QueueInspect.
|
||||||
|
type Queue struct {
|
||||||
|
Name string // server confirmed or generated name
|
||||||
|
Messages int // count of messages not awaiting acknowledgment
|
||||||
|
Consumers int // number of consumers receiving deliveries
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publishing captures the client message sent to the server. The fields
|
||||||
|
// outside of the Headers table included in this struct mirror the underlying
|
||||||
|
// fields in the content frame. They use native types for convenience and
|
||||||
|
// efficiency.
|
||||||
|
type Publishing struct {
|
||||||
|
// Application or exchange specific fields,
|
||||||
|
// the headers exchange will inspect this field.
|
||||||
|
Headers Table
|
||||||
|
|
||||||
|
// Properties
|
||||||
|
ContentType string // MIME content type
|
||||||
|
ContentEncoding string // MIME content encoding
|
||||||
|
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
|
||||||
|
Priority uint8 // 0 to 9
|
||||||
|
CorrelationId string // correlation identifier
|
||||||
|
ReplyTo string // address to to reply to (ex: RPC)
|
||||||
|
Expiration string // message expiration spec
|
||||||
|
MessageId string // message identifier
|
||||||
|
Timestamp time.Time // message timestamp
|
||||||
|
Type string // message type name
|
||||||
|
UserId string // creating user id - ex: "guest"
|
||||||
|
AppId string // creating application id
|
||||||
|
|
||||||
|
// The application specific payload of the message
|
||||||
|
Body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Blocking notifies the server's TCP flow control of the Connection. When a
|
||||||
|
// server hits a memory or disk alarm it will block all connections until the
|
||||||
|
// resources are reclaimed. Use NotifyBlock on the Connection to receive these
|
||||||
|
// events.
|
||||||
|
type Blocking struct {
|
||||||
|
Active bool // TCP pushback active/inactive on server
|
||||||
|
Reason string // Server reason for activation
|
||||||
|
}
|
||||||
|
|
||||||
|
// Confirmation notifies the acknowledgment or negative acknowledgement of a
|
||||||
|
// publishing identified by its delivery tag. Use NotifyPublish on the Channel
|
||||||
|
// to consume these events.
|
||||||
|
type Confirmation struct {
|
||||||
|
DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode
|
||||||
|
Ack bool // True when the server successfully received the publishing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decimal matches the AMQP decimal type. Scale is the number of decimal
|
||||||
|
// digits Scale == 2, Value == 12345, Decimal == 123.45
|
||||||
|
type Decimal struct {
|
||||||
|
Scale uint8
|
||||||
|
Value int32
|
||||||
|
}
|
||||||
|
|
||||||
|
// Table stores user supplied fields of the following types:
|
||||||
|
//
|
||||||
|
// bool
|
||||||
|
// byte
|
||||||
|
// float32
|
||||||
|
// float64
|
||||||
|
// int
|
||||||
|
// int16
|
||||||
|
// int32
|
||||||
|
// int64
|
||||||
|
// nil
|
||||||
|
// string
|
||||||
|
// time.Time
|
||||||
|
// amqp.Decimal
|
||||||
|
// amqp.Table
|
||||||
|
// []byte
|
||||||
|
// []interface{} - containing above types
|
||||||
|
//
|
||||||
|
// Functions taking a table will immediately fail when the table contains a
|
||||||
|
// value of an unsupported type.
|
||||||
|
//
|
||||||
|
// The caller must be specific in which precision of integer it wishes to
|
||||||
|
// encode.
|
||||||
|
//
|
||||||
|
// Use a type assertion when reading values from a table for type conversion.
|
||||||
|
//
|
||||||
|
// RabbitMQ expects int32 for integer values.
|
||||||
|
//
|
||||||
|
type Table map[string]interface{}
|
||||||
|
|
||||||
|
func validateField(f interface{}) error {
|
||||||
|
switch fv := f.(type) {
|
||||||
|
case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
|
||||||
|
return nil
|
||||||
|
|
||||||
|
case []interface{}:
|
||||||
|
for _, v := range fv {
|
||||||
|
if err := validateField(v); err != nil {
|
||||||
|
return fmt.Errorf("in array %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
case Table:
|
||||||
|
for k, v := range fv {
|
||||||
|
if err := validateField(v); err != nil {
|
||||||
|
return fmt.Errorf("table field %q %s", k, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("value %T not supported", f)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate returns and error if any Go types in the table are incompatible with AMQP types.
|
||||||
|
func (t Table) Validate() error {
|
||||||
|
return validateField(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heap interface for maintaining delivery tags
|
||||||
|
type tagSet []uint64
|
||||||
|
|
||||||
|
func (set tagSet) Len() int { return len(set) }
|
||||||
|
func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] }
|
||||||
|
func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] }
|
||||||
|
func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) }
|
||||||
|
func (set *tagSet) Pop() interface{} {
|
||||||
|
val := (*set)[len(*set)-1]
|
||||||
|
*set = (*set)[:len(*set)-1]
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
type Message interface {
|
||||||
|
id() (uint16, uint16)
|
||||||
|
wait() bool
|
||||||
|
read(io.Reader) error
|
||||||
|
write(io.Writer) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type messageWithContent interface {
|
||||||
|
Message
|
||||||
|
getContent() (Properties, []byte)
|
||||||
|
setContent(Properties, []byte)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
The base interface implemented as:
|
||||||
|
|
||||||
|
2.3.5 frame Details
|
||||||
|
|
||||||
|
All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects
|
||||||
|
malformed frames:
|
||||||
|
|
||||||
|
0 1 3 7 size+7 size+8
|
||||||
|
+------+---------+-------------+ +------------+ +-----------+
|
||||||
|
| type | channel | size | | payload | | frame-end |
|
||||||
|
+------+---------+-------------+ +------------+ +-----------+
|
||||||
|
octet short long size octets octet
|
||||||
|
|
||||||
|
To read a frame, we:
|
||||||
|
|
||||||
|
1. Read the header and check the frame type and channel.
|
||||||
|
2. Depending on the frame type, we read the payload and process it.
|
||||||
|
3. Read the frame end octet.
|
||||||
|
|
||||||
|
In realistic implementations where performance is a concern, we would use
|
||||||
|
“read-ahead buffering” or “gathering reads” to avoid doing three separate
|
||||||
|
system calls to read a frame.
|
||||||
|
|
||||||
|
*/
|
||||||
|
type frame interface {
|
||||||
|
write(io.Writer) error
|
||||||
|
channel() uint16
|
||||||
|
}
|
||||||
|
|
||||||
|
type AmqpReader struct {
|
||||||
|
R io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
type writer struct {
|
||||||
|
w io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements the frame interface for Connection RPC
|
||||||
|
type protocolHeader struct{}
|
||||||
|
|
||||||
|
func (protocolHeader) write(w io.Writer) error {
|
||||||
|
_, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (protocolHeader) channel() uint16 {
|
||||||
|
panic("only valid as initial handshake")
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Method frames carry the high-level protocol commands (which we call "methods").
|
||||||
|
One method frame carries one command. The method frame payload has this format:
|
||||||
|
|
||||||
|
0 2 4
|
||||||
|
+----------+-----------+-------------- - -
|
||||||
|
| class-id | method-id | arguments...
|
||||||
|
+----------+-----------+-------------- - -
|
||||||
|
short short ...
|
||||||
|
|
||||||
|
To process a method frame, we:
|
||||||
|
1. Read the method frame payload.
|
||||||
|
2. Unpack it into a structure. A given method always has the same structure,
|
||||||
|
so we can unpack the method rapidly. 3. Check that the method is allowed in
|
||||||
|
the current context.
|
||||||
|
4. Check that the method arguments are valid.
|
||||||
|
5. Execute the method.
|
||||||
|
|
||||||
|
Method frame bodies are constructed as a list of AMQP data fields (bits,
|
||||||
|
integers, strings and string tables). The marshalling code is trivially
|
||||||
|
generated directly from the protocol specifications, and can be very rapid.
|
||||||
|
*/
|
||||||
|
type MethodFrame struct {
|
||||||
|
ChannelId uint16
|
||||||
|
ClassId uint16
|
||||||
|
MethodId uint16
|
||||||
|
Method Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *MethodFrame) channel() uint16 { return f.ChannelId }
|
||||||
|
|
||||||
|
/*
|
||||||
|
Heartbeating is a technique designed to undo one of TCP/IP's features, namely
|
||||||
|
its ability to recover from a broken physical connection by closing only after
|
||||||
|
a quite long time-out. In some scenarios we need to know very rapidly if a
|
||||||
|
peer is disconnected or not responding for other reasons (e.g. it is looping).
|
||||||
|
Since heartbeating can be done at a low level, we implement this as a special
|
||||||
|
type of frame that peers exchange at the transport level, rather than as a
|
||||||
|
class method.
|
||||||
|
*/
|
||||||
|
type HeartbeatFrame struct {
|
||||||
|
ChannelId uint16
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *HeartbeatFrame) channel() uint16 { return f.ChannelId }
|
||||||
|
|
||||||
|
/*
|
||||||
|
Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally
|
||||||
|
defined as carrying content. When a peer sends such a method frame, it always
|
||||||
|
follows it with a content header and zero or more content body frames.
|
||||||
|
|
||||||
|
A content header frame has this format:
|
||||||
|
|
||||||
|
0 2 4 12 14
|
||||||
|
+----------+--------+-----------+----------------+------------- - -
|
||||||
|
| class-id | weight | body size | property flags | property list...
|
||||||
|
+----------+--------+-----------+----------------+------------- - -
|
||||||
|
short short long long short remainder...
|
||||||
|
|
||||||
|
We place content body in distinct frames (rather than including it in the
|
||||||
|
method) so that AMQP may support "zero copy" techniques in which content is
|
||||||
|
never marshalled or encoded. We place the content properties in their own
|
||||||
|
frame so that recipients can selectively discard contents they do not want to
|
||||||
|
process
|
||||||
|
*/
|
||||||
|
type HeaderFrame struct {
|
||||||
|
ChannelId uint16
|
||||||
|
ClassId uint16
|
||||||
|
weight uint16
|
||||||
|
Size uint64
|
||||||
|
Properties Properties
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *HeaderFrame) channel() uint16 { return f.ChannelId }
|
||||||
|
|
||||||
|
/*
|
||||||
|
Content is the application data we carry from client-to-client via the AMQP
|
||||||
|
server. Content is, roughly speaking, a set of properties plus a binary data
|
||||||
|
part. The set of allowed properties are defined by the Basic class, and these
|
||||||
|
form the "content header frame". The data can be any size, and MAY be broken
|
||||||
|
into several (or many) chunks, each forming a "content body frame".
|
||||||
|
|
||||||
|
Looking at the frames for a specific channel, as they pass on the wire, we
|
||||||
|
might see something like this:
|
||||||
|
|
||||||
|
[method]
|
||||||
|
[method] [header] [body] [body]
|
||||||
|
[method]
|
||||||
|
...
|
||||||
|
*/
|
||||||
|
type BodyFrame struct {
|
||||||
|
ChannelId uint16
|
||||||
|
Body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *BodyFrame) channel() uint16 { return f.ChannelId }
|
416
tap/extensions/amqp/write.go
Normal file
416
tap/extensions/amqp/write.go
Normal file
@ -0,0 +1,416 @@
|
|||||||
|
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
// Source code and contact info at http://github.com/streadway/amqp
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *writer) WriteFrame(frame frame) (err error) {
|
||||||
|
if err = frame.write(w.w); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if buf, ok := w.w.(*bufio.Writer); ok {
|
||||||
|
err = buf.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *MethodFrame) write(w io.Writer) (err error) {
|
||||||
|
var payload bytes.Buffer
|
||||||
|
|
||||||
|
if f.Method == nil {
|
||||||
|
return errors.New("malformed frame: missing method")
|
||||||
|
}
|
||||||
|
|
||||||
|
class, method := f.Method.id()
|
||||||
|
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, class); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, method); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = f.Method.write(&payload); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heartbeat
|
||||||
|
//
|
||||||
|
// Payload is empty
|
||||||
|
func (f *HeartbeatFrame) write(w io.Writer) (err error) {
|
||||||
|
return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// CONTENT HEADER
|
||||||
|
// 0 2 4 12 14
|
||||||
|
// +----------+--------+-----------+----------------+------------- - -
|
||||||
|
// | class-id | weight | body size | property flags | property list...
|
||||||
|
// +----------+--------+-----------+----------------+------------- - -
|
||||||
|
// short short long long short remainder...
|
||||||
|
//
|
||||||
|
func (f *HeaderFrame) write(w io.Writer) (err error) {
|
||||||
|
var payload bytes.Buffer
|
||||||
|
var zeroTime time.Time
|
||||||
|
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// First pass will build the mask to be serialized, second pass will serialize
|
||||||
|
// each of the fields that appear in the mask.
|
||||||
|
|
||||||
|
var mask uint16
|
||||||
|
|
||||||
|
if len(f.Properties.ContentType) > 0 {
|
||||||
|
mask = mask | flagContentType
|
||||||
|
}
|
||||||
|
if len(f.Properties.ContentEncoding) > 0 {
|
||||||
|
mask = mask | flagContentEncoding
|
||||||
|
}
|
||||||
|
if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 {
|
||||||
|
mask = mask | flagHeaders
|
||||||
|
}
|
||||||
|
if f.Properties.DeliveryMode > 0 {
|
||||||
|
mask = mask | flagDeliveryMode
|
||||||
|
}
|
||||||
|
if f.Properties.Priority > 0 {
|
||||||
|
mask = mask | flagPriority
|
||||||
|
}
|
||||||
|
if len(f.Properties.CorrelationId) > 0 {
|
||||||
|
mask = mask | flagCorrelationId
|
||||||
|
}
|
||||||
|
if len(f.Properties.ReplyTo) > 0 {
|
||||||
|
mask = mask | flagReplyTo
|
||||||
|
}
|
||||||
|
if len(f.Properties.Expiration) > 0 {
|
||||||
|
mask = mask | flagExpiration
|
||||||
|
}
|
||||||
|
if len(f.Properties.MessageId) > 0 {
|
||||||
|
mask = mask | flagMessageId
|
||||||
|
}
|
||||||
|
if f.Properties.Timestamp != zeroTime {
|
||||||
|
mask = mask | flagTimestamp
|
||||||
|
}
|
||||||
|
if len(f.Properties.Type) > 0 {
|
||||||
|
mask = mask | flagType
|
||||||
|
}
|
||||||
|
if len(f.Properties.UserId) > 0 {
|
||||||
|
mask = mask | flagUserId
|
||||||
|
}
|
||||||
|
if len(f.Properties.AppId) > 0 {
|
||||||
|
mask = mask | flagAppId
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, mask); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasProperty(mask, flagContentType) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.ContentType); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagContentEncoding) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagHeaders) {
|
||||||
|
if err = writeTable(&payload, f.Properties.Headers); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagDeliveryMode) {
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagPriority) {
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagCorrelationId) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagReplyTo) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagExpiration) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.Expiration); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagMessageId) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.MessageId); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagTimestamp) {
|
||||||
|
if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagType) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.Type); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagUserId) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.UserId); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasProperty(mask, flagAppId) {
|
||||||
|
if err = writeShortstr(&payload, f.Properties.AppId); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Body
|
||||||
|
//
|
||||||
|
// Payload is one byterange from the full body who's size is declared in the
|
||||||
|
// Header frame
|
||||||
|
func (f *BodyFrame) write(w io.Writer) (err error) {
|
||||||
|
return writeFrame(w, frameBody, f.ChannelId, f.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) {
|
||||||
|
end := []byte{frameEnd}
|
||||||
|
size := uint(len(payload))
|
||||||
|
|
||||||
|
_, err = w.Write([]byte{
|
||||||
|
byte(typ),
|
||||||
|
byte((channel & 0xff00) >> 8),
|
||||||
|
byte((channel & 0x00ff) >> 0),
|
||||||
|
byte((size & 0xff000000) >> 24),
|
||||||
|
byte((size & 0x00ff0000) >> 16),
|
||||||
|
byte((size & 0x0000ff00) >> 8),
|
||||||
|
byte((size & 0x000000ff) >> 0),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = w.Write(payload); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = w.Write(end); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeShortstr(w io.Writer, s string) (err error) {
|
||||||
|
b := []byte(s)
|
||||||
|
|
||||||
|
var length = uint8(len(b))
|
||||||
|
|
||||||
|
if err = binary.Write(w, binary.BigEndian, length); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = w.Write(b[:length]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeLongstr(w io.Writer, s string) (err error) {
|
||||||
|
b := []byte(s)
|
||||||
|
|
||||||
|
var length = uint32(len(b))
|
||||||
|
|
||||||
|
if err = binary.Write(w, binary.BigEndian, length); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = w.Write(b[:length]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
'A': []interface{}
|
||||||
|
'D': Decimal
|
||||||
|
'F': Table
|
||||||
|
'I': int32
|
||||||
|
'S': string
|
||||||
|
'T': time.Time
|
||||||
|
'V': nil
|
||||||
|
'b': byte
|
||||||
|
'd': float64
|
||||||
|
'f': float32
|
||||||
|
'l': int64
|
||||||
|
's': int16
|
||||||
|
't': bool
|
||||||
|
'x': []byte
|
||||||
|
*/
|
||||||
|
func writeField(w io.Writer, value interface{}) (err error) {
|
||||||
|
var buf [9]byte
|
||||||
|
var enc []byte
|
||||||
|
|
||||||
|
switch v := value.(type) {
|
||||||
|
case bool:
|
||||||
|
buf[0] = 't'
|
||||||
|
if v {
|
||||||
|
buf[1] = byte(1)
|
||||||
|
} else {
|
||||||
|
buf[1] = byte(0)
|
||||||
|
}
|
||||||
|
enc = buf[:2]
|
||||||
|
|
||||||
|
case byte:
|
||||||
|
buf[0] = 'b'
|
||||||
|
buf[1] = byte(v)
|
||||||
|
enc = buf[:2]
|
||||||
|
|
||||||
|
case int16:
|
||||||
|
buf[0] = 's'
|
||||||
|
binary.BigEndian.PutUint16(buf[1:3], uint16(v))
|
||||||
|
enc = buf[:3]
|
||||||
|
|
||||||
|
case int:
|
||||||
|
buf[0] = 'I'
|
||||||
|
binary.BigEndian.PutUint32(buf[1:5], uint32(v))
|
||||||
|
enc = buf[:5]
|
||||||
|
|
||||||
|
case int32:
|
||||||
|
buf[0] = 'I'
|
||||||
|
binary.BigEndian.PutUint32(buf[1:5], uint32(v))
|
||||||
|
enc = buf[:5]
|
||||||
|
|
||||||
|
case int64:
|
||||||
|
buf[0] = 'l'
|
||||||
|
binary.BigEndian.PutUint64(buf[1:9], uint64(v))
|
||||||
|
enc = buf[:9]
|
||||||
|
|
||||||
|
case float32:
|
||||||
|
buf[0] = 'f'
|
||||||
|
binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v))
|
||||||
|
enc = buf[:5]
|
||||||
|
|
||||||
|
case float64:
|
||||||
|
buf[0] = 'd'
|
||||||
|
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
|
||||||
|
enc = buf[:9]
|
||||||
|
|
||||||
|
case Decimal:
|
||||||
|
buf[0] = 'D'
|
||||||
|
buf[1] = byte(v.Scale)
|
||||||
|
binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value))
|
||||||
|
enc = buf[:6]
|
||||||
|
|
||||||
|
case string:
|
||||||
|
buf[0] = 'S'
|
||||||
|
binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
|
||||||
|
enc = append(buf[:5], []byte(v)...)
|
||||||
|
|
||||||
|
case []interface{}: // field-array
|
||||||
|
buf[0] = 'A'
|
||||||
|
|
||||||
|
sec := new(bytes.Buffer)
|
||||||
|
for _, val := range v {
|
||||||
|
if err = writeField(sec, val); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len()))
|
||||||
|
if _, err = w.Write(buf[:5]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = w.Write(sec.Bytes()); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
case time.Time:
|
||||||
|
buf[0] = 'T'
|
||||||
|
binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix()))
|
||||||
|
enc = buf[:9]
|
||||||
|
|
||||||
|
case Table:
|
||||||
|
if _, err = w.Write([]byte{'F'}); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return writeTable(w, v)
|
||||||
|
|
||||||
|
case []byte:
|
||||||
|
buf[0] = 'x'
|
||||||
|
binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
|
||||||
|
if _, err = w.Write(buf[0:5]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err = w.Write(v); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
|
case nil:
|
||||||
|
buf[0] = 'V'
|
||||||
|
enc = buf[:1]
|
||||||
|
|
||||||
|
default:
|
||||||
|
return ErrFieldType
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = w.Write(enc)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeTable(w io.Writer, table Table) (err error) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
|
||||||
|
for key, val := range table {
|
||||||
|
if err = writeShortstr(&buf, key); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = writeField(&buf, val); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeLongstr(w, string(buf.Bytes()))
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user