diff --git a/pkg/memlogd/cmd/logread/main.go b/pkg/memlogd/cmd/logread/main.go index 8eab0e3ec..6f6b5a85b 100644 --- a/pkg/memlogd/cmd/logread/main.go +++ b/pkg/memlogd/cmd/logread/main.go @@ -17,13 +17,15 @@ const ( logDumpFollow ) -type logEntry struct { +// LogEntry the structure of a log entry +type LogEntry struct { Time time.Time `json:"time"` Source string `json:"source"` Msg string `json:"msg"` + Error error } -func (msg *logEntry) String() string { +func (msg *LogEntry) String() string { return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg) } @@ -39,40 +41,64 @@ func main() { flag.BoolVar(&follow, "f", false, "follow log buffer") flag.Parse() + c, err := StreamLogs(socketPath, follow, dumpFollow) + if err != nil { + panic(err) + } + for entry := range c { + if entry.Error != nil { + panic(entry.Error) + } + fmt.Println(entry.String()) + } +} + +// StreamLogs read the memlogd logs from socketPath, convert them to LogEntry struct +// and send those on the return channel. If there is an error in parsing, it will be the +// Error on the LogEntry struct. When the socket is closed, will close the channel. +// If stream is complete, will close, unless follow is true, in which case it will +// continue to listen for new logs. +func StreamLogs(socketPath string, follow, dump bool) (<-chan LogEntry, error) { addr := net.UnixAddr{ Name: socketPath, Net: "unix", } conn, err := net.DialUnix("unix", nil, &addr) if err != nil { - panic(err) + return nil, err } defer conn.Close() var n int switch { - case dumpFollow: + case follow && dump: n, err = conn.Write([]byte{logDumpFollow}) - case follow && !dumpFollow: + case follow: n, err = conn.Write([]byte{logFollow}) default: n, err = conn.Write([]byte{logDump}) } if err != nil || n < 1 { - panic(err) + return nil, err } - var entry logEntry - decoder := json.NewDecoder(conn) - for { - if err := decoder.Decode(&entry); err != nil { - if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { - return + c := make(chan LogEntry) + go func(c chan<- LogEntry) { + var ( + entry LogEntry + decoder = json.NewDecoder(conn) + ) + for { + if err := decoder.Decode(&entry); err != nil { + if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + close(c) + return + } + entry = LogEntry{Error: err} } - panic(err) + c <- entry } - - fmt.Println(entry.String()) - } + }(c) + return c, nil } diff --git a/pkg/memlogd/cmd/startmemlogd/main.go b/pkg/memlogd/cmd/startmemlogd/main.go index dc696250c..5664f0832 100644 --- a/pkg/memlogd/cmd/startmemlogd/main.go +++ b/pkg/memlogd/cmd/startmemlogd/main.go @@ -23,7 +23,7 @@ func main() { flag.BoolVar(&detach, "detach", true, "detach from subprocess") flag.Parse() - laddr := net.UnixAddr{socketLogPath, "unixgram"} + laddr := net.UnixAddr{Name: socketLogPath, Net: "unixgram"} os.Remove(laddr.Name) // remove existing socket lconn, err := net.ListenUnixgram("unixgram", &laddr) if err != nil { @@ -34,7 +34,7 @@ func main() { panic(err) } - qaddr := net.UnixAddr{socketQueryPath, "unix"} + qaddr := net.UnixAddr{Name: socketQueryPath, Net: "unix"} os.Remove(qaddr.Name) // remove existing socket qconn, err := net.ListenUnix("unix", &qaddr) if err != nil {