Files
kata-containers/virtcontainers/pkg/hyperstart/multicast.go
Samuel Ortiz 24eff72d82 virtcontainers: Initial import
This is a virtcontainers 1.0.8 import into Kata Containers runtime.

virtcontainers is a Go library designed to manage hardware virtualized
pods and containers. It is the core Clear Containers framework and will
become the core Kata Containers framework, as discussed at
https://github.com/kata-containers/runtime/issues/33

Some more more pointers:

virtcontainers README, including some design and architecure notes:
https://github.com/containers/virtcontainers/blob/master/README.md

virtcontainers 1.0 API:
https://github.com/containers/virtcontainers/blob/master/documentation/api/1.0/api.md

Fixes #40

Signed-off-by: Samuel Ortiz <sameo@linux.intel.com>
2018-03-13 00:49:46 +01:00

176 lines
3.9 KiB
Go

//
// Copyright (c) 2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package hyperstart
import (
"encoding/json"
"fmt"
"net"
"sync"
)
type ctlDataType string
const (
eventType ctlDataType = "ctlEvent"
replyType ctlDataType = "ctlReply"
)
type multicast struct {
bufReplies []*DecodedMessage
reply []chan *DecodedMessage
event map[string]chan *DecodedMessage
ctl net.Conn
sync.Mutex
}
func newMulticast(ctlConn net.Conn) *multicast {
return &multicast{
bufReplies: []*DecodedMessage{},
reply: []chan *DecodedMessage{},
event: make(map[string]chan *DecodedMessage),
ctl: ctlConn,
}
}
func startCtlMonitor(ctlConn net.Conn, done chan<- interface{}) *multicast {
ctlMulticast := newMulticast(ctlConn)
go func() {
for {
msg, err := ReadCtlMessage(ctlMulticast.ctl)
if err != nil {
hyperLog.Infof("Read on CTL channel ended: %s", err)
break
}
err = ctlMulticast.write(msg)
if err != nil {
hyperLog.Errorf("Multicaster write error: %s", err)
break
}
}
close(done)
}()
return ctlMulticast
}
func (m *multicast) buildEventID(containerID, processID string) string {
return fmt.Sprintf("%s-%s", containerID, processID)
}
func (m *multicast) sendEvent(msg *DecodedMessage) error {
var paeData PAECommand
err := json.Unmarshal(msg.Message, paeData)
if err != nil {
return err
}
uniqueID := m.buildEventID(paeData.Container, paeData.Process)
channel, exist := m.event[uniqueID]
if !exist {
return nil
}
channel <- msg
delete(m.event, uniqueID)
return nil
}
func (m *multicast) sendReply(msg *DecodedMessage) error {
m.Lock()
if len(m.reply) == 0 {
m.bufReplies = append(m.bufReplies, msg)
m.Unlock()
return nil
}
replyChannel := m.reply[0]
m.reply = m.reply[1:]
m.Unlock()
// The current reply channel has been removed from the list, that's why
// we can be out of the mutex to send through that channel. Indeed, there
// is no risk that someone else tries to write on this channel.
replyChannel <- msg
return nil
}
func (m *multicast) processBufferedReply(channel chan *DecodedMessage) {
m.Lock()
if len(m.bufReplies) == 0 {
m.reply = append(m.reply, channel)
m.Unlock()
return
}
msg := m.bufReplies[0]
m.bufReplies = m.bufReplies[1:]
m.Unlock()
// The current buffered reply message has been removed from the list, and
// the channel have not been added to the reply list, that's why we can be
// out of the mutex to send the buffered message through that channel.
// There is no risk that someone else tries to write this message on another
// channel, or another message on this channel.
channel <- msg
}
func (m *multicast) write(msg *DecodedMessage) error {
switch msg.Code {
case NextCode:
return nil
case ProcessAsyncEventCode:
return m.sendEvent(msg)
default:
return m.sendReply(msg)
}
}
func (m *multicast) listen(containerID, processID string, dataType ctlDataType) (chan *DecodedMessage, error) {
switch dataType {
case replyType:
newChan := make(chan *DecodedMessage)
go m.processBufferedReply(newChan)
return newChan, nil
case eventType:
uniqueID := m.buildEventID(containerID, processID)
_, exist := m.event[uniqueID]
if exist {
return nil, fmt.Errorf("Channel already assigned for ID %s", uniqueID)
}
m.event[uniqueID] = make(chan *DecodedMessage)
return m.event[uniqueID], nil
default:
return nil, fmt.Errorf("Unknown data type: %s", dataType)
}
}