mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-10-22 12:29:49 +00:00
Correct `staticcheck` linter issues. Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
176 lines
3.9 KiB
Go
176 lines
3.9 KiB
Go
//
|
|
// Copyright (c) 2017 Intel Corporation
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package hyperstart
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
)
|
|
|
|
type ctlDataType string
|
|
|
|
const (
|
|
eventType ctlDataType = "ctlEvent"
|
|
replyType ctlDataType = "ctlReply"
|
|
)
|
|
|
|
type multicast struct {
|
|
bufReplies []*DecodedMessage
|
|
reply []chan *DecodedMessage
|
|
event map[string]chan *DecodedMessage
|
|
ctl net.Conn
|
|
sync.Mutex
|
|
}
|
|
|
|
func newMulticast(ctlConn net.Conn) *multicast {
|
|
return &multicast{
|
|
bufReplies: []*DecodedMessage{},
|
|
reply: []chan *DecodedMessage{},
|
|
event: make(map[string]chan *DecodedMessage),
|
|
ctl: ctlConn,
|
|
}
|
|
}
|
|
|
|
func startCtlMonitor(ctlConn net.Conn, done chan<- interface{}) *multicast {
|
|
ctlMulticast := newMulticast(ctlConn)
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := ReadCtlMessage(ctlMulticast.ctl)
|
|
if err != nil {
|
|
hyperLog.Infof("Read on CTL channel ended: %s", err)
|
|
break
|
|
}
|
|
|
|
err = ctlMulticast.write(msg)
|
|
if err != nil {
|
|
hyperLog.Errorf("Multicaster write error: %s", err)
|
|
break
|
|
}
|
|
}
|
|
|
|
close(done)
|
|
}()
|
|
|
|
return ctlMulticast
|
|
}
|
|
|
|
func (m *multicast) buildEventID(containerID, processID string) string {
|
|
return fmt.Sprintf("%s-%s", containerID, processID)
|
|
}
|
|
|
|
func (m *multicast) sendEvent(msg *DecodedMessage) error {
|
|
var paeData PAECommand
|
|
|
|
err := json.Unmarshal(msg.Message, &paeData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
uniqueID := m.buildEventID(paeData.Container, paeData.Process)
|
|
channel, exist := m.event[uniqueID]
|
|
if !exist {
|
|
return nil
|
|
}
|
|
|
|
channel <- msg
|
|
|
|
delete(m.event, uniqueID)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *multicast) sendReply(msg *DecodedMessage) error {
|
|
m.Lock()
|
|
if len(m.reply) == 0 {
|
|
m.bufReplies = append(m.bufReplies, msg)
|
|
m.Unlock()
|
|
return nil
|
|
}
|
|
|
|
replyChannel := m.reply[0]
|
|
m.reply = m.reply[1:]
|
|
|
|
m.Unlock()
|
|
|
|
// The current reply channel has been removed from the list, that's why
|
|
// we can be out of the mutex to send through that channel. Indeed, there
|
|
// is no risk that someone else tries to write on this channel.
|
|
replyChannel <- msg
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *multicast) processBufferedReply(channel chan *DecodedMessage) {
|
|
m.Lock()
|
|
|
|
if len(m.bufReplies) == 0 {
|
|
m.reply = append(m.reply, channel)
|
|
m.Unlock()
|
|
return
|
|
}
|
|
|
|
msg := m.bufReplies[0]
|
|
m.bufReplies = m.bufReplies[1:]
|
|
|
|
m.Unlock()
|
|
|
|
// The current buffered reply message has been removed from the list, and
|
|
// the channel have not been added to the reply list, that's why we can be
|
|
// out of the mutex to send the buffered message through that channel.
|
|
// There is no risk that someone else tries to write this message on another
|
|
// channel, or another message on this channel.
|
|
channel <- msg
|
|
}
|
|
|
|
func (m *multicast) write(msg *DecodedMessage) error {
|
|
switch msg.Code {
|
|
case NextCode:
|
|
return nil
|
|
case ProcessAsyncEventCode:
|
|
return m.sendEvent(msg)
|
|
default:
|
|
return m.sendReply(msg)
|
|
}
|
|
}
|
|
|
|
func (m *multicast) listen(containerID, processID string, dataType ctlDataType) (chan *DecodedMessage, error) {
|
|
switch dataType {
|
|
case replyType:
|
|
newChan := make(chan *DecodedMessage)
|
|
|
|
go m.processBufferedReply(newChan)
|
|
|
|
return newChan, nil
|
|
case eventType:
|
|
uniqueID := m.buildEventID(containerID, processID)
|
|
|
|
_, exist := m.event[uniqueID]
|
|
if exist {
|
|
return nil, fmt.Errorf("Channel already assigned for ID %s", uniqueID)
|
|
}
|
|
|
|
m.event[uniqueID] = make(chan *DecodedMessage)
|
|
|
|
return m.event[uniqueID], nil
|
|
default:
|
|
return nil, fmt.Errorf("Unknown data type: %s", dataType)
|
|
}
|
|
}
|