Merge pull request #7123 from satnam6502/logging

Propagate pod and container name for log files
This commit is contained in:
Dawn Chen 2015-04-23 10:13:08 -07:00
commit f9156c281a
15 changed files with 134 additions and 19 deletions

View File

@ -1,7 +1,7 @@
.PHONY: build push
IMAGE = fluentd-elasticsearch
TAG = 1.3
TAG = 1.4
build:
docker build -t gcr.io/google_containers/$(IMAGE):$(TAG) .

View File

@ -37,14 +37,14 @@
type tail
format json
time_key time
path /var/lib/docker/containers/*/*-json.log
pos_file /var/lib/docker/containers/es-containers.log.pos
path /varlog/containers/*.log
pos_file /varlog/es-containers.log.pos
time_format %Y-%m-%dT%H:%M:%S
tag docker.*
tag kubernetes.*
read_from_head true
</source>
<match docker.**>
<match kubernetes.**>
type elasticsearch
log_level info
include_tag_key true

View File

@ -6,7 +6,7 @@
.PHONY: build push
TAG = 1.2
TAG = 1.3
build:
docker build -t gcr.io/google_containers/fluentd-gcp:$(TAG) .

View File

@ -16,14 +16,14 @@
type tail
format none
time_key time
path /var/lib/docker/containers/*/*-json.log
pos_file /var/lib/docker/containers/gcp-containers.log.pos
path /varlog/containers/*/*.log
pos_file /varlog/gcp-containers.log.pos
time_format %Y-%m-%dT%H:%M:%S
tag docker.*
tag kubernetes.*
read_from_head true
</source>
<match docker.**>
<match kubernetes.**>
type google_cloud
flush_interval 5s
# Never wait longer than 5 minutes between retries.

View File

@ -2,7 +2,7 @@ version: v1beta2
id: fluentd-to-elasticsearch
containers:
- name: fluentd-es
image: gcr.io/google_containers/fluentd-elasticsearch:1.3
image: gcr.io/google_containers/fluentd-elasticsearch:1.4
env:
- name: FLUENTD_ARGS
value: -qq

View File

@ -2,7 +2,7 @@ version: v1beta2
id: fluentd-to-gcp
containers:
- name: fluentd-gcp
image: gcr.io/google_containers/fluentd-gcp:1.2
image: gcr.io/google_containers/fluentd-gcp:1.3
volumeMounts:
- name: containers
mountPath: /var/lib/docker/containers

View File

@ -230,14 +230,14 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
testRootDir := makeTempDirOrDie("kubelet_integ_1.", "")
configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil)
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubelet.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil)
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubelet.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil)
return apiServer.URL, configFilePath
}

View File

@ -374,7 +374,8 @@ func SimpleKubelet(client *client.Client,
tlsOptions *kubelet.TLSOptions,
cadvisorInterface cadvisor.Interface,
configFilePath string,
cloud cloudprovider.Interface) *KubeletConfig {
cloud cloudprovider.Interface,
osInterface kubelet.OSInterface) *KubeletConfig {
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: 90,
@ -406,6 +407,7 @@ func SimpleKubelet(client *client.Client,
Cloud: cloud,
NodeStatusUpdateFrequency: 10 * time.Second,
ResourceContainer: "/kubelet",
OSInterface: osInterface,
}
return &kcfg
}
@ -433,6 +435,9 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) {
if builder == nil {
builder = createAndInitKubelet
}
if kcfg.OSInterface == nil {
kcfg.OSInterface = kubelet.RealOS{}
}
k, podCfg, err := builder(kcfg)
if err != nil {
glog.Errorf("Failed to create kubelet: %s", err)
@ -529,6 +534,7 @@ type KubeletConfig struct {
Cloud cloudprovider.Interface
NodeStatusUpdateFrequency time.Duration
ResourceContainer string
OSInterface kubelet.OSInterface
}
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
@ -572,7 +578,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.ImageGCPolicy,
kc.Cloud,
kc.NodeStatusUpdateFrequency,
kc.ResourceContainer)
kc.ResourceContainer,
kc.OSInterface)
if err != nil {
return nil, nil, err

View File

@ -37,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
@ -157,7 +158,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
if err != nil {
glog.Fatalf("Failed to create cAdvisor: %v", err)
}
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil)
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubelet.RealOS{})
kubeletapp.RunKubelet(kcfg, nil)
}

View File

@ -63,6 +63,7 @@ type DockerInterface interface {
RemoveImage(image string) error
Logs(opts docker.LogsOptions) error
Version() (*docker.Env, error)
Info() (*docker.Env, error)
CreateExec(docker.CreateExecOptions) (*docker.Exec, error)
StartExec(string, docker.StartExecOptions) error
}
@ -70,6 +71,7 @@ type DockerInterface interface {
// DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids
type DockerID string
// KubeletContainerName encapsulates a pod name and a Kubernetes container name.
type KubeletContainerName struct {
PodFullName string
PodUID types.UID

View File

@ -44,6 +44,7 @@ type FakeDockerClient struct {
Removed []string
RemovedImages util.StringSet
VersionInfo docker.Env
Information docker.Env
}
func (f *FakeDockerClient) ClearCalls() {
@ -272,6 +273,10 @@ func (f *FakeDockerClient) Version() (*docker.Env, error) {
return &f.VersionInfo, nil
}
func (f *FakeDockerClient) Info() (*docker.Env, error) {
return &f.Information, nil
}
func (f *FakeDockerClient) CreateExec(_ docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{"12345678"}, nil
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -99,6 +99,40 @@ type SourcesReadyFn func() bool
type volumeMap map[string]volume.Volume
// OSInterface collects system level operations that need to be mocked out
// during tests.
type OSInterface interface {
Mkdir(path string, perm os.FileMode) error
Symlink(oldname string, newname string) error
}
// RealOS is used to dispatch the real system level operaitons.
type RealOS struct{}
// MkDir will will call os.Mkdir to create a directory.
func (RealOS) Mkdir(path string, perm os.FileMode) error {
return os.Mkdir(path, perm)
}
// Symlink will call os.Symlink to create a symbolic link.
func (RealOS) Symlink(oldname string, newname string) error {
return os.Symlink(oldname, newname)
}
// FakeOS mocks out certain OS calls to avoid perturbing the filesystem
// on the test machine.
type FakeOS struct{}
// MkDir is a fake call that just returns nil.
func (FakeOS) Mkdir(path string, perm os.FileMode) error {
return nil
}
// Symlink is a fake call that just returns nil.
func (FakeOS) Symlink(oldname string, newname string) error {
return nil
}
// New creates a new Kubelet for use in main
func NewMainKubelet(
hostname string,
@ -123,7 +157,8 @@ func NewMainKubelet(
imageGCPolicy ImageGCPolicy,
cloud cloudprovider.Interface,
nodeStatusUpdateFrequency time.Duration,
resourceContainer string) (*Kubelet, error) {
resourceContainer string,
osInterface OSInterface) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -147,6 +182,35 @@ func NewMainKubelet(
if !dockerUp {
return nil, fmt.Errorf("timed out waiting for Docker to come up")
}
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
dockerInfo, err := dockerClient.Info()
if err != nil {
glog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
glog.Warningf("Using fallback default of /var/lib/docker for location of Docker runtime")
} else {
driverStatus := dockerInfo.Get("DriverStatus")
// The DriverStatus is a*string* which represents a list of list of strings (pairs) e.g.
// DriverStatus=[["Root Dir","/var/lib/docker/aufs"],["Backing Filesystem","extfs"],["Dirs","279"]]
// Strip out the square brakcets and quotes.
s := strings.Replace(driverStatus, "[", "", -1)
s = strings.Replace(s, "]", "", -1)
s = strings.Replace(s, `"`, "", -1)
// Separate by commas.
ss := strings.Split(s, ",")
// Search for the Root Dir string
for i, k := range ss {
if k == "Root Dir" && i+1 < len(ss) {
// Discard the /aufs suffix.
dockerRoot, _ = path.Split(ss[i+1])
// Trim the last slash.
dockerRoot = strings.TrimSuffix(dockerRoot, "/")
glog.Infof("Setting dockerRoot to %s", dockerRoot)
}
}
}
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
@ -229,6 +293,8 @@ func NewMainKubelet(
containerManager: containerManager,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
dockerRoot: dockerRoot,
}
klet.podManager = newBasicPodManager(klet.kubeClient)
@ -256,6 +322,12 @@ func NewMainKubelet(
} else {
klet.networkPlugin = plug
}
// If the /var/log/containers directory does not exist, create it.
if _, err := os.Stat("/var/log/containers"); err != nil {
if err := osInterface.Mkdir("/var/log/containers", 0755); err != nil {
glog.Errorf("Failed to create directory /var/log/containers: %v", err)
}
}
return klet, nil
}
@ -369,6 +441,9 @@ type Kubelet struct {
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
os OSInterface
dockerRoot string
}
// getRootDir returns the full path to the directory under which kubelet can
@ -1040,6 +1115,21 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
return "", err
}
// Create a symbolic link to the Docker container log file using a name which captures the
// full pod name, the container name and the Docker container ID. Cluster level logging will
// capture these symbolic filenames which can be used for search terms in Elasticsearch or for
// labels for Cloud Logging.
// If for any reason kl.dockerRoot is not set, default to /var/lib/docker
dockerRoot := kl.dockerRoot
if kl.dockerRoot == "" {
dockerRoot = "/var/lib/docker"
glog.Errorf("dockerRoot field not set in the Kubelet configuration")
}
containerLogFile := fmt.Sprintf("%s/containers/%s/%s-json.log", dockerRoot, containerID, containerID)
symlinkFile := fmt.Sprintf("/var/log/containers/%s-%s-%s.log", podFullName, container.Name, containerID)
if err = kl.os.Symlink(containerLogFile, symlinkFile); err != nil {
glog.Errorf("Failed to create symbolic link to the log file of pod %q container %q: %v", podFullName, container.Name, err)
}
return containerID, nil
}

View File

@ -75,6 +75,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker
kubelet.kubeClient = fakeKubeClient
kubelet.os = FakeOS{}
kubelet.hostname = "testnode"
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))

View File

@ -132,6 +132,14 @@ func (in instrumentedDockerInterface) Version() (*docker.Env, error) {
return in.client.Version()
}
func (in instrumentedDockerInterface) Info() (*docker.Env, error) {
start := time.Now()
defer func() {
DockerOperationsLatency.WithLabelValues("version").Observe(SinceInMicroseconds(start))
}()
return in.client.Info()
}
func (in instrumentedDockerInterface) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
start := time.Now()
defer func() {

View File

@ -86,6 +86,7 @@ func TestRunOnce(t *testing.T) {
containerRefManager: kubecontainer.NewRefManager(),
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager,
os: FakeOS{},
}
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))