mirror of
https://github.com/linuxkit/linuxkit.git
synced 2025-07-21 10:09:07 +00:00
Merge pull request #3807 from dgageot/memlogd-json
Write log entries as json
This commit is contained in:
commit
e97dda48c5
@ -1,10 +1,12 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -13,6 +15,16 @@ const (
|
|||||||
logDumpFollow
|
logDumpFollow
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type logEntry struct {
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Source string `json:"source"`
|
||||||
|
Msg string `json:"msg"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (msg *logEntry) String() string {
|
||||||
|
return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg)
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@ -49,7 +61,13 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := bufio.NewReader(conn)
|
var entry logEntry
|
||||||
r.WriteTo(os.Stdout)
|
decoder := json.NewDecoder(conn)
|
||||||
|
for {
|
||||||
|
if err := decoder.Decode(&entry); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(entry.String())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,9 +5,9 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"container/list"
|
"container/list"
|
||||||
"container/ring"
|
"container/ring"
|
||||||
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
@ -18,9 +18,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type logEntry struct {
|
type logEntry struct {
|
||||||
time time.Time
|
Time time.Time `json:"time"`
|
||||||
source string
|
Source string `json:"source"`
|
||||||
msg string
|
Msg string `json:"msg"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (msg *logEntry) String() string {
|
||||||
|
return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
type fdMessage struct {
|
type fdMessage struct {
|
||||||
@ -49,26 +53,25 @@ type connListener struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func doLog(logCh chan logEntry, msg string) {
|
func doLog(logCh chan logEntry, msg string) {
|
||||||
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: msg}
|
logCh <- logEntry{
|
||||||
return
|
Time: time.Now(),
|
||||||
|
Source: "memlogd",
|
||||||
|
Msg: msg,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func logQueryHandler(l *connListener) {
|
func logQueryHandler(l *connListener) {
|
||||||
defer l.conn.Close()
|
defer l.conn.Close()
|
||||||
|
|
||||||
|
encoder := json.NewEncoder(l.conn)
|
||||||
for msg := range l.output {
|
for msg := range l.output {
|
||||||
_, err := io.Copy(l.conn, strings.NewReader(msg.String()+"\n"))
|
if err := encoder.Encode(msg); err != nil {
|
||||||
if err != nil {
|
|
||||||
l.err = err
|
l.err = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (msg *logEntry) String() string {
|
|
||||||
return fmt.Sprintf("%s,%s;%s", msg.time.Format(time.RFC3339Nano), msg.source, msg.msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
|
func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
|
||||||
// Anything that interacts with the ring buffer goes through this handler
|
// Anything that interacts with the ring buffer goes through this handler
|
||||||
ring := ring.New(ringSize)
|
ring := ring.New(ringSize)
|
||||||
@ -77,7 +80,8 @@ func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-logCh:
|
case msg := <-logCh:
|
||||||
fmt.Printf("%s\n", msg.String())
|
fmt.Println(msg.String())
|
||||||
|
|
||||||
// add log entry
|
// add log entry
|
||||||
ring.Value = msg
|
ring.Value = msg
|
||||||
ring = ring.Next()
|
ring = ring.Next()
|
||||||
@ -213,7 +217,11 @@ func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) {
|
|||||||
if buffer.Len() > maxLineLen {
|
if buffer.Len() > maxLineLen {
|
||||||
buffer.Truncate(maxLineLen)
|
buffer.Truncate(maxLineLen)
|
||||||
}
|
}
|
||||||
logCh <- logEntry{time: time.Now(), source: source, msg: buffer.String()}
|
logCh <- logEntry{
|
||||||
|
Time: time.Now(),
|
||||||
|
Source: source,
|
||||||
|
Msg: buffer.String(),
|
||||||
|
}
|
||||||
buffer.Reset()
|
buffer.Reset()
|
||||||
|
|
||||||
l, isPrefix, err = r.ReadLine()
|
l, isPrefix, err = r.ReadLine()
|
||||||
@ -221,20 +229,9 @@ func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func loggingRequestHandler(lineMaxLength int, logCh chan logEntry, fdMsgChan chan fdMessage) {
|
func loggingRequestHandler(lineMaxLength int, logCh chan logEntry, fdMsgChan chan fdMessage) {
|
||||||
for true {
|
for msg := range fdMsgChan {
|
||||||
select {
|
|
||||||
case msg := <-fdMsgChan: // incoming fd
|
|
||||||
if strings.Contains(msg.name, ";") {
|
|
||||||
// The log message spec bans ";" in the log names
|
|
||||||
doLog(logCh, fmt.Sprintf("ERROR: cannot register log with name '%s' as it contains ;", msg.name))
|
|
||||||
if err := syscall.Close(msg.fd); err != nil {
|
|
||||||
doLog(logCh, fmt.Sprintf("ERROR: failed to close fd: %s", err))
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
|
go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -24,7 +23,7 @@ func TestNonblock(t *testing.T) {
|
|||||||
// Overflow the log to make sure it doesn't block
|
// Overflow the log to make sure it doesn't block
|
||||||
for i := 0; i < 2*linesInBuffer; i++ {
|
for i := 0; i < 2*linesInBuffer; i++ {
|
||||||
select {
|
select {
|
||||||
case logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestNonblock"}:
|
case logCh <- logEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestNonblock"}:
|
||||||
continue
|
continue
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
t.Errorf("write to the logger blocked for over 1s after %d (size was set to %d)", i, linesInBuffer)
|
t.Errorf("write to the logger blocked for over 1s after %d (size was set to %d)", i, linesInBuffer)
|
||||||
@ -43,7 +42,7 @@ func TestFinite(t *testing.T) {
|
|||||||
|
|
||||||
// Overflow the log by 2x
|
// Overflow the log by 2x
|
||||||
for i := 0; i < 2*linesInBuffer; i++ {
|
for i := 0; i < 2*linesInBuffer; i++ {
|
||||||
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite"}
|
logCh <- logEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestFinite"}
|
||||||
}
|
}
|
||||||
a, b := loopback()
|
a, b := loopback()
|
||||||
defer a.Close()
|
defer a.Close()
|
||||||
@ -83,7 +82,7 @@ func TestFinite2(t *testing.T) {
|
|||||||
|
|
||||||
// fill the ring
|
// fill the ring
|
||||||
for i := 0; i < linesInBuffer; i++ {
|
for i := 0; i < linesInBuffer; i++ {
|
||||||
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite2"}
|
logCh <- logEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestFinite2"}
|
||||||
}
|
}
|
||||||
|
|
||||||
a, b := loopback()
|
a, b := loopback()
|
||||||
@ -113,63 +112,6 @@ func TestFinite2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGoodName(t *testing.T) {
|
|
||||||
// Test that the source names can't contain ";"
|
|
||||||
linesInBuffer := 10
|
|
||||||
logCh := make(chan logEntry)
|
|
||||||
fdMsgChan := make(chan fdMessage)
|
|
||||||
queryMsgChan := make(chan queryMessage)
|
|
||||||
|
|
||||||
go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
|
|
||||||
go loggingRequestHandler(80, logCh, fdMsgChan)
|
|
||||||
|
|
||||||
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal("Unable to create socketpair: ", err)
|
|
||||||
}
|
|
||||||
a := fdToConn(fds[0])
|
|
||||||
b := fdToConn(fds[1])
|
|
||||||
defer a.Close()
|
|
||||||
defer b.Close()
|
|
||||||
// defer close fds
|
|
||||||
|
|
||||||
fdMsgChan <- fdMessage{
|
|
||||||
name: "semi-colons are banned;",
|
|
||||||
fd: fds[0],
|
|
||||||
}
|
|
||||||
// although the fd should be rejected my memlogd the Write should be buffered
|
|
||||||
// by the kernel and not block.
|
|
||||||
if _, err := b.Write([]byte("hello\n")); err != nil {
|
|
||||||
log.Fatalf("Failed to write log message: %s", err)
|
|
||||||
}
|
|
||||||
c, d := loopback()
|
|
||||||
defer c.Close()
|
|
||||||
defer d.Close()
|
|
||||||
// this log should not be in the ring because the connection was rejected.
|
|
||||||
queryM := queryMessage{
|
|
||||||
conn: c,
|
|
||||||
mode: logDumpFollow,
|
|
||||||
}
|
|
||||||
queryMsgChan <- queryM
|
|
||||||
// The error log is generated asynchronously. It should be fast. On error time out
|
|
||||||
// after 5s.
|
|
||||||
d.SetDeadline(time.Now().Add(5 * time.Second))
|
|
||||||
r := bufio.NewReader(d)
|
|
||||||
for {
|
|
||||||
line, err := r.ReadString('\n')
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Unexpected error reading from socket: %s", err)
|
|
||||||
}
|
|
||||||
if strings.Contains(line, "ERROR: cannot register log") {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Fatal("Failed to read error message when registering a log with a ;")
|
|
||||||
}
|
|
||||||
|
|
||||||
// caller must close fd themselves: closing the net.Conn will not close fd.
|
// caller must close fd themselves: closing the net.Conn will not close fd.
|
||||||
func fdToConn(fd int) net.Conn {
|
func fdToConn(fd int) net.Conn {
|
||||||
f := os.NewFile(uintptr(fd), "")
|
f := os.NewFile(uintptr(fd), "")
|
||||||
|
Loading…
Reference in New Issue
Block a user