In my previous post, Writing Controllers for Kubernetes Custom Resources, I explored the development process using pure client-go and how it differs from using controller-runtime (and kubebuilder). In this post, I explore how controller-runtime (v0.7.0) uses concepts we know from client-go.
As described in my previous post, to develop an operator with controller-runtime, we need to:
- define CRD types
- generate deepcopy functions
- write reconciler logic
- create and run the controller
- perform deployment-related steps (out of scope for this post)
I will skip the first two steps, as they are similar to development with pure client-go, and start with the reconciler logic. Here, controller-runtime needs us to implement the Reconciler interface:
/* source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/reconcile/reconcile.go */
type Reconciler interface {
    Reconcile(context.Context, Request) (Result, error)
}
package controllers

import (
    "context"

    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)
type MyKindReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

func (r *MyKindReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    /* snipped reconciler logic */
    return ctrl.Result{}, nil
}
After this, we have to create and run the controller. And that’s where a new layer of abstraction called the manager comes in.
The Manager
A manager is required to create and start up a controller (there can be multiple controllers associated with a manager). Starting the manager also starts all controllers (and other runnables like webhook servers) assigned to it.
package main

import (
    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"

    mygroupv1 "myoperator/api/v1"
    "myoperator/controllers"
)

var (
    scheme = runtime.NewScheme()
)

func init() {
    _ = clientgoscheme.AddToScheme(scheme)
    _ = mygroupv1.AddToScheme(scheme)
}
func main() {
    // create manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
    })

    // create and add controller
    r := &controllers.MyKindReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }
    _ = ctrl.NewControllerManagedBy(mgr).
        For(&mygroupv1.MyKind{}).
        Complete(r)

    // start manager
    mgr.Start(ctrl.SetupSignalHandler())
}
It’s just a few lines of code, but there’s a lot of magic happening behind the scenes. For example, we might ask ourselves:
- with pure client-go, we generated informers to keep us updated about resources - where are they now?
- with pure client-go, we read from an internal cache (lister) and write to the k8s apiserver (clientset) when we reconcile - how does that work now?
The first question is answered by the manager’s cache component, the second by its client component. Let’s look at them next.
The Manager’s Components
A manager’s client component is responsible for read and write operations in general. The cache actually has two functions: 1) it manages informers and therefore keeps our controller in the loop regarding resource updates, and 2) it caches data in a local index to reduce load on the k8s apiserver. The client component reuses the cache component for cached read operations.
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/manager/internal.go */
import (
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

type controllerManager struct {
    cache  cache.Cache
    client client.Client
    /* ... */
}
You can skip the next two sections and proceed with And finally, the Controller if you are only interested in the general idea and not the technical details (that is, code snippets I found helpful when browsing through the controller-runtime code).
The Cache
A cache is a struct that implements the composite Cache interface:
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/cache/cache.go */
type Cache interface {
    client.Reader
    Informers
}

type Informers interface {
    GetInformer(ctx context.Context, obj client.Object) (Informer, error)
    GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
    Start(ctx context.Context) error
    WaitForCacheSync(ctx context.Context) bool
    client.FieldIndexer
}

/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/client/interfaces.go */
type Reader interface {
    Get(ctx context.Context, key ObjectKey, obj Object) error
    List(ctx context.Context, list ObjectList, opts ...ListOption) error
}

type FieldIndexer interface {
    IndexField(ctx context.Context, obj Object, field string, extractValue IndexerFunc) error
}
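Because Cache embeds client.Reader, we could even read from it directly. Here is a small sketch (mgr as in the main snippet above, plus a given context ctx; in practice we read through the manager’s client, which delegates to the cache anyway):

// read a MyKind object straight from the manager's cache
var kind mygroupv1.MyKind
key := client.ObjectKey{Namespace: "default", Name: "example"}
if err := mgr.GetCache().Get(ctx, key, &kind); err != nil {
    // handle error (e.g. not found, or cache not started yet)
}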
The definition of that struct is spread across several files (pkg/cache/informer_cache.go, pkg/cache/internal/deleg_map.go, pkg/cache/internal/informers_map.go, pkg/cache/internal/cache_reader.go). If we strip away details like support for (un)structured types or multiple resources, the struct is more or less a map from objects or GVKs to client-go’s SharedIndexInformer. The following table shows how client-go’s package cache is used to implement controller-runtime’s manager cache:
| Function | Description | client-go objects (pkg cache) |
| --- | --- | --- |
| Get | cached read access | reads from the informer’s index (SharedIndexInformer.GetIndexer()) |
| List | cached read access | same as above |
| GetInformer | retrieves the informer for a given runtime object (creates one if it doesn’t exist) | returns SharedIndexInformer |
| GetInformerForKind | same as above, but for a GVK | same as above |
| Start | runs all informers, meaning it will list & watch the k8s apiserver for resource updates | ListWatch |
| WaitForCacheSync | waits for all caches to sync | WaitForCacheSync |
| IndexField | adds field indices over the cache | SharedIndexInformer.AddIndexers |
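To make IndexField more tangible, here is a hedged sketch that registers a field index for our custom resource and later lists by it. The field name ".spec.foo" and the Spec.Foo field are made up for illustration:

// register the index before starting the manager; the extraction func
// maps each object to the values it should be findable under
_ = mgr.GetFieldIndexer().IndexField(ctx, &mygroupv1.MyKind{}, ".spec.foo",
    func(obj client.Object) []string {
        return []string{obj.(*mygroupv1.MyKind).Spec.Foo} // hypothetical field
    })

// later, e.g. in the reconciler: list only the objects whose indexed field matches
var list mygroupv1.MyKindList
_ = mgr.GetClient().List(ctx, &list, client.MatchingFields{".spec.foo": "bar"})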
(Note: The functions GetInformer and GetInformerForKind return an informer that offers functions like AddEventHandler, AddEventHandlerWithResyncPeriod, AddIndexers and HasSynced. If you remember, AddEventHandler was the function we used in pure client-go to register our workqueue for resource updates. With controller-runtime, this is taken care of for us.)
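For completeness, a sketch of how we could still attach our own event handlers via the cache. This is normally unnecessary, since the builder registers the workqueue handler for us; toolscache here is an assumed alias for client-go’s k8s.io/client-go/tools/cache:

informer, err := mgr.GetCache().GetInformer(ctx, &mygroupv1.MyKind{})
if err != nil {
    // handle error
}
informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
    AddFunc:    func(obj interface{}) { /* react to creations */ },
    UpdateFunc: func(oldObj, newObj interface{}) { /* react to updates */ },
    DeleteFunc: func(obj interface{}) { /* react to deletions */ },
})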
The Client
A client is a struct that implements the composite Client interface:
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/client/interfaces.go */
type Client interface {
    Reader
    Writer
    StatusClient

    Scheme() *runtime.Scheme
    RESTMapper() meta.RESTMapper
}

type Reader interface {
    Get(ctx context.Context, key ObjectKey, obj Object) error
    List(ctx context.Context, list ObjectList, opts ...ListOption) error
}

type Writer interface {
    Create(ctx context.Context, obj Object, opts ...CreateOption) error
    Delete(ctx context.Context, obj Object, opts ...DeleteOption) error
    Update(ctx context.Context, obj Object, opts ...UpdateOption) error
    Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error
    DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error
}

type StatusClient interface {
    Status() StatusWriter
}
(Note: StatusClient is in charge of updating the status subresource.)
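Continuing the reconciler sketch from above, in reconciler code this typically looks like the following (Status().Update writes only the status subresource; obj.Status.Phase is a made-up field):

obj.Status.Phase = "Ready" // hypothetical status field of MyKind
if err := r.Status().Update(ctx, &obj); err != nil {
    return ctrl.Result{}, err
}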
The special thing about the manager’s client is that it accesses the k8s apiserver for write operations and the cache for read operations. The client is what we use in our reconciler logic to handle resources: instead of using listers and clientsets separately, we now have a unified interface. If we want to bypass the cache for read operations, we can use the ClientDisableCacheFor option of the manager.
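For example, to always read Secrets fresh from the apiserver instead of watching and caching them, we could configure the manager like this (a sketch based on the v0.7 manager options; corev1 is an assumed alias for k8s.io/api/core/v1):

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme: scheme,
    // Secrets are read directly from the apiserver and never cached
    ClientDisableCacheFor: []client.Object{&corev1.Secret{}},
})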
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/manager/client_builder.go */
import (
    "k8s.io/client-go/rest"

    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
    c, err := client.New(config, options)
    if err != nil {
        return nil, err
    }

    return client.NewDelegatingClient(client.NewDelegatingClientInput{
        CacheReader:     cache,
        Client:          c,
        UncachedObjects: n.uncached,
    })
}
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/client/split.go */
func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
    /* ... */
    return &delegatingClient{
        scheme: in.Client.Scheme(),
        mapper: in.Client.RESTMapper(),
        Reader: &delegatingReader{
            CacheReader:  in.CacheReader,
            ClientReader: in.Client,
            scheme:       in.Client.Scheme(),
            uncachedGVKs: uncachedGVKs,
        },
        Writer:       in.Client,
        StatusClient: in.Client,
    }, nil
}
And finally, the Controller
It’s time to put everything together. Remember the magical lines for controller creation?
func main() {
    /* ... */
    // create and add controller
    r := &controllers.MyKindReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }
    /* ... */
}
Here we reference the manager’s client so that we can use it for read/write operations in our reconciler logic.
func main() {
    /* ... */
    _ = ctrl.NewControllerManagedBy(mgr).
        For(&mygroupv1.MyKind{}).
        Complete(r)

    // start manager
    mgr.Start(ctrl.SetupSignalHandler())
}
This is where most of the magic happens. When we tell controller-runtime to build our controller with NewControllerManagedBy(...).For(...).Complete(...), it creates a controller struct with references to our reconciler implementation and a new workqueue for the controller.
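Before we look under the hood: the builder offers more knobs than For. Here is a hedged sketch of a fuller chain (appsv1 is an assumed alias for k8s.io/api/apps/v1, controller for controller-runtime’s pkg/controller):

_ = ctrl.NewControllerManagedBy(mgr).
    For(&mygroupv1.MyKind{}).                                    // reconcile on MyKind events
    Owns(&appsv1.Deployment{}).                                  // ...and on events of Deployments owned by a MyKind
    WithOptions(controller.Options{MaxConcurrentReconciles: 2}). // number of worker goroutines
    Complete(r)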
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/controller/controller.go */
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
    c, err := NewUnmanaged(name, mgr, options)
    if err != nil {
        return nil, err
    }

    // add the controller as a runnable to the manager
    return c, mgr.Add(c)
}

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
    return &controller.Controller{
        Do: options.Reconciler,
        MakeQueue: func() workqueue.RateLimitingInterface {
            return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
        },
    }, nil
}
When the controller is added to the manager, the manager’s cache is injected into the controller. The controller needs the cache because it hosts the informers that notify the controller about resource updates.
When the manager is started, the control loop is started as well: informers are created and run for our resources, and the controller’s workqueue is registered with them for resource updates. Finally, worker goroutines are launched, each of which processes workqueue items by calling our custom reconciler logic inside processNextWorkItem.
/* snipped source code from https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.7/pkg/internal/controller/controller.go */
func (c *Controller) Start(ctx context.Context) error {
    // create workqueue
    c.Queue = c.MakeQueue()
    defer c.Queue.ShutDown()

    err := func() error {
        // run informers, register workqueue for resource updates
        for _, watch := range c.startWatches {
            if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                return err
            }
        }

        // wait until informers are synced
        for _, watch := range c.startWatches {
            syncingSource, ok := watch.src.(source.SyncingSource)
            if !ok {
                continue
            }
            if err := syncingSource.WaitForSync(ctx); err != nil {
                return err
            }
        }

        // launch workers to process resources
        for i := 0; i < c.MaxConcurrentReconciles; i++ {
            go wait.UntilWithContext(ctx, func(ctx context.Context) {
                // calls custom reconciler logic
                for c.processNextWorkItem(ctx) {
                }
            }, c.JitterPeriod)
        }

        c.Started = true
        return nil
    }()
    if err != nil {
        return err
    }

    <-ctx.Done()
    // stop workers
    return nil
}
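The actual processNextWorkItem contains more bookkeeping, but its core pattern is roughly the following sketch (not the original source): take an item from the queue, run the reconciler, and requeue depending on the Result and the error.

import (
    "context"

    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, do reconcile.Reconciler) bool {
    obj, shutdown := queue.Get()
    if shutdown {
        return false // queue was shut down, stop this worker
    }
    defer queue.Done(obj)

    req, ok := obj.(reconcile.Request)
    if !ok {
        queue.Forget(obj) // drop items we cannot interpret
        return true
    }

    result, err := do.Reconcile(ctx, req)
    switch {
    case err != nil:
        queue.AddRateLimited(req) // retry with backoff
    case result.RequeueAfter > 0:
        queue.Forget(req)
        queue.AddAfter(req, result.RequeueAfter) // scheduled requeue
    case result.Requeue:
        queue.AddRateLimited(req)
    default:
        queue.Forget(req) // success: reset the rate limiter for this request
    }
    return true
}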
In a nutshell
And finally, here is a picture to summarize the relationship between our code, controller-runtime and client-go:
I hope that this post helped make some things a bit clearer. If you have any feedback, I’m happy to hear it!