Update for c/image's update of github.com/gobuffalo/pop

> go get github.com/containers/image/v5@main
> go mod tidy -go=1.16 && go mod tidy -go=1.17
> make vendor

The (go mod tidy) pair is necessary to keep c/image CI working.

Signed-off-by: Miloslav Trmač <mitr@redhat.com>
This commit is contained in:
Miloslav Trmač
2022-08-23 20:47:59 +02:00
parent a81460437a
commit 4b9ffac0cc
151 changed files with 4206 additions and 6092 deletions

View File

@@ -73,6 +73,12 @@ func init() {
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
internal.AddExtraServerOptions = func(opt ...ServerOption) {
extraServerOptions = opt
}
internal.ClearExtraServerOptions = func() {
extraServerOptions = nil
}
}
var statusOK = status.New(codes.OK, "")
@@ -150,7 +156,7 @@ type serverOptions struct {
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
@@ -174,6 +180,7 @@ var defaultServerOptions = serverOptions{
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
var extraServerOptions []ServerOption
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
@@ -435,7 +442,7 @@ func InTapHandle(h tap.ServerInHandle) ServerOption {
// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.statsHandler = h
o.statsHandlers = append(o.statsHandlers, h)
})
}
@@ -560,6 +567,9 @@ func (s *Server) stopServerWorkers() {
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range extraServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}
@@ -867,7 +877,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
ConnectionTimeout: s.opts.connectionTimeout,
Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
StatsHandlers: s.opts.statsHandlers,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
@@ -963,7 +973,7 @@ var _ http.Handler = (*Server)(nil)
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -1076,8 +1086,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
}
err = t.Write(stream, hdr, payload, opts)
if err == nil && s.opts.statsHandler != nil {
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
if err == nil {
for _, sh := range s.opts.statsHandlers {
sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
}
}
return err
}
@@ -1124,13 +1136,13 @@ func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerIn
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
sh := s.opts.statsHandler
if sh != nil || trInfo != nil || channelz.IsOn() {
shs := s.opts.statsHandlers
if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
if channelz.IsOn() {
s.incrCallsStarted()
}
var statsBegin *stats.Begin
if sh != nil {
for _, sh := range shs {
beginTime := time.Now()
statsBegin = &stats.Begin{
BeginTime: beginTime,
@@ -1161,7 +1173,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
trInfo.tr.Finish()
}
if sh != nil {
for _, sh := range shs {
end := &stats.End{
BeginTime: statsBegin.BeginTime,
EndTime: time.Now(),
@@ -1243,7 +1255,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
var payInfo *payloadInfo
if sh != nil || binlog != nil {
if len(shs) != 0 || binlog != nil {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
@@ -1260,7 +1272,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if sh != nil {
for _, sh := range shs {
sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
@@ -1418,16 +1430,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if channelz.IsOn() {
s.incrCallsStarted()
}
sh := s.opts.statsHandler
shs := s.opts.statsHandlers
var statsBegin *stats.Begin
if sh != nil {
if len(shs) != 0 {
beginTime := time.Now()
statsBegin = &stats.Begin{
BeginTime: beginTime,
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
sh.HandleRPC(stream.Context(), statsBegin)
for _, sh := range shs {
sh.HandleRPC(stream.Context(), statsBegin)
}
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ss := &serverStream{
@@ -1439,10 +1453,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
statsHandler: sh,
statsHandler: shs,
}
if sh != nil || trInfo != nil || channelz.IsOn() {
if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
// See comment in processUnaryRPC on defers.
defer func() {
if trInfo != nil {
@@ -1456,7 +1470,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock()
}
if sh != nil {
if len(shs) != 0 {
end := &stats.End{
BeginTime: statsBegin.BeginTime,
EndTime: time.Now(),
@@ -1464,7 +1478,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
for _, sh := range shs {
sh.HandleRPC(stream.Context(), end)
}
}
if channelz.IsOn() {