some work on the replay subsystem, plus getting better grasp of the FBS format

This commit is contained in:
amit bezalel 2017-07-14 00:52:50 +03:00
parent 7dc1773596
commit 852f08db21
12 changed files with 691 additions and 586 deletions

File diff suppressed because it is too large Load Diff

21
.vscode/launch.json vendored
View File

@ -70,25 +70,16 @@
"showLog": true "showLog": true
}, },
{ {
"name": "Launch fbs server", "name": "Launch fbs player",
"type": "go", "type": "go",
"request": "launch", "request": "launch",
"mode": "debug", "mode": "test",
"remotePath": "", "remotePath": "",
"port": 2345, "port": 2345,
"host": "127.0.0.1", "program": "${workspaceRoot}/player",
"program": "${workspaceRoot}/server/", "args": [
"osx": { "-test.v"
"env": { ],
//"GOPATH": "${env.HOME}/Dropbox/go"
}
},
"windows": {
"env": {
//"GOPATH": "${env.USERPROFILE}\\Dropbox\\go"
}
},
"args": [],
"showLog": true "showLog": true
} }
] ]

View File

@ -495,6 +495,9 @@ func (c *ClientConn) mainLoop() {
defer func() { defer func() {
logger.Warn("ClientConn.MainLoop: exiting!") logger.Warn("ClientConn.MainLoop: exiting!")
c.Listeners.Consume(&common.RfbSegment{
SegmentType: common.SegmentConnectionClosed,
})
}() }()
for { for {

View File

@ -15,11 +15,12 @@ const (
SegmentFullyParsedClientMessage SegmentFullyParsedClientMessage
SegmentFullyParsedServerMessage SegmentFullyParsedServerMessage
SegmentServerInitMessage SegmentServerInitMessage
SegmentConnectionClosed
) )
type SegmentType int type SegmentType int
func (seg SegmentType ) String() string { func (seg SegmentType) String() string {
switch seg { switch seg {
case SegmentBytes: case SegmentBytes:
return "SegmentBytes" return "SegmentBytes"

12
main.go
View File

@ -74,15 +74,20 @@ func main() {
// GreenShift: 8, // GreenShift: 8,
// BlueShift: 0, // BlueShift: 0,
// }) // })
start := getNowMillisec()
go func() { go func() {
for { for {
if getNowMillisec()-start >= 10000 {
break
}
err = clientConn.FramebufferUpdateRequest(true, 0, 0, 1280, 800) err = clientConn.FramebufferUpdateRequest(true, 0, 0, 1280, 800)
if err != nil { if err != nil {
logger.Errorf("error requesting fb update: %s", err) logger.Errorf("error requesting fb update: %s", err)
} }
time.Sleep(500 * time.Millisecond) time.Sleep(250 * time.Millisecond)
} }
clientConn.Close()
}() }()
//go func() { //go func() {
@ -93,3 +98,6 @@ func main() {
//clientConn.Close() //clientConn.Close()
} }
func getNowMillisec() int {
return int(time.Now().UnixNano() / int64(time.Millisecond))
}

View File

@ -1,4 +1,4 @@
package server package player
import ( import (
"encoding/binary" "encoding/binary"
@ -6,10 +6,26 @@ import (
"os" "os"
"vncproxy/common" "vncproxy/common"
"vncproxy/logger" "vncproxy/logger"
"bytes"
) )
type FbsReader struct { type FbsReader struct {
reader io.Reader reader io.Reader
buffer bytes.Buffer
currentTimestamp uint32
}
func (fbs *FbsReader) Read(p []byte) (n int, err error) {
if fbs.buffer.Len() < len(p) {
seg, err := fbs.ReadSegment()
if err != nil {
logger.Error("FBSReader.Read: error reading FBSsegment: ",err)
return 0, err
}
fbs.buffer.Write(seg.bytes)
fbs.currentTimestamp = seg.timestamp
}
return fbs.buffer.Read(p)
} }
func NewFbsReader(fbsFile string) (*FbsReader, error) { func NewFbsReader(fbsFile string) (*FbsReader, error) {
@ -22,10 +38,10 @@ func NewFbsReader(fbsFile string) (*FbsReader, error) {
return &FbsReader{reader: reader}, nil return &FbsReader{reader: reader}, nil
} }
func (player *FbsReader) ReadStartSession() (*common.ServerInit, error) { func (fbs *FbsReader) ReadStartSession() (*common.ServerInit, error) {
initMsg := common.ServerInit{} initMsg := common.ServerInit{}
reader := player.reader reader := fbs.reader
var framebufferWidth uint16 var framebufferWidth uint16
var framebufferHeight uint16 var framebufferHeight uint16
@ -42,27 +58,27 @@ func (player *FbsReader) ReadStartSession() (*common.ServerInit, error) {
//read the version message into the buffer so it will be written in the first rbs block //read the version message into the buffer so it will be written in the first rbs block
//RFB 003.008\n //RFB 003.008\n
bytes = make([]byte, 12) bytes = make([]byte, 12)
_, err = reader.Read(bytes) _, err = fbs.Read(bytes)
if err != nil { if err != nil {
logger.Error("error reading rbs init - RFB Version: ", err) logger.Error("error reading rbs init - RFB Version: ", err)
return nil, err return nil, err
} }
//push sec type and fb dimensions //push sec type and fb dimensions
binary.Read(reader, binary.BigEndian, &SecTypeNone) binary.Read(fbs, binary.BigEndian, &SecTypeNone)
if err != nil { if err != nil {
logger.Error("error reading rbs init - SecType: ", err) logger.Error("error reading rbs init - SecType: ", err)
} }
//read frame buffer width, height //read frame buffer width, height
binary.Read(reader, binary.BigEndian, &framebufferWidth) binary.Read(fbs, binary.BigEndian, &framebufferWidth)
if err != nil { if err != nil {
logger.Error("error reading rbs init - FBWidth: ", err) logger.Error("error reading rbs init - FBWidth: ", err)
return nil, err return nil, err
} }
initMsg.FBWidth = framebufferWidth initMsg.FBWidth = framebufferWidth
binary.Read(reader, binary.BigEndian, &framebufferHeight) binary.Read(fbs, binary.BigEndian, &framebufferHeight)
if err != nil { if err != nil {
logger.Error("error reading rbs init - FBHeight: ", err) logger.Error("error reading rbs init - FBHeight: ", err)
return nil, err return nil, err
@ -71,7 +87,7 @@ func (player *FbsReader) ReadStartSession() (*common.ServerInit, error) {
//read pixel format //read pixel format
pixelFormat := &common.PixelFormat{} pixelFormat := &common.PixelFormat{}
binary.Read(reader, binary.BigEndian, pixelFormat) binary.Read(fbs, binary.BigEndian, pixelFormat)
if err != nil { if err != nil {
logger.Error("error reading rbs init - Pixelformat: ", err) logger.Error("error reading rbs init - Pixelformat: ", err)
return nil, err return nil, err
@ -79,11 +95,11 @@ func (player *FbsReader) ReadStartSession() (*common.ServerInit, error) {
initMsg.PixelFormat = *pixelFormat initMsg.PixelFormat = *pixelFormat
//read padding //read padding
bytes = make([]byte, 3) bytes = make([]byte, 3)
reader.Read(bytes) fbs.Read(bytes)
//read desktop name //read desktop name
var desknameLen uint32 var desknameLen uint32
binary.Read(reader, binary.BigEndian, &desknameLen) binary.Read(fbs, binary.BigEndian, &desknameLen)
if err != nil { if err != nil {
logger.Error("error reading rbs init - deskname Len: ", err) logger.Error("error reading rbs init - deskname Len: ", err)
return nil, err return nil, err
@ -91,7 +107,7 @@ func (player *FbsReader) ReadStartSession() (*common.ServerInit, error) {
initMsg.NameLength = desknameLen initMsg.NameLength = desknameLen
bytes = make([]byte, desknameLen) bytes = make([]byte, desknameLen)
reader.Read(bytes) fbs.Read(bytes)
if err != nil { if err != nil {
logger.Error("error reading rbs init - desktopName: ", err) logger.Error("error reading rbs init - desktopName: ", err)
return nil, err return nil, err
@ -102,8 +118,8 @@ func (player *FbsReader) ReadStartSession() (*common.ServerInit, error) {
return &initMsg, nil return &initMsg, nil
} }
func (player *FbsReader) ReadSegment() (*FbsSegment, error) { func (fbs *FbsReader) ReadSegment() (*FbsSegment, error) {
reader := player.reader reader := fbs.reader
var bytesLen uint32 var bytesLen uint32
//read length //read length
@ -135,10 +151,11 @@ func (player *FbsReader) ReadSegment() (*FbsSegment, error) {
} }
//timeStamp := time.Unix(timeSinceStart, 0) //timeStamp := time.Unix(timeSinceStart, 0)
return &FbsSegment{actualBytes, timeSinceStart}, nil seg := &FbsSegment{bytes: actualBytes, timestamp: timeSinceStart}
return seg, nil
} }
type FbsSegment struct { type FbsSegment struct {
bytes []byte bytes []byte
timeSinceStart uint32 timestamp uint32
} }

137
player/player_test.go Normal file
View File

@ -0,0 +1,137 @@
package player
import (
"log"
"testing"
"time"
"vncproxy/common"
"vncproxy/encodings"
"vncproxy/logger"
"vncproxy/server"
"encoding/binary"
)
type ServerMessageHandler struct {
Conn *server.ServerConn
Fbs *FbsReader
firstSegDone bool
startTime int
}
func (handler *ServerMessageHandler) Consume(seg *common.RfbSegment) error {
switch seg.SegmentType {
case common.SegmentFullyParsedClientMessage:
clientMsg := seg.Message.(common.ClientMessage)
logger.Debugf("ClientUpdater.Consume:(vnc-server-bound) got ClientMessage type=%s", clientMsg.Type())
switch clientMsg.Type() {
case common.FramebufferUpdateRequestMsgType:
if !handler.firstSegDone {
handler.firstSegDone = true
handler.startTime = int(time.Now().UnixNano() / int64(time.Millisecond))
}
handler.sendFbsMessage()
}
// server.FramebufferUpdateRequest:
}
return nil
}
func (h *ServerMessageHandler) sendFbsMessage() {
var messageType uint8
fbs := h.Fbs
//conn := h.Conn
binary.Read(fbs,binary.BigEndian,&messageType)
bytes := messages[messageType].Read(fbs)
h.Conn.Write(bytes)
//seg, err := fbs.ReadSegment()
//
//now := int(time.Now().UnixNano() / int64(time.Millisecond))
//if err != nil {
// logger.Error("TestServer.NewConnHandler: Error in reading FBS segment: ", err)
// return
//}
//timeSinceStart := now - h.startTime
//
//timeToWait := timeSinceStart - int(seg.timestamp)
//
//if timeToWait > 0 {
// time.Sleep(time.Duration(timeToWait) * time.Millisecond)
//}
//fmt.Printf("bytes: %v", seg.bytes)
//conn.Write(seg.bytes)
}
func loadFbsFile(filename string, conn *server.ServerConn) (*FbsReader, error) {
fbs, err := NewFbsReader(filename)
if err != nil {
logger.Error("failed to open fbs reader:", err)
return nil, err
}
//NewFbsReader("/Users/amitbet/vncRec/recording.rbs")
initMsg, err := fbs.ReadStartSession()
if err != nil {
logger.Error("failed to open read fbs start session:", err)
return nil, err
}
conn.SetPixelFormat(&initMsg.PixelFormat)
conn.SetHeight(initMsg.FBHeight)
conn.SetWidth(initMsg.FBWidth)
conn.SetDesktopName(string(initMsg.NameText))
return fbs, nil
}
func TestServer(t *testing.T) {
//chServer := make(chan common.ClientMessage)
chClient := make(chan common.ServerMessage)
cfg := &server.ServerConfig{
//SecurityHandlers: []SecurityHandler{&ServerAuthNone{}, &ServerAuthVNC{}},
SecurityHandlers: []server.SecurityHandler{&server.ServerAuthNone{}},
Encodings: []common.Encoding{&encodings.RawEncoding{}, &encodings.TightEncoding{}, &encodings.CopyRectEncoding{}},
PixelFormat: common.NewPixelFormat(32),
//ClientMessageCh: chServer,
//ServerMessageCh: chClient,
ClientMessages: server.DefaultClientMessages,
DesktopName: []byte("workDesk"),
Height: uint16(768),
Width: uint16(1024),
//NewConnHandler: serverNewConnHandler,
}
cfg.NewConnHandler = func(cfg *server.ServerConfig, conn *server.ServerConn) error {
fbs, err := loadFbsFile("/Users/amitbet/Dropbox/recording.rbs", conn)
if err != nil {
logger.Error("TestServer.NewConnHandler: Error in loading FBS: ", err)
return err
}
conn.Listeners.AddListener(&ServerMessageHandler{conn, fbs, false, 0})
return nil
}
url := "http://localhost:7777/"
go server.WsServe(url, cfg)
go server.TcpServe(":5904", cfg)
// fbs, err := loadFbsFile("/Users/amitbet/vncRec/recording.rbs", cfg)
// if err != nil {
// logger.Error("TestServer.NewConnHandler: Error in loading FBS: ", err)
// return
// }
// Process messages coming in on the ClientMessage channel.
for {
msg := <-chClient
switch msg.Type() {
default:
log.Printf("Received message type:%v msg:%v\n", msg.Type(), msg)
}
}
}

View File

@ -145,7 +145,7 @@ func (vp *VncProxy) StartListening() {
SecurityHandlers: secHandlers, SecurityHandlers: secHandlers,
Encodings: []common.Encoding{&encodings.RawEncoding{}, &encodings.TightEncoding{}, &encodings.CopyRectEncoding{}}, Encodings: []common.Encoding{&encodings.RawEncoding{}, &encodings.TightEncoding{}, &encodings.CopyRectEncoding{}},
PixelFormat: common.NewPixelFormat(32), PixelFormat: common.NewPixelFormat(32),
ServerMessageCh: chClient, //ServerMessageCh: chClient,
ClientMessages: server.DefaultClientMessages, ClientMessages: server.DefaultClientMessages,
DesktopName: []byte("workDesk"), DesktopName: []byte("workDesk"),
Height: uint16(768), Height: uint16(768),

View File

@ -1,63 +0,0 @@
package server
import (
"log"
"vncproxy/common"
"vncproxy/encodings"
"vncproxy/logger"
)
func newServerConnHandler(cfg *ServerConfig, conn *ServerConn) error {
return nil
}
func main() {
//chServer := make(chan common.ClientMessage)
chClient := make(chan common.ServerMessage)
cfg := &ServerConfig{
//SecurityHandlers: []SecurityHandler{&ServerAuthNone{}, &ServerAuthVNC{}},
SecurityHandlers: []SecurityHandler{&ServerAuthVNC{"Ch_#!T@8"}},
Encodings: []common.Encoding{&encodings.RawEncoding{}, &encodings.TightEncoding{}, &encodings.CopyRectEncoding{}},
PixelFormat: common.NewPixelFormat(32),
//ClientMessageCh: chServer,
ServerMessageCh: chClient,
ClientMessages: DefaultClientMessages,
DesktopName: []byte("workDesk"),
Height: uint16(768),
Width: uint16(1024),
NewConnHandler: newServerConnHandler,
}
loadFbsFile("c:\\Users\\betzalel\\Dropbox\\recording.rbs", cfg)
url := "http://localhost:8091/"
go WsServe(url, cfg)
go TcpServe(":5904", cfg)
// Process messages coming in on the ClientMessage channel.
for {
msg := <-chClient
switch msg.Type() {
default:
log.Printf("Received message type:%v msg:%v\n", msg.Type(), msg)
}
}
}
func loadFbsFile(filename string, cfg *ServerConfig) {
fbs, err := NewFbsReader(filename)
if err != nil {
logger.Error("failed to open fbs reader:", err)
}
//NewFbsReader("/Users/amitbet/vncRec/recording.rbs")
initMsg, err := fbs.ReadStartSession()
if err != nil {
logger.Error("failed to open read fbs start session:", err)
}
cfg.PixelFormat = &initMsg.PixelFormat
cfg.Height = initMsg.FBHeight
cfg.Width = initMsg.FBWidth
cfg.DesktopName = initMsg.NameText
}

View File

@ -171,7 +171,11 @@ func (c *ServerConn) handle() error {
//var wg sync.WaitGroup //var wg sync.WaitGroup
//defer c.Close() //defer c.Close()
defer func() {
c.Listeners.Consume(&common.RfbSegment{
SegmentType: common.SegmentConnectionClosed,
})
}()
//create a map of all message types //create a map of all message types
clientMessages := make(map[common.ClientMessageType]common.ClientMessage) clientMessages := make(map[common.ClientMessageType]common.ClientMessage)
for _, m := range c.cfg.ClientMessages { for _, m := range c.cfg.ClientMessages {
@ -180,21 +184,21 @@ func (c *ServerConn) handle() error {
//wg.Add(2) //wg.Add(2)
// server // server
go func() error { // go func() error {
//defer wg.Done() // //defer wg.Done()
for { // for {
select { // select {
case msg := <-c.cfg.ServerMessageCh: // case msg := <-c.cfg.ServerMessageCh:
logger.Debugf("%v", msg) // logger.Debugf("%v", msg)
// if err = msg.Write(c); err != nil { // // if err = msg.Write(c); err != nil {
// return err // // return err
// // }
// case <-c.quit:
// c.Close()
// return nil
// } // }
case <-c.quit: // }
c.Close() // }()
return nil
}
}
}()
// client // client
//go func() error { //go func() error {

View File

@ -55,7 +55,7 @@ type ServerConfig struct {
PixelFormat *common.PixelFormat PixelFormat *common.PixelFormat
ColorMap *common.ColorMap ColorMap *common.ColorMap
//ClientMessageCh chan common.ClientMessage //ClientMessageCh chan common.ClientMessage
ServerMessageCh chan common.ServerMessage //ServerMessageCh chan common.ServerMessage
ClientMessages []common.ClientMessage ClientMessages []common.ClientMessage
DesktopName []byte DesktopName []byte
Height uint16 Height uint16

View File

@ -20,6 +20,7 @@ type Recorder struct {
serverInitMessage *common.ServerInit serverInitMessage *common.ServerInit
sessionStartWritten bool sessionStartWritten bool
segmentChan chan *common.RfbSegment segmentChan chan *common.RfbSegment
maxWriteSize int
} }
func getNowMillisec() int { func getNowMillisec() int {
@ -35,6 +36,8 @@ func NewRecorder(saveFilePath string) *Recorder {
rec := Recorder{RBSFileName: saveFilePath, startTime: getNowMillisec()} rec := Recorder{RBSFileName: saveFilePath, startTime: getNowMillisec()}
var err error var err error
rec.maxWriteSize = 65536
rec.writer, err = os.OpenFile(saveFilePath, os.O_RDWR|os.O_CREATE, 0755) rec.writer, err = os.OpenFile(saveFilePath, os.O_RDWR|os.O_CREATE, 0755)
if err != nil { if err != nil {
logger.Errorf("unable to open file: %s, error: %v", saveFilePath, err) logger.Errorf("unable to open file: %s, error: %v", saveFilePath, err)
@ -78,9 +81,7 @@ func (r *Recorder) writeStartSession(initMsg *common.ServerInit) error {
framebufferHeight := initMsg.FBHeight framebufferHeight := initMsg.FBHeight
//write rfb header information (the only part done without the [size|data|timestamp] block wrapper) //write rfb header information (the only part done without the [size|data|timestamp] block wrapper)
r.buffer.WriteString("FBS 001.000\n") r.writer.WriteString("FBS 001.000\n")
r.buffer.WriteTo(r.writer)
r.buffer.Reset()
//push the version message into the buffer so it will be written in the first rbs block //push the version message into the buffer so it will be written in the first rbs block
r.buffer.WriteString(versionMsg_3_3) r.buffer.WriteString(versionMsg_3_3)
@ -144,11 +145,15 @@ func (r *Recorder) HandleRfbSegment(data *common.RfbSegment) error {
default: default:
logger.Warn("Recorder.HandleRfbSegment: unknown message type:" + string(data.UpcomingObjectType)) logger.Warn("Recorder.HandleRfbSegment: unknown message type:" + string(data.UpcomingObjectType))
} }
case common.SegmentConnectionClosed:
r.writeToDisk()
case common.SegmentRectSeparator: case common.SegmentRectSeparator:
logger.Debugf("Recorder.HandleRfbSegment: writing rect") logger.Debugf("Recorder.HandleRfbSegment: writing rect")
r.writeToDisk() //r.writeToDisk()
case common.SegmentBytes: case common.SegmentBytes:
if r.buffer.Len()+len(data.Bytes) > r.maxWriteSize-4 {
r.writeToDisk()
}
_, err := r.buffer.Write(data.Bytes) _, err := r.buffer.Write(data.Bytes)
return err return err
case common.SegmentServerInitMessage: case common.SegmentServerInitMessage: