mirror of
https://github.com/linuxkit/linuxkit.git
synced 2025-07-22 10:31:35 +00:00
Fix streaming API request error
Signed-off-by: Nathan LeClaire <nathan.leclaire@gmail.com>
This commit is contained in:
parent
69468bf42f
commit
1ca9096f55
@ -52,6 +52,10 @@ func dockerHTTPGet(ctx context.Context, url string) (*http.Response, error) {
|
||||
Transport: &UnixSocketRoundTripper{},
|
||||
}
|
||||
|
||||
return dockerHTTPGetWithClient(ctx, url, client)
|
||||
}
|
||||
|
||||
func dockerHTTPGetWithClient(ctx context.Context, url string, client *http.Client) (*http.Response, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -69,11 +73,24 @@ func dockerHTTPGet(ctx context.Context, url string) (*http.Response, error) {
|
||||
}
|
||||
|
||||
return resp, err
|
||||
|
||||
}
|
||||
|
||||
// UnixSocketRoundTripper provides a way to make HTTP request to Docker socket
|
||||
// directly.
|
||||
type UnixSocketRoundTripper struct{}
|
||||
type UnixSocketRoundTripper struct {
|
||||
Stream bool
|
||||
conn *httputil.ClientConn
|
||||
}
|
||||
|
||||
// Close will close the connection if the caller needs to clean up after
|
||||
// themselves in a streaming request.
|
||||
func (u UnixSocketRoundTripper) Close() error {
|
||||
if u.conn != nil {
|
||||
return u.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RoundTrip dials the Docker UNIX socket to make a HTTP request.
|
||||
func (u UnixSocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
@ -81,9 +98,16 @@ func (u UnixSocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn := httputil.NewClientConn(dial, nil)
|
||||
defer conn.Close()
|
||||
return conn.Do(req)
|
||||
u.conn = httputil.NewClientConn(dial, nil)
|
||||
|
||||
// If the client makes a streaming request (e.g., /container/x/logs)
|
||||
// it's their responsibility to close the connection, because it needs
|
||||
// to remain open to stream the response body.
|
||||
if !u.Stream {
|
||||
defer u.conn.Close()
|
||||
}
|
||||
|
||||
return u.conn.Do(req)
|
||||
}
|
||||
|
||||
// Listen starts the HTTPDiagnosticListener and sets up handlers for its endpoints
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -20,15 +21,15 @@ type SystemContainerCapturer struct{}
|
||||
|
||||
// Capture writes output from a CommandCapturer to a tar archive
|
||||
func (s SystemContainerCapturer) Capture(parentCtx context.Context, w *tar.Writer) {
|
||||
done := make(chan struct{})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultCaptureTimeout)
|
||||
defer cancel()
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
resp, err := dockerHTTPGet(ctx, "/containers/json?all=1&label="+systemContainerLabel)
|
||||
if err != nil {
|
||||
log.Println("ERROR:", err)
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
@ -40,22 +41,35 @@ func (s SystemContainerCapturer) Capture(parentCtx context.Context, w *tar.Write
|
||||
}{}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&names); err != nil {
|
||||
log.Println("ERROR:", err)
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
for _, c := range names {
|
||||
resp, err := dockerHTTPGet(ctx, "/containers/"+c.ID+"/logs?stderr=1&stdout=1×tamps=1")
|
||||
transport := &UnixSocketRoundTripper{
|
||||
Stream: true,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
resp, err := dockerHTTPGetWithClient(ctx, "/containers/"+c.ID+"/logs?stderr=1&stdout=1×tamps=1&tail=all", client)
|
||||
if err != nil {
|
||||
log.Println("ERROR:", err)
|
||||
log.Println("ERROR (get request):", err)
|
||||
continue
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
// logs makes streaming request where the original http
|
||||
// conn is left open so we must clean up after
|
||||
// ourselves when we're done reading
|
||||
defer transport.Close()
|
||||
|
||||
logLines, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Println("ERROR:", err)
|
||||
log.Println("ERROR (reading response):", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -72,13 +86,16 @@ func (s SystemContainerCapturer) Capture(parentCtx context.Context, w *tar.Write
|
||||
tarWrite(w, bytes.NewBuffer(logLines), systemLogDir+c.Names[0])
|
||||
}
|
||||
|
||||
done <- struct{}{}
|
||||
errCh <- nil
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("System container log capture error", ctx.Err())
|
||||
case <-done:
|
||||
log.Println("System container log capture finished")
|
||||
log.Println("System container log capture context error", ctx.Err())
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
log.Println("System container log capture error", err)
|
||||
}
|
||||
log.Println("System container log capture finished successfully")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user