mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-22 21:49:41 +00:00
qemu/qmp: re-implement mainLoop
In newer versions of QEMU, like 4.0-rc2, QMP events can be thrown even before the QMP-version response, one example of this behaviour is when a virtio serial is closed and a VSERPORT_CHANGE event is thrown. Re-implement mainLoop to check the data received from the VM channel, since it's not a guarantee that the first data read from the VM channel is the QMP version. fixes https://github.com/kata-containers/runtime/issues/1474 Signed-off-by: Julio Montes <julio.montes@intel.com>
This commit is contained in:
parent
78d079db6d
commit
694a7b1c61
55
qemu/qmp.go
55
qemu/qmp.go
@ -459,7 +459,6 @@ func (q *QMP) parseVersion(version []byte) *QMPVersion {
|
|||||||
for _, k := range []string{"QMP", "version", "qemu"} {
|
for _, k := range []string{"QMP", "version", "qemu"} {
|
||||||
versionMap, _ = versionMap[k].(map[string]interface{})
|
versionMap, _ = versionMap[k].(map[string]interface{})
|
||||||
if versionMap == nil {
|
if versionMap == nil {
|
||||||
q.cfg.Logger.Errorf("Invalid QMP greeting: %s", string(version))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -530,31 +529,9 @@ func (q *QMP) mainLoop() {
|
|||||||
close(q.disconnectedCh)
|
close(q.disconnectedCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var version []byte
|
|
||||||
var cmdDoneCh <-chan struct{}
|
var cmdDoneCh <-chan struct{}
|
||||||
|
var version *QMPVersion
|
||||||
DONE:
|
ready := false
|
||||||
for {
|
|
||||||
var ok bool
|
|
||||||
select {
|
|
||||||
case cmd, ok := <-q.cmdCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = cmdQueue.PushBack(&cmd)
|
|
||||||
case version, ok = <-fromVMCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if cmdQueue.Len() >= 1 {
|
|
||||||
q.writeNextQMPCommand(cmdQueue)
|
|
||||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
|
||||||
}
|
|
||||||
break DONE
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
q.connectedCh <- q.parseVersion(version)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -564,21 +541,37 @@ DONE:
|
|||||||
}
|
}
|
||||||
_ = cmdQueue.PushBack(&cmd)
|
_ = cmdQueue.PushBack(&cmd)
|
||||||
|
|
||||||
// We only want to execute the new cmd if there
|
// We only want to execute the new cmd if QMP is
|
||||||
// are no other commands pending. If there are
|
// ready and there are no other commands pending.
|
||||||
// commands pending our new command will get
|
// If there are commands pending our new command
|
||||||
// run when the pending commands complete.
|
// will get run when the pending commands complete.
|
||||||
|
if ready && cmdQueue.Len() == 1 {
|
||||||
if cmdQueue.Len() == 1 {
|
|
||||||
q.writeNextQMPCommand(cmdQueue)
|
q.writeNextQMPCommand(cmdQueue)
|
||||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||||
}
|
}
|
||||||
|
|
||||||
case line, ok := <-fromVMCh:
|
case line, ok := <-fromVMCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !ready {
|
||||||
|
// Not ready yet. Check if line is the QMP version.
|
||||||
|
// Sometimes QMP events are thrown before the QMP version,
|
||||||
|
// hence it's not a guarantee that the first data read from
|
||||||
|
// the channel is the QMP version.
|
||||||
|
version = q.parseVersion(line)
|
||||||
|
if version != nil {
|
||||||
|
q.connectedCh <- version
|
||||||
|
ready = true
|
||||||
|
}
|
||||||
|
// Do not process QMP input to avoid deadlocks.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
q.processQMPInput(line, cmdQueue)
|
q.processQMPInput(line, cmdQueue)
|
||||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||||
|
|
||||||
case <-cmdDoneCh:
|
case <-cmdDoneCh:
|
||||||
q.cancelCurrentCommand(cmdQueue)
|
q.cancelCurrentCommand(cmdQueue)
|
||||||
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
cmdDoneCh = currentCommandDoneCh(cmdQueue)
|
||||||
|
@ -103,6 +103,19 @@ func newQMPTestCommandBuffer(t *testing.T) *qmpTestCommandBuffer {
|
|||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newQMPTestCommandBufferNoGreeting(t *testing.T) *qmpTestCommandBuffer {
|
||||||
|
b := &qmpTestCommandBuffer{
|
||||||
|
newDataCh: make(chan []byte, 1),
|
||||||
|
t: t,
|
||||||
|
buf: bytes.NewBuffer([]byte{}),
|
||||||
|
forceFail: make(chan struct{}),
|
||||||
|
}
|
||||||
|
b.cmds = make([]qmpTestCommand, 0, 8)
|
||||||
|
b.events = make([]qmpTestEvent, 0, 8)
|
||||||
|
b.results = make([]qmpTestResult, 0, 8)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) {
|
func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -1504,3 +1517,48 @@ func TestExecuteNVDIMMDeviceAdd(t *testing.T) {
|
|||||||
q.Shutdown()
|
q.Shutdown()
|
||||||
<-disconnectedCh
|
<-disconnectedCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMainLoopEventBeforeGreeting(t *testing.T) {
|
||||||
|
const (
|
||||||
|
seconds = 1352167040730
|
||||||
|
microseconds = 123456
|
||||||
|
)
|
||||||
|
|
||||||
|
connectedCh := make(chan *QMPVersion)
|
||||||
|
disconnectedCh := make(chan struct{})
|
||||||
|
buf := newQMPTestCommandBufferNoGreeting(t)
|
||||||
|
|
||||||
|
// Add events
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
buf.AddEvent("VSERPORT_CHANGE", time.Millisecond*100,
|
||||||
|
map[string]interface{}{
|
||||||
|
"open": false,
|
||||||
|
"id": "channel0",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"seconds": seconds,
|
||||||
|
"microseconds": microseconds,
|
||||||
|
})
|
||||||
|
buf.AddEvent("POWERDOWN", time.Millisecond*200, nil,
|
||||||
|
map[string]interface{}{
|
||||||
|
"seconds": seconds,
|
||||||
|
"microseconds": microseconds,
|
||||||
|
})
|
||||||
|
|
||||||
|
// register a channel to receive events
|
||||||
|
eventCh := make(chan QMPEvent)
|
||||||
|
cfg := QMPConfig{EventCh: eventCh, Logger: qmpTestLogger{}}
|
||||||
|
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
|
||||||
|
|
||||||
|
// Start events, this will lead to a deadlock if mainLoop is not implemented
|
||||||
|
// correctly
|
||||||
|
buf.startEventLoop(&wg)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Send greeting and check version
|
||||||
|
buf.newDataCh <- []byte(qmpHello)
|
||||||
|
checkVersion(t, connectedCh)
|
||||||
|
|
||||||
|
q.Shutdown()
|
||||||
|
<-disconnectedCh
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user