Merge pull request #15735 from mesosphere/jdef-fix-tasks-and-profiling

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-10-17 02:20:18 -07:00
commit 1bba475a9d
22 changed files with 34700 additions and 3181 deletions

32
Godeps/Godeps.json generated
View File

@ -418,43 +418,43 @@
},
{
"ImportPath": "github.com/mesos/mesos-go/auth",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/detector",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/executor",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosproto",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosutil",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/messenger",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/scheduler",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/mesos/mesos-go/upid",
"Comment": "mesos-0.24.0-20-gb164c06",
"Rev": "b164c06f346af1e93aecb6502f83d31dbacdbb91"
"Comment": "v0.0.2-5-ged907b1",
"Rev": "ed907b10717e66325cf2894eb90a0553a89fcb11"
},
{
"ImportPath": "github.com/miekg/dns",

File diff suppressed because it is too large Load Diff

View File

@ -18,8 +18,19 @@
package mesosproto;
import "mesos.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
message AuthenticateMessage {
required string pid = 1; // PID that needs to be authenticated.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mesosproto;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
/**
* ACLs used for local authorization (See authorization.md file in the
* docs).
*/
message ACL {
// Entity is used to describe a subject(s) or an object(s) of an ACL.
// NOTE:
// To allow everyone access to an Entity set its type to 'ANY'.
// To deny access to an Entity set its type to 'NONE'.
message Entity {
enum Type {
SOME = 0;
ANY = 1;
NONE = 2;
}
optional Type type = 1 [default = SOME];
repeated string values = 2; // Ignored for ANY/NONE.
}
// ACLs.
message RegisterFramework {
// Subjects.
required Entity principals = 1; // Framework principals.
// Objects.
required Entity roles = 2; // Roles for resource offers.
}
message RunTask {
// Subjects.
required Entity principals = 1; // Framework principals.
// Objects.
required Entity users = 2; // Users to run the tasks/executors as.
}
// Which principals are authorized to shutdown frameworks of other
// principals.
message ShutdownFramework {
// Subjects.
required Entity principals = 1;
// Objects.
required Entity framework_principals = 2;
}
}
/**
* Collection of ACL.
*
* Each authorization request is evaluated against the ACLs in the order
* they are defined.
*
* For simplicity, the ACLs for a given action are not aggregated even
* when they have the same subjects or objects. The first ACL that
* matches the request determines whether that request should be
* permitted or not. An ACL matches iff both the subjects
* (e.g., clients, principals) and the objects (e.g., urls, users,
* roles) of the ACL match the request.
*
* If none of the ACLs match the request, the 'permissive' field
* determines whether the request should be permitted or not.
*
* TODO(vinod): Do aggregation of ACLs when possible.
*
*/
message ACLs {
optional bool permissive = 1 [default = true];
repeated ACL.RegisterFramework register_frameworks = 2;
repeated ACL.RunTask run_tasks = 3;
repeated ACL.ShutdownFramework shutdown_frameworks = 4;
}

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,17 @@ package mesosproto;
import "mesos.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
/**
* Encodes the launch command sent to the external containerizer

File diff suppressed because it is too large Load Diff

View File

@ -32,7 +32,6 @@ option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
/**
* Status is used to indicate the state of the scheduler and executor
* driver after function calls.
@ -104,6 +103,32 @@ message ContainerID {
}
/**
* A network address.
*
* TODO(bmahler): Use this more widely.
*/
message Address {
// May contain a hostname, IP address, or both.
optional string hostname = 1;
optional string ip = 2;
required int32 port = 3;
}
/**
* Represents a URL.
*/
message URL {
required string scheme = 1;
required Address address = 2;
optional string path = 3;
repeated Parameter query = 4;
optional string fragment = 5;
}
/**
* Describes a framework.
*/
@ -141,7 +166,8 @@ message FrameworkInfo {
// Used to indicate the current host from which the scheduler is
// registered in the Mesos Web UI. If set to an empty string Mesos
// will automagically set it to the current hostname.
// will automagically set it to the current hostname if one is
// available.
optional string hostname = 7;
// This field should match the credential's principal the framework
@ -170,6 +196,11 @@ message FrameworkInfo {
// capabilities (e.g., ability to receive offers for revocable
// resources).
repeated Capability capabilities = 10;
// Labels are free-form key value pairs supplied by the framework
// scheduler (e.g., to describe additional functionality offered by
// the framework). These labels are not interpreted by Mesos itself.
optional Labels labels = 11;
}
@ -353,11 +384,35 @@ message ExecutorInfo {
*/
message MasterInfo {
required string id = 1;
// The IP address (only IPv4) as a packed 4-bytes integer,
// stored in network order. Deprecated, use `address.ip` instead.
required uint32 ip = 2;
// The TCP port the Master is listening on for incoming
// HTTP requests; deprecated, use `address.port` instead.
required uint32 port = 3 [default = 5050];
// In the default implementation, this will contain information
// about both the IP address, port and Master name; it should really
// not be relied upon by external tooling/frameworks and be
// considered an "internal" implementation field.
optional string pid = 4;
// The server's hostname, if available; it may be unreliable
// in environments where the DNS configuration does not resolve
// internal hostnames (eg, some public cloud providers).
// Deprecated, use `address.hostname` instead.
optional string hostname = 5;
// The running Master version, as a string; taken from the
// generated "master/version.hpp".
optional string version = 6;
// The full IP address (supports both IPv4 and IPv6 formats)
// and supersedes the use of `ip`, `port` and `hostname`.
// Since Mesos 0.24.
optional Address address = 7;
}
@ -618,6 +673,7 @@ message ResourceStatistics {
optional uint64 mem_mapped_file_bytes = 12;
// This is only set if swap is enabled.
optional uint64 mem_swap_bytes = 40;
optional uint64 mem_unevictable_bytes = 41;
// Number of occurrences of different levels of memory pressure
// events reported by memory cgroup. Pressure listening (re)starts
@ -680,7 +736,9 @@ message ResourceUsage {
repeated Executor executors = 1;
// TODO(jieyu): Include slave's total resources here.
// Slave's total resources including checkpointed dynamic
// reservations and persistent volumes.
repeated Resource total = 2;
}
@ -764,8 +822,6 @@ message PerfStatistics {
* to proactively influence the allocator. If 'slave_id' is provided
* then this request is assumed to only apply to resources on that
* slave.
*
* TODO(vinod): Remove this once the old driver is removed.
*/
message Request {
optional SlaveID slave_id = 1;
@ -782,6 +838,10 @@ message Offer {
required FrameworkID framework_id = 2;
required SlaveID slave_id = 3;
required string hostname = 4;
// URL for reaching the slave running on the host.
optional URL url = 8;
repeated Resource resources = 5;
repeated Attribute attributes = 7;
repeated ExecutorID executor_ids = 6;
@ -944,6 +1004,14 @@ message TaskStatus {
// (true) or unhealthy (false) according to the HealthCheck field in
// the command info.
optional bool healthy = 8;
// Labels are free-form key value pairs which are exposed through
// master and slave endpoints. Labels will not be interpreted or
// acted upon by Mesos itself. As opposed to the data field, labels
// will be kept in memory on master and slave processes. Therefore,
// labels should be used to tag TaskStatus message with light-weight
// meta-data.
optional Labels labels = 12;
}
@ -1019,81 +1087,6 @@ message Credentials {
}
/**
* ACLs used for authorization.
*/
message ACL {
// Entity is used to describe a subject(s) or an object(s) of an ACL.
// NOTE:
// To allow everyone access to an Entity set its type to 'ANY'.
// To deny access to an Entity set its type to 'NONE'.
message Entity {
enum Type {
SOME = 0;
ANY = 1;
NONE = 2;
}
optional Type type = 1 [default = SOME];
repeated string values = 2; // Ignored for ANY/NONE.
}
// ACLs.
message RegisterFramework {
// Subjects.
required Entity principals = 1; // Framework principals.
// Objects.
required Entity roles = 2; // Roles for resource offers.
}
message RunTask {
// Subjects.
required Entity principals = 1; // Framework principals.
// Objects.
required Entity users = 2; // Users to run the tasks/executors as.
}
// Which principals are authorized to shutdown frameworks of other
// principals.
message ShutdownFramework {
// Subjects.
required Entity principals = 1;
// Objects.
required Entity framework_principals = 2;
}
}
/**
* Collection of ACL.
*
* Each authorization request is evaluated against the ACLs in the order
* they are defined.
*
* For simplicity, the ACLs for a given action are not aggregated even
* when they have the same subjects or objects. The first ACL that
* matches the request determines whether that request should be
* permitted or not. An ACL matches iff both the subjects
* (e.g., clients, principals) and the objects (e.g., urls, users,
* roles) of the ACL match the request.
*
* If none of the ACLs match the request, the 'permissive' field
* determines whether the request should be permitted or not.
*
* TODO(vinod): Do aggregation of ACLs when possible.
*
*/
message ACLs {
optional bool permissive = 1 [default = true];
repeated ACL.RegisterFramework register_frameworks = 2;
repeated ACL.RunTask run_tasks = 3;
repeated ACL.ShutdownFramework shutdown_frameworks = 4;
}
/**
* Rate (queries per second, QPS) limit for messages from a framework to master.
* Strictly speaking they are the combined rate from all frameworks of the same
@ -1138,24 +1131,76 @@ message RateLimits {
}
/**
* Describe an image used by tasks or executors. Note that it's only
* for tasks or executors launched by MesosContainerizer currently.
* TODO(jieyu): This feature not fully supported in 0.24.0. Please do
* not use it until this feature is announced.
*/
message Image {
enum Type {
APPC = 1;
DOCKER = 2;
}
// Protobuf for specifying an Appc container image. See:
// https://github.com/appc/spec/blob/master/spec/aci.md
message AppC {
// The name of the image.
required string name = 1;
// An image ID is a string of the format "hash-value", where
// "hash" is the hash algorithm used and "value" is the hex
// encoded string of the digest. Currently the only permitted
// hash algorithm is sha512.
optional string id = 2;
// Optional labels. Suggested labels: "version", "os", and "arch".
optional Labels labels = 3;
}
message Docker {
// The name of the image. Expected in format repository[:tag].
required string name = 1;
}
required Type type = 1;
// Only one of the following image messages should be set to match
// the type.
optional AppC appc = 2;
optional Docker docker = 3;
}
/**
* Describes a volume mapping either from host to container or vice
* versa. Both paths can either refer to a directory or a file.
*/
message Volume {
// Absolute path pointing to a directory or file in the container.
required string container_path = 1;
// Absolute path pointing to a directory or file on the host or a path
// relative to the container work directory.
optional string host_path = 2;
enum Mode {
RW = 1; // read-write.
RO = 2; // read-only.
}
required Mode mode = 3;
// Path pointing to a directory or file in the container. If the
// path is a relative path, it is relative to the container work
// directory. If the path is an absolute path, that path must
// already exist.
required string container_path = 1;
// The following specifies the source of this volume. At most one of
// the following should be set.
// Absolute path pointing to a directory or file on the host or a
// path relative to the container work directory.
optional string host_path = 2;
// The source of the volume is an Image which describes a root
// filesystem which will be provisioned by Mesos.
optional Image image = 4;
}
@ -1206,11 +1251,18 @@ message ContainerInfo {
optional bool force_pull_image = 6;
}
message MesosInfo {
optional Image image = 1;
}
required Type type = 1;
repeated Volume volumes = 2;
optional string hostname = 4;
// Only one of the following *Info messages should be set to match
// the type.
optional DockerInfo docker = 3;
optional MesosInfo mesos = 5;
}
@ -1277,3 +1329,36 @@ message DiscoveryInfo {
optional Ports ports = 6;
optional Labels labels = 7;
}
/**
* Protobuf for the Appc image manifest JSON schema:
* https://github.com/appc/spec/blob/master/spec/aci.md#image-manifest-schema
* Where possible, any field required in the schema is required in the protobuf
* but some cannot be expressed, e.g., a repeated string that has at least one
* element. Further validation should be performed after parsing the JSON into
* the protobuf.
* This version of Appc protobuf is based on Appc spec version 0.6.1.
* TODO(xujyan): This protobuf currently defines a subset of fields in the spec
* that Mesos makes use of to avoid confusion. New fields are going to be added
* when Mesos starts to support them.
*/
message AppcImageManifest {
required string acKind = 1;
required string acVersion = 2;
required string name = 3;
message Label {
required string name = 1;
required string value = 2;
}
repeated Label labels = 4;
message Annotation {
required string name = 1;
required string value = 2;
}
repeated Annotation annotations = 5;
}

File diff suppressed because it is too large Load Diff

View File

@ -21,10 +21,22 @@ package mesosproto;
import "mesos.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
// TODO(benh): Provide comments for each of these messages. Also,
// consider splitting these messages into different "packages" which
// represent which messages get handled by which components (e.g., the
// "mesos.internal.executor" package includes messages that the
// "mesos.executor" package includes messages that the
// executor handles).
@ -194,8 +206,15 @@ message RunTaskMessage {
// TODO(karya): Remove framework_id after MESOS-2559 has shipped.
optional FrameworkID framework_id = 1 [deprecated = true];
required FrameworkInfo framework = 2;
required string pid = 3;
required TaskInfo task = 4;
// The pid of the framework. This was moved to 'optional' in
// 0.24.0 to support schedulers using the HTTP API. For now, we
// continue to always set pid since it was required in 0.23.x.
// When 'pid' is unset, or set to empty string, the slave will
// forward executor messages through the master. For schedulers
// still using the driver, this will remain set.
optional string pid = 3;
}
@ -336,7 +355,9 @@ message ShutdownExecutorMessage {
message UpdateFrameworkMessage {
required FrameworkID framework_id = 1;
required string pid = 2;
// See the comment on RunTaskMessage.pid.
optional string pid = 2;
}

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,18 @@ package mesosproto;
import "mesos.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
message Registry {
message Master {
required MasterInfo info = 1;

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,17 @@ package mesosproto;
import "mesos.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
/**
* Scheduler event API.
@ -40,11 +51,26 @@ message Event {
MESSAGE = 5; // See 'Message' below.
FAILURE = 6; // See 'Failure' below.
ERROR = 7; // See 'Error' below.
// Periodic message sent by the Mesos master according to
// 'Subscribed.heartbeat_interval_seconds'. If the scheduler does
// not receive any events (including heartbeats) for an extended
// period of time (e.g., 5 x heartbeat_interval_seconds), there is
// likely a network partition. In such a case the scheduler should
// close the existing subscription connection and resubscribe
// using a backoff strategy.
HEARTBEAT = 8;
}
// First event received when the scheduler subscribes.
message Subscribed {
required FrameworkID framework_id = 1;
// This value will be set if the master is sending heartbeats. See
// the comment above on 'HEARTBEAT' for more details.
// TODO(vinod): Implement heartbeats in the master once the master
// can send HTTP events.
optional double heartbeat_interval_seconds = 2;
}
// Received whenever there are new resources that are offered to the
@ -152,6 +178,7 @@ message Call {
ACKNOWLEDGE = 8; // See 'Acknowledge' below.
RECONCILE = 9; // See 'Reconcile' below.
MESSAGE = 10; // See 'Message' below.
REQUEST = 11; // See 'Request' below.
// TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
// already subscribed frameworks as a way of stopping offers from
@ -282,6 +309,16 @@ message Call {
required bytes data = 3;
}
// Requests a specific set of resources from Mesos's allocator. If
// the allocator has support for this, corresponding offers will be
// sent asynchronously via the OFFERS event(s).
//
// NOTE: The built-in hierarchical allocator doesn't have support
// for this call and hence simply ignores it.
message Request {
repeated mesosproto.Request requests = 1;
}
// Identifies who generated this call. Master assigns a framework id
// when a new scheduler subscribes for the first time. Once assigned,
// the scheduler must set the 'framework_id' here and within its
@ -302,4 +339,5 @@ message Call {
optional Acknowledge acknowledge = 8;
optional Reconcile reconcile = 9;
optional Message message = 10;
optional Request request = 11;
}

View File

@ -32,7 +32,6 @@ option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
// Describes a state entry, a versioned (via a UUID) key/value pair.
message Entry {
required string name = 1;

View File

@ -22,6 +22,12 @@ import (
const (
DefaultReadTimeout = 5 * time.Second
DefaultWriteTimeout = 5 * time.Second
// writeFlushPeriod is the amount of time we're willing to wait for a single
// response buffer to be fully written to the underlying TCP connection; after
// this amount of time the remaining bytes of the response are discarded. see
// responseWriter().
writeFlushPeriod = 30 * time.Second
)
type decoderID int32
@ -75,9 +81,9 @@ type httpDecoder struct {
cancelGuard sync.Mutex
readTimeout time.Duration
writeTimeout time.Duration
idtag string // useful for debugging
sendError func(err error) // abstraction for error handling
outCh chan *bytes.Buffer
idtag string // useful for debugging
sendError func(err error) // abstraction for error handling
outCh chan *bytes.Buffer // chan of responses to be written to the connection
}
// DecodeHTTP hijacks an HTTP server connection and generates mesos libprocess HTTP
@ -135,27 +141,13 @@ func (d *httpDecoder) Cancel(graceful bool) {
func (d *httpDecoder) run(res http.ResponseWriter) {
defer func() {
close(d.outCh)
close(d.outCh) // we're finished generating response objects
log.V(2).Infoln(d.idtag + "run: terminating")
}()
go func() {
for buf := range d.outCh {
select {
case <-d.forceQuit:
return
default:
}
//TODO(jdef) I worry about this busy-looping
for buf.Len() > 0 {
d.tryFlushResponse(buf)
}
}
}()
var next httpState
for state := d.bootstrapState(res); state != nil; state = next {
next = state(d)
for state := d.bootstrapState(res); state != nil; {
next := state(d)
state = next
}
}
@ -358,6 +350,7 @@ func limit(r *bufio.Reader, limit int64) *io.LimitedReader {
// is ready to be hijacked at this point.
func (d *httpDecoder) bootstrapState(res http.ResponseWriter) httpState {
log.V(2).Infoln(d.idtag + "bootstrap-state")
d.updateForRequest()
// hijack
@ -373,11 +366,44 @@ func (d *httpDecoder) bootstrapState(res http.ResponseWriter) httpState {
d.sendError(errHijackFailed)
return terminateState
}
d.rw = rw
d.con = c
go d.responseWriter()
return d.readBodyContent()
}
func (d *httpDecoder) responseWriter() {
defer func() {
log.V(3).Infoln(d.idtag + "response-writer: closing connection")
d.con.Close()
}()
for buf := range d.outCh {
//TODO(jdef) I worry about this busy-looping
// write & flush the buffer until there's nothing left in it, or else
// we exceed the write/flush period.
now := time.Now()
for buf.Len() > 0 && time.Since(now) < writeFlushPeriod {
select {
case <-d.forceQuit:
return
default:
}
d.tryFlushResponse(buf)
}
if buf.Len() > 0 {
//TODO(jdef) should we abort the entire connection instead? a partially written
// response doesn't do anyone any good. That said, real libprocess agents don't
// really care about the response channel anyway - the entire system is fire and
// forget. So I've decided to err on the side that we might lose response bytes
// in favor of completely reading the connection request stream before we terminate.
log.Errorln(d.idtag + "failed to fully flush output buffer within write-flush period")
}
}
}
type body struct {
*bytes.Buffer
}

View File

@ -44,6 +44,18 @@ var (
errNotStarted = errors.New("HTTP transport has not been started")
errTerminal = errors.New("HTTP transport is terminated")
errAlreadyRunning = errors.New("HTTP transport is already running")
httpTransport, httpClient = &http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
},
&http.Client{
Transport: httpTransport,
Timeout: DefaultReadTimeout,
}
)
// httpTransporter is a subset of the Transporter interface
@ -114,7 +126,7 @@ type HTTPTransporter struct {
listener net.Listener // TODO(yifan): Change to TCPListener.
mux *http.ServeMux
tr *http.Transport
client *http.Client // TODO(yifan): Set read/write deadline.
client *http.Client
messageQueue chan *Message
address net.IP // optional binding address
shouldQuit chan struct{}
@ -124,13 +136,12 @@ type HTTPTransporter struct {
// NewHTTPTransporter creates a new http transporter with an optional binding address.
func NewHTTPTransporter(upid upid.UPID, address net.IP) *HTTPTransporter {
tr := &http.Transport{}
result := &HTTPTransporter{
upid: upid,
messageQueue: make(chan *Message, defaultQueueSize),
mux: http.NewServeMux(),
client: &http.Client{Transport: tr},
tr: tr,
client: httpClient,
tr: httpTransport,
address: address,
shouldQuit: make(chan struct{}),
}

View File

@ -30,6 +30,7 @@ mesosslave:
hostname: mesosslave
privileged: true
image: mesosphere/mesos-slave-dind:mesos-0.24.0_dind-0.2_docker-1.8.2_ubuntu-14.04.3
ports: [ "10248","10249" ]
entrypoint:
- bash
- -xc
@ -147,6 +148,8 @@ scheduler:
--cluster-domain=cluster.local
--mesos-executor-cpus=1.0
--v=4
--executor-logv=4
--profiling=true
environment:
- MESOS_DOCKER_ETCD_TIMEOUT
- MESOS_DOCKER_MESOS_TIMEOUT

View File

@ -19,6 +19,7 @@ package tasks
import (
"fmt"
"io"
"io/ioutil"
"os/exec"
"sync"
"sync/atomic"
@ -218,10 +219,15 @@ func notStartedTask(t *Task) taskStateFn {
// create command
cmd := exec.Command(t.bin, t.args...)
if _, err := cmd.StdoutPipe(); err != nil {
stdout, err := cmd.StdoutPipe()
if err != nil {
t.tryError(fmt.Errorf("error getting stdout of %v: %v", t.name, err))
return taskShouldRestart
}
go func() {
defer stdout.Close()
io.Copy(ioutil.Discard, stdout) // TODO(jdef) we might want to save this at some point
}()
stderrLogs, err := cmd.StderrPipe()
if err != nil {
t.tryError(fmt.Errorf("error getting stderr of %v: %v", t.name, err))

View File

@ -392,6 +392,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--contain-pod-resources=%t", s.ContainPodResources))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--enable-debugging-handlers=%t", s.EnableProfiling))
if s.AuthPath != "" {
//TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file

View File

@ -32,6 +32,7 @@ import (
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/util"
)
const defaultClusterName = "mesos"
@ -102,7 +103,7 @@ func newMesosClient(
md detector.Master,
mesosHttpClientTimeout, stateCacheTTL time.Duration) (*mesosClient, error) {
tr := &http.Transport{}
tr := util.SetTransportDefaults(&http.Transport{})
httpClient := &http.Client{
Transport: tr,
Timeout: mesosHttpClientTimeout,