diff --git a/qemu/qmp.go b/qemu/qmp.go index 9b21d96480..f35aa3c31e 100644 --- a/qemu/qmp.go +++ b/qemu/qmp.go @@ -459,7 +459,6 @@ func (q *QMP) parseVersion(version []byte) *QMPVersion { for _, k := range []string{"QMP", "version", "qemu"} { versionMap, _ = versionMap[k].(map[string]interface{}) if versionMap == nil { - q.cfg.Logger.Errorf("Invalid QMP greeting: %s", string(version)) return nil } } @@ -530,31 +529,9 @@ func (q *QMP) mainLoop() { close(q.disconnectedCh) }() - var version []byte var cmdDoneCh <-chan struct{} - -DONE: - for { - var ok bool - select { - case cmd, ok := <-q.cmdCh: - if !ok { - return - } - _ = cmdQueue.PushBack(&cmd) - case version, ok = <-fromVMCh: - if !ok { - return - } - if cmdQueue.Len() >= 1 { - q.writeNextQMPCommand(cmdQueue) - cmdDoneCh = currentCommandDoneCh(cmdQueue) - } - break DONE - } - } - - q.connectedCh <- q.parseVersion(version) + var version *QMPVersion + ready := false for { select { @@ -564,21 +541,37 @@ DONE: } _ = cmdQueue.PushBack(&cmd) - // We only want to execute the new cmd if there - // are no other commands pending. If there are - // commands pending our new command will get - // run when the pending commands complete. - - if cmdQueue.Len() == 1 { + // We only want to execute the new cmd if QMP is + // ready and there are no other commands pending. + // If there are commands pending our new command + // will get run when the pending commands complete. + if ready && cmdQueue.Len() == 1 { q.writeNextQMPCommand(cmdQueue) cmdDoneCh = currentCommandDoneCh(cmdQueue) } + case line, ok := <-fromVMCh: if !ok { return } + + if !ready { + // Not ready yet. Check if line is the QMP version. + // Sometimes QMP events are thrown before the QMP version, + // hence it's not a guarantee that the first data read from + // the channel is the QMP version. + version = q.parseVersion(line) + if version != nil { + q.connectedCh <- version + ready = true + } + // Do not process QMP input to avoid deadlocks. + break + } + q.processQMPInput(line, cmdQueue) cmdDoneCh = currentCommandDoneCh(cmdQueue) + case <-cmdDoneCh: q.cancelCurrentCommand(cmdQueue) cmdDoneCh = currentCommandDoneCh(cmdQueue) diff --git a/qemu/qmp_test.go b/qemu/qmp_test.go index b12c798a17..61710669f8 100644 --- a/qemu/qmp_test.go +++ b/qemu/qmp_test.go @@ -103,6 +103,19 @@ func newQMPTestCommandBuffer(t *testing.T) *qmpTestCommandBuffer { return b } +func newQMPTestCommandBufferNoGreeting(t *testing.T) *qmpTestCommandBuffer { + b := &qmpTestCommandBuffer{ + newDataCh: make(chan []byte, 1), + t: t, + buf: bytes.NewBuffer([]byte{}), + forceFail: make(chan struct{}), + } + b.cmds = make([]qmpTestCommand, 0, 8) + b.events = make([]qmpTestEvent, 0, 8) + b.results = make([]qmpTestResult, 0, 8) + return b +} + func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) { wg.Add(1) go func() { @@ -1504,3 +1517,48 @@ func TestExecuteNVDIMMDeviceAdd(t *testing.T) { q.Shutdown() <-disconnectedCh } + +func TestMainLoopEventBeforeGreeting(t *testing.T) { + const ( + seconds = 1352167040730 + microseconds = 123456 + ) + + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBufferNoGreeting(t) + + // Add events + var wg sync.WaitGroup + buf.AddEvent("VSERPORT_CHANGE", time.Millisecond*100, + map[string]interface{}{ + "open": false, + "id": "channel0", + }, + map[string]interface{}{ + "seconds": seconds, + "microseconds": microseconds, + }) + buf.AddEvent("POWERDOWN", time.Millisecond*200, nil, + map[string]interface{}{ + "seconds": seconds, + "microseconds": microseconds, + }) + + // register a channel to receive events + eventCh := make(chan QMPEvent) + cfg := QMPConfig{EventCh: eventCh, Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + + // Start events, this will lead to a deadlock if mainLoop is not implemented + // correctly + buf.startEventLoop(&wg) + wg.Wait() + + // Send greeting and check version + buf.newDataCh <- []byte(qmpHello) + checkVersion(t, connectedCh) + + q.Shutdown() + <-disconnectedCh +}