qemu/qmp: re-implement mainLoop

In newer versions of QEMU, like 4.0-rc2, QMP events can be thrown even before
the QMP-version response, one example of this behaviour is when a virtio serial
is closed and a VSERPORT_CHANGE event is thrown.
Re-implement mainLoop to check the data received from the VM channel, since
it's not a guarantee that the first data read from the VM channel is the
QMP version.

fixes https://github.com/kata-containers/runtime/issues/1474

Signed-off-by: Julio Montes <julio.montes@intel.com>
This commit is contained in:
Julio Montes 2019-04-04 10:35:24 -06:00
parent 78d079db6d
commit 694a7b1c61
2 changed files with 82 additions and 31 deletions

View File

@ -459,7 +459,6 @@ func (q *QMP) parseVersion(version []byte) *QMPVersion {
for _, k := range []string{"QMP", "version", "qemu"} { for _, k := range []string{"QMP", "version", "qemu"} {
versionMap, _ = versionMap[k].(map[string]interface{}) versionMap, _ = versionMap[k].(map[string]interface{})
if versionMap == nil { if versionMap == nil {
q.cfg.Logger.Errorf("Invalid QMP greeting: %s", string(version))
return nil return nil
} }
} }
@ -530,31 +529,9 @@ func (q *QMP) mainLoop() {
close(q.disconnectedCh) close(q.disconnectedCh)
}() }()
var version []byte
var cmdDoneCh <-chan struct{} var cmdDoneCh <-chan struct{}
var version *QMPVersion
DONE: ready := false
for {
var ok bool
select {
case cmd, ok := <-q.cmdCh:
if !ok {
return
}
_ = cmdQueue.PushBack(&cmd)
case version, ok = <-fromVMCh:
if !ok {
return
}
if cmdQueue.Len() >= 1 {
q.writeNextQMPCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
break DONE
}
}
q.connectedCh <- q.parseVersion(version)
for { for {
select { select {
@ -564,21 +541,37 @@ DONE:
} }
_ = cmdQueue.PushBack(&cmd) _ = cmdQueue.PushBack(&cmd)
// We only want to execute the new cmd if there // We only want to execute the new cmd if QMP is
// are no other commands pending. If there are // ready and there are no other commands pending.
// commands pending our new command will get // If there are commands pending our new command
// run when the pending commands complete. // will get run when the pending commands complete.
if ready && cmdQueue.Len() == 1 {
if cmdQueue.Len() == 1 {
q.writeNextQMPCommand(cmdQueue) q.writeNextQMPCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue) cmdDoneCh = currentCommandDoneCh(cmdQueue)
} }
case line, ok := <-fromVMCh: case line, ok := <-fromVMCh:
if !ok { if !ok {
return return
} }
if !ready {
// Not ready yet. Check if line is the QMP version.
// Sometimes QMP events are thrown before the QMP version,
// hence it's not a guarantee that the first data read from
// the channel is the QMP version.
version = q.parseVersion(line)
if version != nil {
q.connectedCh <- version
ready = true
}
// Do not process QMP input to avoid deadlocks.
break
}
q.processQMPInput(line, cmdQueue) q.processQMPInput(line, cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue) cmdDoneCh = currentCommandDoneCh(cmdQueue)
case <-cmdDoneCh: case <-cmdDoneCh:
q.cancelCurrentCommand(cmdQueue) q.cancelCurrentCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue) cmdDoneCh = currentCommandDoneCh(cmdQueue)

View File

@ -103,6 +103,19 @@ func newQMPTestCommandBuffer(t *testing.T) *qmpTestCommandBuffer {
return b return b
} }
func newQMPTestCommandBufferNoGreeting(t *testing.T) *qmpTestCommandBuffer {
b := &qmpTestCommandBuffer{
newDataCh: make(chan []byte, 1),
t: t,
buf: bytes.NewBuffer([]byte{}),
forceFail: make(chan struct{}),
}
b.cmds = make([]qmpTestCommand, 0, 8)
b.events = make([]qmpTestEvent, 0, 8)
b.results = make([]qmpTestResult, 0, 8)
return b
}
func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) { func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) {
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -1504,3 +1517,48 @@ func TestExecuteNVDIMMDeviceAdd(t *testing.T) {
q.Shutdown() q.Shutdown()
<-disconnectedCh <-disconnectedCh
} }
func TestMainLoopEventBeforeGreeting(t *testing.T) {
const (
seconds = 1352167040730
microseconds = 123456
)
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBufferNoGreeting(t)
// Add events
var wg sync.WaitGroup
buf.AddEvent("VSERPORT_CHANGE", time.Millisecond*100,
map[string]interface{}{
"open": false,
"id": "channel0",
},
map[string]interface{}{
"seconds": seconds,
"microseconds": microseconds,
})
buf.AddEvent("POWERDOWN", time.Millisecond*200, nil,
map[string]interface{}{
"seconds": seconds,
"microseconds": microseconds,
})
// register a channel to receive events
eventCh := make(chan QMPEvent)
cfg := QMPConfig{EventCh: eventCh, Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
// Start events, this will lead to a deadlock if mainLoop is not implemented
// correctly
buf.startEventLoop(&wg)
wg.Wait()
// Send greeting and check version
buf.newDataCh <- []byte(qmpHello)
checkVersion(t, connectedCh)
q.Shutdown()
<-disconnectedCh
}