projects/logging: promote pkg/memlogd to a toplevel package

This is an example external logging service which can be enabled by
adding it to the `init` section of the .yml, for example:

    ...
    init:
      - linuxkit/init:35866bb276c264a5f664bfac7456f4b9eeb87a4d
      - linuxkit/runc:v0.4
      - linuxkit/containerd:f2bc1bda1ab18146967fa1a149800aaf14bee81b
      - linuxkit/ca-certificates:v0.4
      - linuxkit/memlogd:cc035e5c9e4011ec1ba97a181a6689fc90965ce9
    onboot:
    ...

Signed-off-by: David Scott <dave.scott@docker.com>
This commit is contained in:
David Scott
2018-07-01 13:40:23 +01:00
parent 638c455dd2
commit c92af038fb
7 changed files with 0 additions and 0 deletions

View File

@@ -1,19 +0,0 @@
FROM linuxkit/alpine:1b05307ae8152e3d38f79e297b0632697a30c65c AS build
RUN apk add --no-cache go musl-dev
ENV GOPATH=/go PATH=$PATH:/go/bin
COPY cmd/ /go/src/
RUN go-compile.sh /go/src/memlogd
RUN go-compile.sh /go/src/logread
RUN go-compile.sh /go/src/logwrite
FROM scratch
ENTRYPOINT []
CMD []
WORKDIR /
COPY --from=build /go/bin/memlogd usr/bin/memlogd
COPY --from=build /go/bin/logread usr/bin/logread
COPY --from=build /go/bin/logwrite usr/bin/logwrite
# We'll start from init.d
COPY etc/ /etc/

View File

@@ -1,3 +0,0 @@
image: memlogd
binds:
- /var/run:/var/run

View File

@@ -1,55 +0,0 @@
package main
import (
"bufio"
"flag"
"net"
"os"
)
const (
logDump byte = iota
logFollow
logDumpFollow
)
func main() {
var err error
var socketPath string
var follow bool
var dumpFollow bool
flag.StringVar(&socketPath, "socket", "/var/run/memlogdq.sock", "memlogd log query socket")
flag.BoolVar(&dumpFollow, "F", false, "dump log, then follow")
flag.BoolVar(&follow, "f", false, "follow log buffer")
flag.Parse()
addr := net.UnixAddr{
Name: socketPath,
Net: "unix",
}
conn, err := net.DialUnix("unix", nil, &addr)
if err != nil {
panic(err)
}
defer conn.Close()
var n int
switch {
case dumpFollow:
n, err = conn.Write([]byte{logDumpFollow})
case follow && !dumpFollow:
n, err = conn.Write([]byte{logFollow})
default:
n, err = conn.Write([]byte{logDump})
}
if err != nil || n < 1 {
panic(err)
}
r := bufio.NewReader(conn)
r.WriteTo(os.Stdout)
}

View File

@@ -1,97 +0,0 @@
package main
import (
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"syscall"
)
func getLogFileSocketPair() (*os.File, int) {
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
if err != nil {
panic(err)
}
localFd := fds[0]
remoteFd := fds[1]
localLogFile := os.NewFile(uintptr(localFd), "")
return localLogFile, remoteFd
}
func sendFD(conn *net.UnixConn, remoteAddr *net.UnixAddr, source string, fd int) error {
oobs := syscall.UnixRights(fd)
_, _, err := conn.WriteMsgUnix([]byte(source), oobs, remoteAddr)
return err
}
func main() {
var err error
var ok bool
var serverSocket string
var name string
flag.StringVar(&serverSocket, "socket", "/var/run/linuxkit-external-logging.sock", "socket to pass fd's to memlogd")
flag.StringVar(&name, "n", "", "name of sender, defaults to first argument if left blank")
flag.Parse()
args := flag.Args()
if len(args) < 1 {
log.Fatal("no command specified")
}
if name == "" {
name = args[0]
}
localStdoutLog, remoteStdoutFd := getLogFileSocketPair()
localStderrLog, remoteStderrFd := getLogFileSocketPair()
var outSocket int
if outSocket, err = syscall.Socket(syscall.AF_UNIX, syscall.SOCK_DGRAM, 0); err != nil {
log.Fatal("Unable to create socket: ", err)
}
var outFile net.Conn
if outFile, err = net.FileConn(os.NewFile(uintptr(outSocket), "")); err != nil {
log.Fatal(err)
}
var conn *net.UnixConn
if conn, ok = outFile.(*net.UnixConn); !ok {
log.Fatal("Internal error, invalid cast.")
}
raddr := net.UnixAddr{Name: serverSocket, Net: "unixgram"}
if err = sendFD(conn, &raddr, name+".stdout", remoteStdoutFd); err != nil {
log.Fatal("fd stdout send failed: ", err)
}
if err = sendFD(conn, &raddr, name+".stderr", remoteStderrFd); err != nil {
log.Fatal("fd stderr send failed: ", err)
}
cmd := exec.Command(args[0], args[1:]...)
outStderr := io.MultiWriter(localStderrLog, os.Stderr)
outStdout := io.MultiWriter(localStdoutLog, os.Stdout)
cmd.Stderr = outStderr
cmd.Stdout = outStdout
if err = cmd.Run(); err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
// exit with exit code from process
status := exitError.Sys().(syscall.WaitStatus)
os.Exit(status.ExitStatus())
} else {
// no exit code, report error and exit 1
fmt.Println(err)
os.Exit(1)
}
}
}

