Initial v1.0.0 commit

Jakub Vavřík
2021-01-28 17:37:47 +01:00
commit 1481d27782
4164 changed files with 1264675 additions and 0 deletions


@@ -0,0 +1,305 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var _ inject.Injector = &Controller{}
// Controller implements controller.Controller
type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler
// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface
// Queue is a listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface
// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
SetFields func(i interface{}) error
// mu is used to synchronize Controller setup
mu sync.Mutex
// JitterPeriod is the period between worker restarts; it primarily allows tests to reduce it so they complete faster
JitterPeriod time.Duration
// Started is true if the Controller has been Started
Started bool
// ctx is the context that was passed to Start() and used when starting watches.
//
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context.
// While we usually strive to follow best practices, we consider this a legacy case; it should
// undergo a major refactoring and redesign so that the context no longer needs to be stored in a struct.
ctx context.Context
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription
// Log is used to log messages to users during reconciliation, or for example when a watch is started.
Log logr.Logger
}
// watchDescription contains all the information necessary to start a watch.
type watchDescription struct {
src source.Source
handler handler.EventHandler
predicates []predicate.Predicate
}
// Reconcile implements reconcile.Reconciler
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
ctx = logf.IntoContext(ctx, log)
return c.Do.Reconcile(ctx, req)
}
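A minimal sketch of the consumer side of this logger plumbing, assuming pkg/log's FromContext helper (the counterpart of the IntoContext call above) and a hypothetical podReconciler type:

    package example

    import (
        "context"

        "sigs.k8s.io/controller-runtime/pkg/log"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
    )

    // podReconciler is a hypothetical user Reconciler.
    type podReconciler struct{}

    func (r *podReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
        // FromContext retrieves the logger that Controller.Reconcile stored via
        // logf.IntoContext, already tagged with the request's name and namespace.
        logger := log.FromContext(ctx)
        logger.Info("reconciling", "request", req.NamespacedName)
        return reconcile.Result{}, nil
    }
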
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()
// Inject Cache into arguments
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}
// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.Log.Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
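For context, a hedged sketch of how this internal Controller is normally reached through the public API: controller.New populates Name, MaxConcurrentReconciles and Do, and Watch queues a source until Start runs. The controller name, watched type, and no-op reconciler below are illustrative, not part of this commit:

    package example

    import (
        "context"

        corev1 "k8s.io/api/core/v1"
        "sigs.k8s.io/controller-runtime/pkg/controller"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
    )

    // addPodController wires a no-op reconciler into a manager.
    func addPodController(mgr manager.Manager) error {
        // controller.New fills Name, MaxConcurrentReconciles and Do on the
        // internal Controller shown above.
        c, err := controller.New("pod-controller", mgr, controller.Options{
            MaxConcurrentReconciles: 2,
            Reconciler: reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
                return reconcile.Result{}, nil
            }),
        })
        if err != nil {
            return err
        }
        // The controller has not started yet, so this watch is stored in
        // startWatches and only started inside Start(ctx).
        return c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{})
    }
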
// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c.mu.Lock()
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
// Set the internal context.
c.ctx = ctx
c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the IIFE so that the queue is shut down only after the context is done
err := func() error {
defer c.mu.Unlock()
// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash()
// NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intended
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.Log.Info("Starting Controller")
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := syncingSource.WaitForSync(ctx); err != nil {
// This code is unreachable for kube watches, since WaitForCacheSync never returns an error.
// It is left here in case that changes in the future.
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.Log.Error(err, "Could not wait for Cache to sync")
return err
}
}
// All the watches have been started, we can reset the local slice.
//
// We should never hold watches longer than necessary: each watch source can hold a backing cache,
// which won't be garbage collected while we hold a reference to it.
c.startWatches = nil
if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}
// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles))
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}, c.JitterPeriod)
}
c.Started = true
return nil
}()
if err != nil {
return err
}
<-ctx.Done()
c.Log.Info("Stopping workers")
return nil
}
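A small standalone sketch of the worker-launch pattern used in Start above, assuming only k8s.io/apimachinery's wait.UntilWithContext; the worker body, timeout, and period are placeholders:

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()

        worker := func(ctx context.Context) {
            // In the controller above this body is "for c.processNextWorkItem(ctx) {}".
            fmt.Println("draining queue...")
        }

        // UntilWithContext re-runs worker every period (1s here, matching the
        // default JitterPeriod above) until ctx is done.
        wait.UntilWithContext(ctx, worker, time.Second)
    }
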
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
c.reconcileHandler(ctx, obj)
return true
}
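A self-contained sketch of the Get/Done/Forget contract described in the comment above, using a client-go rate-limiting workqueue directly; the item key and the processItem helper are hypothetical:

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
        q.Add("default/my-pod")

        item, shutdown := q.Get()
        if shutdown {
            return
        }
        // Done must always be called so the queue can release the item; Forget
        // resets the rate limiter, otherwise a later AddRateLimited re-queues
        // the item with back-off.
        defer q.Done(item)

        if err := processItem(item); err != nil {
            q.AddRateLimited(item) // transient failure: retry after back-off
            return
        }
        q.Forget(item) // success: drop the item's rate-limiting history
    }

    // processItem is a hypothetical stand-in for real work.
    func processItem(item interface{}) error {
        fmt.Println("processing", item)
        return nil
    }
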
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Since(reconcileStartTS))
}()
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return early; there is nothing further to do with an invalid item.
return
}
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
ctx = logf.IntoContext(ctx, log)
// Invoke the Reconciler, passing it the Namespace/Name of the
// resource to be synced.
if result, err := c.Do.Reconcile(ctx, req); err != nil {
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "error").Inc()
log.Error(err, "Reconciler error")
return
} else if result.RequeueAfter > 0 {
// A result.RequeueAfter is dropped if it is returned together with a
// non-nil error. This is intentional: we want reconcile loops to reach a
// stable state before re-queuing based on result.RequeueAfter.
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue_after").Inc()
return
} else if result.Requeue {
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue").Inc()
return
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.Queue.Forget(obj)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "success").Inc()
}
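A hedged sketch of a user Reconciler showing how each return value maps onto the branches (and result metric labels) handled above; doWork is a hypothetical helper:

    package example

    import (
        "context"
        "time"

        "sigs.k8s.io/controller-runtime/pkg/reconcile"
    )

    type exampleReconciler struct{}

    func (r *exampleReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
        done, err := doWork(ctx, req)
        if err != nil {
            // Non-nil error: reconcileHandler re-queues with rate limiting and
            // counts the result as "error".
            return reconcile.Result{}, err
        }
        if !done {
            // RequeueAfter: re-queued after the delay, counted as "requeue_after".
            // (Requeue: true would instead re-queue immediately, counted as "requeue".)
            return reconcile.Result{RequeueAfter: 30 * time.Second}, nil
        }
        // Empty result with a nil error: the item is forgotten, counted as "success".
        return reconcile.Result{}, nil
    }

    // doWork is a hypothetical helper standing in for real reconciliation logic.
    func doWork(ctx context.Context, req reconcile.Request) (bool, error) {
        return true, nil
    }
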
// GetLogger returns this controller's logger.
func (c *Controller) GetLogger() logr.Logger {
return c.Log
}
// InjectFunc implements inject.Injector
func (c *Controller) InjectFunc(f inject.Func) error {
c.SetFields = f
return nil
}
// updateMetrics updates prometheus metrics within the controller
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
}


@@ -0,0 +1,77 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
// ReconcileTotal is a Prometheus counter metric which holds the total
// number of reconciliations per controller. It has two labels: the controller label refers
// to the controller name and the result label refers to the reconcile result, i.e.
// success, error, requeue, requeue_after
ReconcileTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_reconcile_total",
Help: "Total number of reconciliations per controller",
}, []string{"controller", "result"})
// ReconcileErrors is a Prometheus counter metric which holds the total
// number of errors from the Reconciler
ReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_reconcile_errors_total",
Help: "Total number of reconciliation errors per controller",
}, []string{"controller"})
// ReconcileTime is a Prometheus histogram which tracks the duration
// of reconciliations per controller
ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "controller_runtime_reconcile_time_seconds",
Help: "Length of time per reconciliation per controller",
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60},
}, []string{"controller"})
// WorkerCount is a Prometheus gauge which holds the maximum number of
// concurrent reconciles per controller
WorkerCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "controller_runtime_max_concurrent_reconciles",
Help: "Maximum number of concurrent reconciles per controller",
}, []string{"controller"})
// ActiveWorkers is a Prometheus gauge which holds the number
// of currently active workers per controller
ActiveWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "controller_runtime_active_workers",
Help: "Number of currently used workers per controller",
}, []string{"controller"})
)
func init() {
metrics.Registry.MustRegister(
ReconcileTotal,
ReconcileErrors,
ReconcileTime,
WorkerCount,
ActiveWorkers,
// expose process metrics like CPU, Memory, file descriptor usage etc.
prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
// expose Go runtime metrics like GC stats, memory stats etc.
prometheus.NewGoCollector(),
)
}
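A short sketch of how user code can share this registry, assuming controller-runtime's exported metrics.Registry as used above; the metric name and package are illustrative:

    package mymetrics

    import (
        "github.com/prometheus/client_golang/prometheus"
        "sigs.k8s.io/controller-runtime/pkg/metrics"
    )

    // widgetsProcessed is a hypothetical custom counter; registering it on
    // metrics.Registry exposes it on the same /metrics endpoint as the
    // built-in controller metrics above.
    var widgetsProcessed = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "widgets_processed_total",
        Help: "Total number of widgets processed",
    })

    func init() {
        metrics.Registry.MustRegister(widgetsProcessed)
    }
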


@@ -0,0 +1,32 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/log"
)
var (
// RuntimeLog is a base parent logger for use inside controller-runtime.
RuntimeLog logr.Logger
)
func init() {
RuntimeLog = log.Log.WithName("controller-runtime")
}
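A brief sketch of the same pattern from user code: deriving a named child of the delegating root logger. The package name and log message are illustrative; no output appears until a concrete logger is installed (for example via logf.SetLogger in main):

    package mypackage

    import (
        logf "sigs.k8s.io/controller-runtime/pkg/log"
    )

    // pkgLog mirrors the RuntimeLog pattern above: a named child of the
    // delegating root logger, so records are attributed to this package.
    var pkgLog = logf.Log.WithName("mypackage")

    func doSomething() {
        pkgLog.Info("doing something", "step", 1)
    }
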


@@ -0,0 +1,156 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recorder
import (
"context"
"fmt"
"sync"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
)
// EventBroadcasterProducer makes an event broadcaster, returning
// whether the broadcaster should be stopped with the Provider
// (e.g. if it's shared, it shouldn't be stopped with the Provider).
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
// Provider is a recorder.Provider that records events to the k8s API server
// and to a logr Logger.
type Provider struct {
// scheme to specify when creating a recorder
scheme *runtime.Scheme
// logger is the logger to use when logging diagnostic event info
logger logr.Logger
evtClient typedcorev1.EventInterface
makeBroadcaster EventBroadcasterProducer
broadcasterOnce sync.Once
broadcaster record.EventBroadcaster
stopBroadcaster bool
}
// NB(directxman12): this manually implements Stop instead of being a runnable because we need to
// stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
// code finishes up and tries to continue emitting events.
// Stop attempts to stop this provider, stopping the underlying broadcaster
// if the broadcaster asked to be stopped. It makes a best effort to honor the given
// context, but the underlying broadcaster has an indefinite wait that doesn't
// return until all queued events are flushed, so this may end up just returning
// before the underlying wait has finished instead of cancelling the wait.
// This is Very Frustrating™.
func (p *Provider) Stop(shutdownCtx context.Context) {
doneCh := make(chan struct{})
go func() {
// technically, this could start the broadcaster, but practically, it's
// almost certainly already been started (e.g. by leader election). We
// need to invoke this to ensure that we don't inadvertently race with
// an invocation of getBroadcaster.
broadcaster := p.getBroadcaster()
if p.stopBroadcaster {
broadcaster.Shutdown()
}
close(doneCh)
}()
select {
case <-shutdownCtx.Done():
case <-doneCh:
}
}
// getBroadcaster ensures that a broadcaster is started for this
// provider, and returns it. It's threadsafe.
func (p *Provider) getBroadcaster() record.EventBroadcaster {
// NB(directxman12): this can technically still leak if something calls
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
// create the broadcaster in start, we could race with other things that
// are started at the same time & want to emit events. The alternative is
// silently swallowing events and more locking, but that seems suboptimal.
p.broadcasterOnce.Do(func() {
broadcaster, stop := p.makeBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.evtClient})
broadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
})
p.broadcaster = broadcaster
p.stopBroadcaster = stop
})
return p.broadcaster
}
// NewProvider creates a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to init clientSet: %w", err)
}
p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: clientSet.CoreV1().Events("")}
return p, nil
}
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
return &lazyRecorder{
prov: p,
name: name,
}
}
// lazyRecorder is a recorder that doesn't actually instantiate any underlying
// recorder until the first event is emitted.
type lazyRecorder struct {
prov *Provider
name string
recOnce sync.Once
rec record.EventRecorder
}
// ensureRecording ensures that a concrete recorder is populated for this recorder.
func (l *lazyRecorder) ensureRecording() {
l.recOnce.Do(func() {
broadcaster := l.prov.getBroadcaster()
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
})
}
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
l.ensureRecording()
l.rec.Event(object, eventtype, reason, message)
}
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
}
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
}
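Finally, a hedged sketch of the consumer side of this Provider: a record.EventRecorder obtained via GetEventRecorderFor (for example from a manager) lazily starts the broadcaster on first use and attaches events to the given object. The function name, event reasons, and messages are illustrative:

    package example

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/record"
    )

    // emitReconciled is a hypothetical helper; the recorder would typically
    // come from mgr.GetEventRecorderFor("my-controller").
    func emitReconciled(recorder record.EventRecorder, pod *corev1.Pod) {
        recorder.Event(pod, corev1.EventTypeNormal, "Reconciled", "pod reconciled successfully")
        recorder.Eventf(pod, corev1.EventTypeWarning, "Degraded", "pod %s/%s is degraded", pod.Namespace, pod.Name)
    }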