qemu: Fix command cancelling.

There was a bug with the cancelling of commands that meant that when
an attempt was made to cancel a command and then to issue a second
command, the first, cancelled command was re-issued.  This commit
fixes the issue and adds a new test case to check that cancelling
of commands does indeed work.  There was also an issue with the
test harness which meant that tests that issued more than one command
were not actually testing the second and third commands.

Signed-off-by: Mark Ryan <mark.d.ryan@intel.com>
This commit is contained in:
Mark Ryan 2017-02-14 18:25:59 +00:00
parent a8a798b0c0
commit c6f334533a
2 changed files with 135 additions and 6 deletions

34
qmp.go
View File

@ -254,6 +254,15 @@ func (q *QMP) processQMPInput(line []byte, cmdQueue *list.List) {
}
}
func currentCommandDoneCh(cmdQueue *list.List) <-chan struct{} {
cmdEl := cmdQueue.Front()
if cmdEl == nil {
return nil
}
cmd := cmdEl.Value.(*qmpCommand)
return cmd.ctx.Done()
}
func (q *QMP) writeNextQMPCommand(cmdQueue *list.List) {
cmdEl := cmdQueue.Front()
cmd := cmdEl.Value.(*qmpCommand)
@ -293,6 +302,16 @@ func failOutstandingCommands(cmdQueue *list.List) {
}
}
func (q *QMP) cancelCurrentCommand(cmdQueue *list.List) {
cmdEl := cmdQueue.Front()
cmd := cmdEl.Value.(*qmpCommand)
if cmd.resultReceived {
q.finaliseCommand(cmdEl, cmdQueue, false)
} else {
cmd.filter = nil
}
}
func (q *QMP) parseVersion(version []byte) *QMPVersion {
var qmp map[string]interface{}
err := json.Unmarshal(version, &qmp)
@ -343,6 +362,7 @@ func (q *QMP) mainLoop() {
}()
version := []byte{}
var cmdDoneCh <-chan struct{}
DONE:
for {
@ -359,6 +379,7 @@ DONE:
}
if cmdQueue.Len() >= 1 {
q.writeNextQMPCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
break DONE
}
@ -373,14 +394,25 @@ DONE:
return
}
_ = cmdQueue.PushBack(&cmd)
if cmdQueue.Len() >= 1 {
// We only want to execute the new cmd if there
// are no other commands pending. If there are
// commands pending our new command will get
// run when the pending commands complete.
if cmdQueue.Len() == 1 {
q.writeNextQMPCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
case line, ok := <-fromVMCh:
if !ok {
return
}
q.processQMPInput(line, cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
case <-cmdDoneCh:
q.cancelCurrentCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
}
}

View File

@ -186,7 +186,9 @@ func (b *qmpTestCommandBuffer) Read(p []byte) (n int, err error) {
func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) {
var cmdJSON map[string]interface{}
if b.currentCmd >= len(b.cmds) {
currentCmd := b.currentCmd
b.currentCmd++
if currentCmd >= len(b.cmds) {
b.t.Fatalf("Unexpected command")
}
err := json.Unmarshal(p, &cmdJSON)
@ -195,14 +197,14 @@ func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) {
}
cmdName := cmdJSON["execute"]
gotCmdName := cmdName.(string)
result := b.results[b.currentCmd].result
if gotCmdName != b.cmds[b.currentCmd].name {
result := b.results[currentCmd].result
if gotCmdName != b.cmds[currentCmd].name {
b.t.Errorf("Unexpected command. Expected %s found %s",
b.cmds[b.currentCmd].name, gotCmdName)
b.cmds[currentCmd].name, gotCmdName)
result = "error"
}
resultMap := make(map[string]interface{})
resultMap[result] = b.results[b.currentCmd].data
resultMap[result] = b.results[currentCmd].data
encodedRes, err := json.Marshal(&resultMap)
if err != nil {
b.t.Errorf("Unable to encode result: %v", err)
@ -582,6 +584,101 @@ func TestQMPSystemPowerdown(t *testing.T) {
wg.Wait()
}
// Checks that event commands can be cancelled.
//
// We start a QMPLoop, send the system_powerdown command. This command
// will time out after 1 second as the SHUTDOWN event never arrives.
// We then send a quit command to terminate the session.
//
// The system_powerdown command should be correctly sent but should block
// waiting for the SHUTDOWN event and should be successfully cancelled.
// The quit command should be successfully received and the QMP loop should
// exit gracefully.
func TestQMPEventedCommandCancel(t *testing.T) {
var wg sync.WaitGroup
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("system_powerdown", nil, "return", nil)
buf.AddCommmand("quit", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
buf.startEventLoop(&wg)
ctx, cancelFN := context.WithTimeout(context.Background(), time.Second)
err := q.ExecuteSystemPowerdown(ctx)
cancelFN()
if err == nil {
t.Fatalf("Expected SystemPowerdown to fail")
}
err = q.ExecuteQuit(context.Background())
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
q.Shutdown()
<-disconnectedCh
wg.Wait()
}
// Checks that queued commands execute after an evented command is cancelled.
//
// This test is similar to the previous test with the exception that it
// tries to ensure that a second command is placed on the QMP structure's
// command queue before the evented command is cancelled. This allows us
// to test a slightly different use case. We start a QMPLoop, send the
// system_powerdown command. We do this by sending the command directly
// down the QMP.cmdCh rather than calling a higher level function as this
// allows us to ensure that we have another command queued before we
// timeout the first command. We then send a qmp_capabilities command and
// then we shutdown.
//
// The system_powerdown command should be correctly sent but should block
// waiting for the SHUTDOWN event and should be successfully cancelled.
// The query_capabilities command should be successfully received and the
// QMP loop should exit gracefully.
func TestQMPEventedCommandCancelConcurrent(t *testing.T) {
var wg sync.WaitGroup
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("system_powerdown", nil, "error", nil)
buf.AddCommmand("qmp_capabilities", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
buf.startEventLoop(&wg)
resCh := make(chan qmpResult)
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
q.cmdCh <- qmpCommand{
ctx: ctx,
res: resCh,
name: "system_powerdown",
filter: &qmpEventFilter{
eventName: "SHUTDOWN",
},
}
var cmdWg sync.WaitGroup
cmdWg.Add(1)
go func() {
err := q.ExecuteQMPCapabilities(context.Background())
if err != nil {
t.Errorf("Unexpected error %v", err)
}
cmdWg.Done()
}()
<-resCh
cancelFn()
cmdWg.Wait()
q.Shutdown()
<-disconnectedCh
wg.Wait()
}
// Checks that events can be received and parsed.
//
// Two events are provisioned and the QMPLoop is started with an valid eventCh.