diff --git a/qmp.go b/qmp.go index bc2596823b..c7ed0ec489 100644 --- a/qmp.go +++ b/qmp.go @@ -254,6 +254,15 @@ func (q *QMP) processQMPInput(line []byte, cmdQueue *list.List) { } } +func currentCommandDoneCh(cmdQueue *list.List) <-chan struct{} { + cmdEl := cmdQueue.Front() + if cmdEl == nil { + return nil + } + cmd := cmdEl.Value.(*qmpCommand) + return cmd.ctx.Done() +} + func (q *QMP) writeNextQMPCommand(cmdQueue *list.List) { cmdEl := cmdQueue.Front() cmd := cmdEl.Value.(*qmpCommand) @@ -293,6 +302,16 @@ func failOutstandingCommands(cmdQueue *list.List) { } } +func (q *QMP) cancelCurrentCommand(cmdQueue *list.List) { + cmdEl := cmdQueue.Front() + cmd := cmdEl.Value.(*qmpCommand) + if cmd.resultReceived { + q.finaliseCommand(cmdEl, cmdQueue, false) + } else { + cmd.filter = nil + } +} + func (q *QMP) parseVersion(version []byte) *QMPVersion { var qmp map[string]interface{} err := json.Unmarshal(version, &qmp) @@ -343,6 +362,7 @@ func (q *QMP) mainLoop() { }() version := []byte{} + var cmdDoneCh <-chan struct{} DONE: for { @@ -359,6 +379,7 @@ DONE: } if cmdQueue.Len() >= 1 { q.writeNextQMPCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } break DONE } @@ -373,14 +394,25 @@ DONE: return } _ = cmdQueue.PushBack(&cmd) - if cmdQueue.Len() >= 1 { + + // We only want to execute the new cmd if there + // are no other commands pending. If there are + // commands pending our new command will get + // run when the pending commands complete. + + if cmdQueue.Len() == 1 { q.writeNextQMPCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } case line, ok := <-fromVMCh: if !ok { return } q.processQMPInput(line, cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) + case <-cmdDoneCh: + q.cancelCurrentCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } } } diff --git a/qmp_test.go b/qmp_test.go index 4d60c0099b..a052c9055e 100644 --- a/qmp_test.go +++ b/qmp_test.go @@ -186,7 +186,9 @@ func (b *qmpTestCommandBuffer) Read(p []byte) (n int, err error) { func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) { var cmdJSON map[string]interface{} - if b.currentCmd >= len(b.cmds) { + currentCmd := b.currentCmd + b.currentCmd++ + if currentCmd >= len(b.cmds) { b.t.Fatalf("Unexpected command") } err := json.Unmarshal(p, &cmdJSON) @@ -195,14 +197,14 @@ func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) { } cmdName := cmdJSON["execute"] gotCmdName := cmdName.(string) - result := b.results[b.currentCmd].result - if gotCmdName != b.cmds[b.currentCmd].name { + result := b.results[currentCmd].result + if gotCmdName != b.cmds[currentCmd].name { b.t.Errorf("Unexpected command. Expected %s found %s", - b.cmds[b.currentCmd].name, gotCmdName) + b.cmds[currentCmd].name, gotCmdName) result = "error" } resultMap := make(map[string]interface{}) - resultMap[result] = b.results[b.currentCmd].data + resultMap[result] = b.results[currentCmd].data encodedRes, err := json.Marshal(&resultMap) if err != nil { b.t.Errorf("Unable to encode result: %v", err) @@ -582,6 +584,101 @@ func TestQMPSystemPowerdown(t *testing.T) { wg.Wait() } +// Checks that event commands can be cancelled. +// +// We start a QMPLoop, send the system_powerdown command. This command +// will time out after 1 second as the SHUTDOWN event never arrives. +// We then send a quit command to terminate the session. +// +// The system_powerdown command should be correctly sent but should block +// waiting for the SHUTDOWN event and should be successfully cancelled. +// The quit command should be successfully received and the QMP loop should +// exit gracefully. +func TestQMPEventedCommandCancel(t *testing.T) { + var wg sync.WaitGroup + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBuffer(t) + buf.AddCommmand("system_powerdown", nil, "return", nil) + buf.AddCommmand("quit", nil, "return", nil) + cfg := QMPConfig{Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + checkVersion(t, connectedCh) + buf.startEventLoop(&wg) + ctx, cancelFN := context.WithTimeout(context.Background(), time.Second) + err := q.ExecuteSystemPowerdown(ctx) + cancelFN() + if err == nil { + t.Fatalf("Expected SystemPowerdown to fail") + } + err = q.ExecuteQuit(context.Background()) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + q.Shutdown() + <-disconnectedCh + wg.Wait() +} + +// Checks that queued commands execute after an evented command is cancelled. +// +// This test is similar to the previous test with the exception that it +// tries to ensure that a second command is placed on the QMP structure's +// command queue before the evented command is cancelled. This allows us +// to test a slightly different use case. We start a QMPLoop, send the +// system_powerdown command. We do this by sending the command directly +// down the QMP.cmdCh rather than calling a higher level function as this +// allows us to ensure that we have another command queued before we +// timeout the first command. We then send a qmp_capabilities command and +// then we shutdown. +// +// The system_powerdown command should be correctly sent but should block +// waiting for the SHUTDOWN event and should be successfully cancelled. +// The query_capabilities command should be successfully received and the +// QMP loop should exit gracefully. +func TestQMPEventedCommandCancelConcurrent(t *testing.T) { + var wg sync.WaitGroup + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBuffer(t) + + buf.AddCommmand("system_powerdown", nil, "error", nil) + buf.AddCommmand("qmp_capabilities", nil, "return", nil) + + cfg := QMPConfig{Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + checkVersion(t, connectedCh) + buf.startEventLoop(&wg) + + resCh := make(chan qmpResult) + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + q.cmdCh <- qmpCommand{ + ctx: ctx, + res: resCh, + name: "system_powerdown", + filter: &qmpEventFilter{ + eventName: "SHUTDOWN", + }, + } + + var cmdWg sync.WaitGroup + cmdWg.Add(1) + go func() { + err := q.ExecuteQMPCapabilities(context.Background()) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + cmdWg.Done() + }() + + <-resCh + cancelFn() + cmdWg.Wait() + q.Shutdown() + <-disconnectedCh + wg.Wait() +} + // Checks that events can be received and parsed. // // Two events are provisioned and the QMPLoop is started with an valid eventCh.