mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-22 05:28:25 +00:00
qemu/qmp: re-implement mainLoop
In newer versions of QEMU, like 4.0-rc2, QMP events can be thrown even before the QMP-version response, one example of this behaviour is when a virtio serial is closed and a VSERPORT_CHANGE event is thrown. Re-implement mainLoop to check the data received from the VM channel, since it's not a guarantee that the first data read from the VM channel is the QMP version. fixes https://github.com/kata-containers/runtime/issues/1474 Signed-off-by: Julio Montes <julio.montes@intel.com>
This commit is contained in:
parent
78d079db6d
commit
694a7b1c61
55
qemu/qmp.go
55
qemu/qmp.go
@ -459,7 +459,6 @@ func (q *QMP) parseVersion(version []byte) *QMPVersion {
|
||||
for _, k := range []string{"QMP", "version", "qemu"} {
|
||||
versionMap, _ = versionMap[k].(map[string]interface{})
|
||||
if versionMap == nil {
|
||||
q.cfg.Logger.Errorf("Invalid QMP greeting: %s", string(version))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -530,31 +529,9 @@ func (q *QMP) mainLoop() {
|
||||
close(q.disconnectedCh)
|
||||
}()
|
||||
|
||||
var version []byte
|
||||
var cmdDoneCh <-chan struct{}
|
||||
|
||||
DONE:
|
||||
for {
|
||||
var ok bool
|
||||
select {
|
||||
case cmd, ok := <-q.cmdCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
_ = cmdQueue.PushBack(&cmd)
|
||||
case version, ok = <-fromVMCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if cmdQueue.Len() >= 1 {
|
||||
q.writeNextQMPCommand(cmdQueue)
|
||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||
}
|
||||
break DONE
|
||||
}
|
||||
}
|
||||
|
||||
q.connectedCh <- q.parseVersion(version)
|
||||
var version *QMPVersion
|
||||
ready := false
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -564,21 +541,37 @@ DONE:
|
||||
}
|
||||
_ = cmdQueue.PushBack(&cmd)
|
||||
|
||||
// We only want to execute the new cmd if there
|
||||
// are no other commands pending. If there are
|
||||
// commands pending our new command will get
|
||||
// run when the pending commands complete.
|
||||
|
||||
if cmdQueue.Len() == 1 {
|
||||
// We only want to execute the new cmd if QMP is
|
||||
// ready and there are no other commands pending.
|
||||
// If there are commands pending our new command
|
||||
// will get run when the pending commands complete.
|
||||
if ready && cmdQueue.Len() == 1 {
|
||||
q.writeNextQMPCommand(cmdQueue)
|
||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||
}
|
||||
|
||||
case line, ok := <-fromVMCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if !ready {
|
||||
// Not ready yet. Check if line is the QMP version.
|
||||
// Sometimes QMP events are thrown before the QMP version,
|
||||
// hence it's not a guarantee that the first data read from
|
||||
// the channel is the QMP version.
|
||||
version = q.parseVersion(line)
|
||||
if version != nil {
|
||||
q.connectedCh <- version
|
||||
ready = true
|
||||
}
|
||||
// Do not process QMP input to avoid deadlocks.
|
||||
break
|
||||
}
|
||||
|
||||
q.processQMPInput(line, cmdQueue)
|
||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||
|
||||
case <-cmdDoneCh:
|
||||
q.cancelCurrentCommand(cmdQueue)
|
||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||
|
@ -103,6 +103,19 @@ func newQMPTestCommandBuffer(t *testing.T) *qmpTestCommandBuffer {
|
||||
return b
|
||||
}
|
||||
|
||||
func newQMPTestCommandBufferNoGreeting(t *testing.T) *qmpTestCommandBuffer {
|
||||
b := &qmpTestCommandBuffer{
|
||||
newDataCh: make(chan []byte, 1),
|
||||
t: t,
|
||||
buf: bytes.NewBuffer([]byte{}),
|
||||
forceFail: make(chan struct{}),
|
||||
}
|
||||
b.cmds = make([]qmpTestCommand, 0, 8)
|
||||
b.events = make([]qmpTestEvent, 0, 8)
|
||||
b.results = make([]qmpTestResult, 0, 8)
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@ -1504,3 +1517,48 @@ func TestExecuteNVDIMMDeviceAdd(t *testing.T) {
|
||||
q.Shutdown()
|
||||
<-disconnectedCh
|
||||
}
|
||||
|
||||
func TestMainLoopEventBeforeGreeting(t *testing.T) {
|
||||
const (
|
||||
seconds = 1352167040730
|
||||
microseconds = 123456
|
||||
)
|
||||
|
||||
connectedCh := make(chan *QMPVersion)
|
||||
disconnectedCh := make(chan struct{})
|
||||
buf := newQMPTestCommandBufferNoGreeting(t)
|
||||
|
||||
// Add events
|
||||
var wg sync.WaitGroup
|
||||
buf.AddEvent("VSERPORT_CHANGE", time.Millisecond*100,
|
||||
map[string]interface{}{
|
||||
"open": false,
|
||||
"id": "channel0",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"seconds": seconds,
|
||||
"microseconds": microseconds,
|
||||
})
|
||||
buf.AddEvent("POWERDOWN", time.Millisecond*200, nil,
|
||||
map[string]interface{}{
|
||||
"seconds": seconds,
|
||||
"microseconds": microseconds,
|
||||
})
|
||||
|
||||
// register a channel to receive events
|
||||
eventCh := make(chan QMPEvent)
|
||||
cfg := QMPConfig{EventCh: eventCh, Logger: qmpTestLogger{}}
|
||||
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
|
||||
|
||||
// Start events, this will lead to a deadlock if mainLoop is not implemented
|
||||
// correctly
|
||||
buf.startEventLoop(&wg)
|
||||
wg.Wait()
|
||||
|
||||
// Send greeting and check version
|
||||
buf.newDataCh <- []byte(qmpHello)
|
||||
checkVersion(t, connectedCh)
|
||||
|
||||
q.Shutdown()
|
||||
<-disconnectedCh
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user