From c6f334533ad05348ebfba942693c6e16329b32b8 Mon Sep 17 00:00:00 2001 From: Mark Ryan Date: Tue, 14 Feb 2017 18:25:59 +0000 Subject: [PATCH] qemu: Fix command cancelling. There was a bug with the cancelling of commands that meant that when an attempt was made to cancel a command and then to issue a second command, the first, cancelled command was re-issued. This commit fixes the issue and adds a new test case to check that cancelling of commands does indeed work. There was also an issue with the test harness which meant that tests that issued more than one command were not actually testing the second and third commands. Signed-off-by: Mark Ryan --- qmp.go | 34 ++++++++++++++++- qmp_test.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 135 insertions(+), 6 deletions(-) diff --git a/qmp.go b/qmp.go index bc2596823b..c7ed0ec489 100644 --- a/qmp.go +++ b/qmp.go @@ -254,6 +254,15 @@ func (q *QMP) processQMPInput(line []byte, cmdQueue *list.List) { } } +func currentCommandDoneCh(cmdQueue *list.List) <-chan struct{} { + cmdEl := cmdQueue.Front() + if cmdEl == nil { + return nil + } + cmd := cmdEl.Value.(*qmpCommand) + return cmd.ctx.Done() +} + func (q *QMP) writeNextQMPCommand(cmdQueue *list.List) { cmdEl := cmdQueue.Front() cmd := cmdEl.Value.(*qmpCommand) @@ -293,6 +302,16 @@ func failOutstandingCommands(cmdQueue *list.List) { } } +func (q *QMP) cancelCurrentCommand(cmdQueue *list.List) { + cmdEl := cmdQueue.Front() + cmd := cmdEl.Value.(*qmpCommand) + if cmd.resultReceived { + q.finaliseCommand(cmdEl, cmdQueue, false) + } else { + cmd.filter = nil + } +} + func (q *QMP) parseVersion(version []byte) *QMPVersion { var qmp map[string]interface{} err := json.Unmarshal(version, &qmp) @@ -343,6 +362,7 @@ func (q *QMP) mainLoop() { }() version := []byte{} + var cmdDoneCh <-chan struct{} DONE: for { @@ -359,6 +379,7 @@ DONE: } if cmdQueue.Len() >= 1 { q.writeNextQMPCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } break DONE } @@ -373,14 +394,25 @@ DONE: return } _ = cmdQueue.PushBack(&cmd) - if cmdQueue.Len() >= 1 { + + // We only want to execute the new cmd if there + // are no other commands pending. If there are + // commands pending our new command will get + // run when the pending commands complete. + + if cmdQueue.Len() == 1 { q.writeNextQMPCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } case line, ok := <-fromVMCh: if !ok { return } q.processQMPInput(line, cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) + case <-cmdDoneCh: + q.cancelCurrentCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } } } diff --git a/qmp_test.go b/qmp_test.go index 4d60c0099b..a052c9055e 100644 --- a/qmp_test.go +++ b/qmp_test.go @@ -186,7 +186,9 @@ func (b *qmpTestCommandBuffer) Read(p []byte) (n int, err error) { func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) { var cmdJSON map[string]interface{} - if b.currentCmd >= len(b.cmds) { + currentCmd := b.currentCmd + b.currentCmd++ + if currentCmd >= len(b.cmds) { b.t.Fatalf("Unexpected command") } err := json.Unmarshal(p, &cmdJSON) @@ -195,14 +197,14 @@ func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) { } cmdName := cmdJSON["execute"] gotCmdName := cmdName.(string) - result := b.results[b.currentCmd].result - if gotCmdName != b.cmds[b.currentCmd].name { + result := b.results[currentCmd].result + if gotCmdName != b.cmds[currentCmd].name { b.t.Errorf("Unexpected command. Expected %s found %s", - b.cmds[b.currentCmd].name, gotCmdName) + b.cmds[currentCmd].name, gotCmdName) result = "error" } resultMap := make(map[string]interface{}) - resultMap[result] = b.results[b.currentCmd].data + resultMap[result] = b.results[currentCmd].data encodedRes, err := json.Marshal(&resultMap) if err != nil { b.t.Errorf("Unable to encode result: %v", err) @@ -582,6 +584,101 @@ func TestQMPSystemPowerdown(t *testing.T) { wg.Wait() } +// Checks that event commands can be cancelled. +// +// We start a QMPLoop, send the system_powerdown command. This command +// will time out after 1 second as the SHUTDOWN event never arrives. +// We then send a quit command to terminate the session. +// +// The system_powerdown command should be correctly sent but should block +// waiting for the SHUTDOWN event and should be successfully cancelled. +// The quit command should be successfully received and the QMP loop should +// exit gracefully. +func TestQMPEventedCommandCancel(t *testing.T) { + var wg sync.WaitGroup + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBuffer(t) + buf.AddCommmand("system_powerdown", nil, "return", nil) + buf.AddCommmand("quit", nil, "return", nil) + cfg := QMPConfig{Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + checkVersion(t, connectedCh) + buf.startEventLoop(&wg) + ctx, cancelFN := context.WithTimeout(context.Background(), time.Second) + err := q.ExecuteSystemPowerdown(ctx) + cancelFN() + if err == nil { + t.Fatalf("Expected SystemPowerdown to fail") + } + err = q.ExecuteQuit(context.Background()) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + q.Shutdown() + <-disconnectedCh + wg.Wait() +} + +// Checks that queued commands execute after an evented command is cancelled. +// +// This test is similar to the previous test with the exception that it +// tries to ensure that a second command is placed on the QMP structure's +// command queue before the evented command is cancelled. This allows us +// to test a slightly different use case. We start a QMPLoop, send the +// system_powerdown command. We do this by sending the command directly +// down the QMP.cmdCh rather than calling a higher level function as this +// allows us to ensure that we have another command queued before we +// timeout the first command. We then send a qmp_capabilities command and +// then we shutdown. +// +// The system_powerdown command should be correctly sent but should block +// waiting for the SHUTDOWN event and should be successfully cancelled. +// The query_capabilities command should be successfully received and the +// QMP loop should exit gracefully. +func TestQMPEventedCommandCancelConcurrent(t *testing.T) { + var wg sync.WaitGroup + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBuffer(t) + + buf.AddCommmand("system_powerdown", nil, "error", nil) + buf.AddCommmand("qmp_capabilities", nil, "return", nil) + + cfg := QMPConfig{Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + checkVersion(t, connectedCh) + buf.startEventLoop(&wg) + + resCh := make(chan qmpResult) + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + q.cmdCh <- qmpCommand{ + ctx: ctx, + res: resCh, + name: "system_powerdown", + filter: &qmpEventFilter{ + eventName: "SHUTDOWN", + }, + } + + var cmdWg sync.WaitGroup + cmdWg.Add(1) + go func() { + err := q.ExecuteQMPCapabilities(context.Background()) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + cmdWg.Done() + }() + + <-resCh + cancelFn() + cmdWg.Wait() + q.Shutdown() + <-disconnectedCh + wg.Wait() +} + // Checks that events can be received and parsed. // // Two events are provisioned and the QMPLoop is started with an valid eventCh.