expose logread function

Signed-off-by: Avi Deitcher <avi@deitcher.net>
This commit is contained in:
Avi Deitcher 2023-06-28 09:40:23 -07:00
parent 48e0eca4f5
commit 997c074db6
2 changed files with 44 additions and 18 deletions

View File

@ -17,13 +17,15 @@ const (
logDumpFollow logDumpFollow
) )
type logEntry struct { // LogEntry the structure of a log entry
type LogEntry struct {
Time time.Time `json:"time"` Time time.Time `json:"time"`
Source string `json:"source"` Source string `json:"source"`
Msg string `json:"msg"` Msg string `json:"msg"`
Error error
} }
func (msg *logEntry) String() string { func (msg *LogEntry) String() string {
return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg) return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg)
} }
@ -39,40 +41,64 @@ func main() {
flag.BoolVar(&follow, "f", false, "follow log buffer") flag.BoolVar(&follow, "f", false, "follow log buffer")
flag.Parse() flag.Parse()
c, err := StreamLogs(socketPath, follow, dumpFollow)
if err != nil {
panic(err)
}
for entry := range c {
if entry.Error != nil {
panic(entry.Error)
}
fmt.Println(entry.String())
}
}
// StreamLogs read the memlogd logs from socketPath, convert them to LogEntry struct
// and send those on the return channel. If there is an error in parsing, it will be the
// Error on the LogEntry struct. When the socket is closed, will close the channel.
// If stream is complete, will close, unless follow is true, in which case it will
// continue to listen for new logs.
func StreamLogs(socketPath string, follow, dump bool) (<-chan LogEntry, error) {
addr := net.UnixAddr{ addr := net.UnixAddr{
Name: socketPath, Name: socketPath,
Net: "unix", Net: "unix",
} }
conn, err := net.DialUnix("unix", nil, &addr) conn, err := net.DialUnix("unix", nil, &addr)
if err != nil { if err != nil {
panic(err) return nil, err
} }
defer conn.Close() defer conn.Close()
var n int var n int
switch { switch {
case dumpFollow: case follow && dump:
n, err = conn.Write([]byte{logDumpFollow}) n, err = conn.Write([]byte{logDumpFollow})
case follow && !dumpFollow: case follow:
n, err = conn.Write([]byte{logFollow}) n, err = conn.Write([]byte{logFollow})
default: default:
n, err = conn.Write([]byte{logDump}) n, err = conn.Write([]byte{logDump})
} }
if err != nil || n < 1 { if err != nil || n < 1 {
panic(err) return nil, err
} }
var entry logEntry c := make(chan LogEntry)
decoder := json.NewDecoder(conn) go func(c chan<- LogEntry) {
for { var (
if err := decoder.Decode(&entry); err != nil { entry LogEntry
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { decoder = json.NewDecoder(conn)
return )
for {
if err := decoder.Decode(&entry); err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
close(c)
return
}
entry = LogEntry{Error: err}
} }
panic(err) c <- entry
} }
}(c)
fmt.Println(entry.String()) return c, nil
}
} }

View File

@ -23,7 +23,7 @@ func main() {
flag.BoolVar(&detach, "detach", true, "detach from subprocess") flag.BoolVar(&detach, "detach", true, "detach from subprocess")
flag.Parse() flag.Parse()
laddr := net.UnixAddr{socketLogPath, "unixgram"} laddr := net.UnixAddr{Name: socketLogPath, Net: "unixgram"}
os.Remove(laddr.Name) // remove existing socket os.Remove(laddr.Name) // remove existing socket
lconn, err := net.ListenUnixgram("unixgram", &laddr) lconn, err := net.ListenUnixgram("unixgram", &laddr)
if err != nil { if err != nil {
@ -34,7 +34,7 @@ func main() {
panic(err) panic(err)
} }
qaddr := net.UnixAddr{socketQueryPath, "unix"} qaddr := net.UnixAddr{Name: socketQueryPath, Net: "unix"}
os.Remove(qaddr.Name) // remove existing socket os.Remove(qaddr.Name) // remove existing socket
qconn, err := net.ListenUnix("unix", &qaddr) qconn, err := net.ListenUnix("unix", &qaddr)
if err != nil { if err != nil {