View File

@@ -1,345 +0,0 @@
package main
import (
"bufio"
"bytes"
"container/list"
"container/ring"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"sync"
"syscall"
"time"
)
type logEntry struct {
time time.Time
source string
msg string
}
type fdMessage struct {
name string
fd int
}
type logMode byte
const (
logDump logMode = iota
logFollow
logDumpFollow
)
type queryMessage struct {
conn *net.UnixConn
mode logMode
}
type connListener struct {
conn *net.UnixConn
cond *sync.Cond // condition and mutex used to notify listeners of more data
buffer bytes.Buffer
err error
exitOnEOF bool // exit instead of blocking if no more data in read buffer
}
func doLog(logCh chan logEntry, msg string) {
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: msg}
return
}
func logQueryHandler(l *connListener) {
defer l.conn.Close()
data := make([]byte, 0xffff)
l.cond.L.Lock()
for {
var n, remaining int
var rerr, werr error
for rerr == nil && werr == nil {
if n, rerr = l.buffer.Read(data); n == 0 { // process data before checking error
break // exit read loop to wait for more data
}
l.cond.L.Unlock()
remaining = n
w := data
for remaining > 0 && werr == nil {
w = data[:remaining]
n, werr = l.conn.Write(w)
w = w[n:]
remaining = remaining - n
}
l.cond.L.Lock()
}
// check errors
if werr != nil {
l.err = werr
l.cond.L.Unlock()
break
}
if rerr != nil && rerr != io.EOF { // EOF is ok, just wait for more data
l.err = rerr
l.cond.L.Unlock()
break
}
if l.exitOnEOF && rerr == io.EOF { // ... unless we should exit on EOF
l.err = nil
l.cond.L.Unlock()
break
}
l.cond.Wait() // unlock and wait for more data
}
}
func (msg *logEntry) String() string {
return fmt.Sprintf("%s %s %s", msg.time.Format(time.RFC3339), msg.source, msg.msg)
}
func ringBufferHandler(ringSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
// Anything that interacts with the ring buffer goes through this handler
ring := ring.New(ringSize)
listeners := list.New()
for {
select {
case msg := <-logCh:
fmt.Printf("%s\n", msg.String())
// add log entry
ring.Value = msg
ring = ring.Next()
// send to listeners
var l *connListener
var remove []*list.Element
for e := listeners.Front(); e != nil; e = e.Next() {
l = e.Value.(*connListener)
if l.err != nil {
remove = append(remove, e)
continue
}
l.cond.L.Lock()
l.buffer.WriteString(fmt.Sprintf("%s\n", msg.String()))
l.cond.L.Unlock()
l.cond.Signal()
}
if len(remove) > 0 { // remove listeners that returned errors
for _, e := range remove {
l = e.Value.(*connListener)
fmt.Println("Removing connection, error: ", l.err)
listeners.Remove(e)
}
}
case msg := <-queryMsgChan:
l := connListener{conn: msg.conn, cond: sync.NewCond(&sync.Mutex{}), err: nil, exitOnEOF: (msg.mode == logDump)}
listeners.PushBack(&l)
go logQueryHandler(&l)
if msg.mode == logDumpFollow || msg.mode == logDump {
l.cond.L.Lock()
// fill with current data in buffer
ring.Do(func(f interface{}) {
if msg, ok := f.(logEntry); ok {
s := fmt.Sprintf("%s\n", msg.String())
l.buffer.WriteString(s)
}
})
l.cond.L.Unlock()
l.cond.Signal() // signal handler that more data is available
}
}
}
}
func receiveQueryHandler(l *net.UnixListener, logCh chan logEntry, queryMsgChan chan queryMessage) {
for {
var conn *net.UnixConn
var err error
if conn, err = l.AcceptUnix(); err != nil {
doLog(logCh, fmt.Sprintf("Connection error %s", err))
continue
}
mode := make([]byte, 1)
n, err := conn.Read(mode)
if err != nil || n != 1 {
doLog(logCh, fmt.Sprintf("No mode received: %s", err))
}
queryMsgChan <- queryMessage{conn, logMode(mode[0])}
}
}
func receiveFdHandler(conn *net.UnixConn, logCh chan logEntry, fdMsgChan chan fdMessage) {
oob := make([]byte, 512)
b := make([]byte, 512)
for {
n, oobn, _, _, err := conn.ReadMsgUnix(b, oob)
if err != nil {
doLog(logCh, fmt.Sprintf("ERROR: Unable to read oob data: %s", err.Error()))
continue
}
if oobn == 0 {
continue
}
oobmsgs, err := syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
doLog(logCh, fmt.Sprintf("ERROR: Failed to parse socket control message: %s", err.Error()))
continue
}
for _, oobmsg := range oobmsgs {
r, err := syscall.ParseUnixRights(&oobmsg)
if err != nil {
doLog(logCh, fmt.Sprintf("ERROR: Failed to parse UNIX rights in oob data: %s", err.Error()))
continue
}
for _, fd := range r {
name := ""
if n > 0 {
name = string(b[:n])
}
fdMsgChan <- fdMessage{name: name, fd: fd}
}
}
}
}
func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) {
f := os.NewFile(uintptr(fd), "")
defer f.Close()
r := bufio.NewReader(f)
l, isPrefix, err := r.ReadLine()
var buffer bytes.Buffer
for err == nil {
buffer.Write(l)
for isPrefix {
l, isPrefix, err = r.ReadLine()
if err != nil {
break
}
if buffer.Len() < maxLineLen {
buffer.Write(l)
}
}
if buffer.Len() > maxLineLen {
buffer.Truncate(maxLineLen)
}
logCh <- logEntry{time: time.Now(), source: source, msg: buffer.String()}
buffer.Reset()
l, isPrefix, err = r.ReadLine()
}
}
func main() {
var err error
var socketQueryPath string
var passedQueryFD int
var socketLogPath string
var passedLogFD int
var linesInBuffer int
var lineMaxLength int
var daemonize bool
flag.StringVar(&socketQueryPath, "socket-query", "/var/run/memlogdq.sock", "unix domain socket for responding to log queries. Overridden by -fd-query")
flag.StringVar(&socketLogPath, "socket-log", "/var/run/linuxkit-external-logging.sock", "unix domain socket to listen for new fds to add to log. Overridden by -fd-log")
flag.IntVar(&passedLogFD, "fd-log", -1, "an existing SOCK_DGRAM socket for receiving fd's. Overrides -socket-log.")
flag.IntVar(&passedQueryFD, "fd-query", -1, "an existing SOCK_STREAM for receiving log read requets. Overrides -socket-query.")
flag.IntVar(&linesInBuffer, "max-lines", 5000, "Number of log lines to keep in memory")
flag.IntVar(&lineMaxLength, "max-line-len", 1024, "Maximum line length recorded. Additional bytes are dropped.")
flag.BoolVar(&daemonize, "daemonize", false, "Bind sockets and then daemonize.")
flag.Parse()
var connLogFd *net.UnixConn
if passedLogFD == -1 { // no fd on command line, use socket path
addr := net.UnixAddr{
Name: socketLogPath,
Net: "unixgram",
}
if connLogFd, err = net.ListenUnixgram("unixgram", &addr); err != nil {
log.Fatal("Unable to open socket: ", err)
}
defer os.Remove(addr.Name)
} else { // use given fd
var f net.Conn
if f, err = net.FileConn(os.NewFile(uintptr(passedLogFD), "")); err != nil {
log.Fatal("Unable to open fd: ", err)
}
connLogFd = f.(*net.UnixConn)
}
defer connLogFd.Close()
var connQuery *net.UnixListener
if passedQueryFD == -1 { // no fd on command line, use socket path
addr := net.UnixAddr{
Name: socketQueryPath,
Net: "unix",
}
if connQuery, err = net.ListenUnix("unix", &addr); err != nil {
log.Fatal("Unable to open socket: ", err)
}
defer os.Remove(addr.Name)
} else { // use given fd
var f net.Listener
if f, err = net.FileListener(os.NewFile(uintptr(passedQueryFD), "")); err != nil {
log.Fatal("Unable to open fd: ", err)
}
connQuery = f.(*net.UnixListener)
}
defer connQuery.Close()
if daemonize {
child := exec.Command(os.Args[0],
"-fd-log", "3", // connLogFd in ExtraFiles below
"-fd-query", "4", // connQuery in ExtraFiles below
"-max-lines", fmt.Sprintf("%d", linesInBuffer),
"-max-line-len", fmt.Sprintf("%d", lineMaxLength),
)
connLogFile, err := connLogFd.File()
if err != nil {
log.Fatalf("The -fd-log cannot be represented as a *File: %s", err)
}
connQueryFile, err := connQuery.File()
if err != nil {
log.Fatalf("The -fd-query cannot be represented as a *File: %s", err)
}
child.ExtraFiles = append(child.ExtraFiles, connLogFile, connQueryFile)
if err := child.Start(); err != nil {
log.Fatalf("Failed to re-exec: %s", err)
}
os.Exit(0)
}
logCh := make(chan logEntry)
fdMsgChan := make(chan fdMessage)
queryMsgChan := make(chan queryMessage)
go receiveFdHandler(connLogFd, logCh, fdMsgChan)
go receiveQueryHandler(connQuery, logCh, queryMsgChan)
go ringBufferHandler(linesInBuffer, logCh, queryMsgChan)
doLog(logCh, "memlogd started")
for true {
select {
case msg := <-fdMsgChan: // incoming fd
go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
}
}
}

View File

@@ -1,76 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"net"
"os"
"os/exec"
"syscall"
)
func main() {
var socketLogPath string
var socketQueryPath string
var memlogdBundle string
var pidFile string
var detach bool
flag.StringVar(&socketLogPath, "socket-log", "/var/run/linuxkit-external-logging.sock", "path to fd logging socket. Created and passed to logging container. Existing socket will be removed.")
flag.StringVar(&socketQueryPath, "socket-query", "/var/run/memlogdq.sock", "path to query socket. Created and passed to logging container. Existing socket will be removed.")
flag.StringVar(&memlogdBundle, "bundle", "/containers/init/memlogd", "runc bundle with memlogd")
flag.StringVar(&pidFile, "pid-file", "/run/memlogd.pid", "path to pid file")
flag.BoolVar(&detach, "detach", true, "detach from subprocess")
flag.Parse()
laddr := net.UnixAddr{socketLogPath, "unixgram"}
os.Remove(laddr.Name) // remove existing socket
lconn, err := net.ListenUnixgram("unixgram", &laddr)
if err != nil {
panic(err)
}
lfd, err := lconn.File()
if err != nil {
panic(err)
}
qaddr := net.UnixAddr{socketQueryPath, "unix"}
os.Remove(qaddr.Name) // remove existing socket
qconn, err := net.ListenUnix("unix", &qaddr)
if err != nil {
panic(err)
}
qfd, err := qconn.File()
if err != nil {
panic(err)
}
cmd := exec.Command("/sbin/start-stop-daemon", "--start", "--pidfile", pidFile,
"--exec", "/usr/bin/runc", "--", "run", "--preserve-fds=2",
"--bundle", memlogdBundle,
"--pid-file", pidFile, "memlogd")
log.Println(cmd.Args)
cmd.ExtraFiles = append(cmd.ExtraFiles, lfd, qfd)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
panic(err)
}
if detach {
if err := cmd.Process.Release(); err != nil {
panic(err)
}
} else {
if err := cmd.Wait(); err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
// exit with exit code from process
status := exitError.Sys().(syscall.WaitStatus)
os.Exit(status.ExitStatus())
} else {
// no exit code, report error and exit 1
fmt.Println(err)
os.Exit(1)
}
}
}
}

View File

@@ -1,3 +0,0 @@
#!/bin/sh
/usr/bin/memlogd -daemonize