Kubernetes Operators: Building Custom Controllers

August 3, 2020

Kubernetes Operators have become the standard pattern for managing complex applications. They encode operational knowledge into software, automating tasks that would otherwise require manual intervention.

Here’s how to understand and build effective Kubernetes operators.

What Operators Are

The Operator Pattern

Traditional automation:
  Manual runbook → Human executes → Application managed

Operator pattern:
  Custom Resource → Controller watches → Controller reconciles → Application managed

Operators extend Kubernetes to manage applications the same way Kubernetes manages pods—declaratively and continuously.

Control Loop

// Reconcile is the reconciliation loop for a single MyApp object: it loads the
// resource named by req, compares its spec with the observed state, corrects
// any difference, records readiness in the status, and asks to be requeued
// after one minute.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // Step 1: desired state comes from the custom resource itself. The Get
    // error is passed through client.IgnoreNotFound before being returned.
    var app MyApp
    err := r.Get(ctx, req.NamespacedName, &app)
    if err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Steps 2 and 3: read the observed state and act only when it differs
    // from the spec.
    observed := getCurrentState(ctx, r, app)
    if drifted := !reflect.DeepEqual(app.Spec, observed); drifted {
        if err = reconcile(ctx, r, app); err != nil {
            return ctrl.Result{}, err
        }
    }

    // Step 4: persist readiness through the status writer.
    app.Status.Ready = isReady(ctx, r, app)
    if err = r.Status().Update(ctx, &app); err != nil {
        return ctrl.Result{}, err
    }

    // Step 5: schedule the next pass.
    return ctrl.Result{RequeueAfter: time.Minute}, nil
}

Custom Resource Definition (CRD)

Define your API:

# CustomResourceDefinition registering the namespaced Database kind
# (group example.com, version v1, short name "db").
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # Must be <plural>.<group>.
  name: databases.example.com
spec:
  group: example.com
  names:
    kind: Database
    plural: databases
    singular: database
    shortNames:
      - db
  scope: Namespaced
  versions:
    - name: v1
      served: true
      storage: true
      # Enable the /status subresource so a controller can write status
      # separately from spec; status-only updates are rejected without it.
      subresources:
        status: {}
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                engine:
                  type: string
                  enum: [postgres, mysql]
                version:
                  type: string
                storage:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 5
            status:
              type: object
              properties:
                ready:
                  type: boolean
                endpoint:
                  type: string

Building with Kubebuilder

Project Setup

# Initialize project
kubebuilder init --domain example.com --repo github.com/example/db-operator

# Create API (scaffold both the resource types and the controller, without prompting)
kubebuilder create api --group database --version v1 --kind Database --resource --controller

# Generate manifests
make manifests

# Install the generated CRDs into the cluster from the current kubeconfig context;
# the controller cannot watch the Database kind until they exist
make install

# Build and run the controller locally against that cluster
make run

API Types

// api/v1/database_types.go
package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// DatabaseSpec defines the desired state of a Database.
//
// The validation markers mirror the constraints of the hand-written CRD
// schema (engine enum, replicas between 1 and 5) so that manifests generated
// from these types enforce the same rules.
type DatabaseSpec struct {
    // Engine selects the database engine.
    // +kubebuilder:validation:Enum=postgres;mysql
    Engine string `json:"engine"`

    // Version is the engine version, kept as a string.
    Version string `json:"version"`

    // Storage is the requested storage, kept as a string.
    // NOTE(review): presumably a quantity such as "10Gi" - confirm against
    // the code that consumes it.
    Storage string `json:"storage"`

    // Replicas is the number of database instances.
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=5
    Replicas int32 `json:"replicas"`
}

// DatabaseStatus defines the observed state of a Database, as opposed to the
// desired state held in DatabaseSpec.
type DatabaseStatus struct {
    // Ready is always serialized: its "ready" tag has no omitempty.
    Ready    bool   `json:"ready"`
    // Endpoint is omitted from the JSON output when empty.
    // NOTE(review): presumably the address clients connect to - confirm
    // against the code that sets it.
    Endpoint string `json:"endpoint,omitempty"`
    // Phase is omitted from the JSON output when empty.
    Phase    string `json:"phase,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.spec.engine`
// +kubebuilder:printcolumn:name="Ready",type=boolean,JSONPath=`.status.ready`

// Database is the root API type: it embeds the standard type and object
// metadata and pairs a DatabaseSpec with a DatabaseStatus. The markers above
// declare it a root object with a status subresource and add two print
// columns, "Engine" (.spec.engine) and "Ready" (.status.ready).
type Database struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    // Spec is the desired state; Status is the observed state.
    Spec   DatabaseSpec   `json:"spec,omitempty"`
    Status DatabaseStatus `json:"status,omitempty"`
}

Controller Implementation

// controllers/database_controller.go
// Reconcile moves one Database object toward its desired state.
//
// In order, it fetches the object named by req, hands deletions to
// handleDeletion, makes sure the finalizer is present, reconciles the
// StatefulSet, Service and Secret, and refreshes the status. The first
// failing step aborts the pass and its error is returned. A successful pass
// requests a requeue after 30 seconds.
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // Named logger, not log: the local must not shadow the log package, and
    // it has to be used because Go rejects unused local variables.
    logger := log.FromContext(ctx)

    // Fetch Database resource; the Get error is filtered through
    // client.IgnoreNotFound before being returned.
    var db databasev1.Database
    if err := r.Get(ctx, req.NamespacedName, &db); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // A non-zero deletion timestamp means the object is being deleted.
    if !db.DeletionTimestamp.IsZero() {
        logger.Info("Database is being deleted, running cleanup")
        return r.handleDeletion(ctx, &db)
    }

    // Add the finalizer before creating anything, and persist it right away.
    if !controllerutil.ContainsFinalizer(&db, finalizerName) {
        controllerutil.AddFinalizer(&db, finalizerName)
        if err := r.Update(ctx, &db); err != nil {
            return ctrl.Result{}, err
        }
    }

    // Reconcile resources
    if err := r.reconcileStatefulSet(ctx, &db); err != nil {
        return ctrl.Result{}, err
    }
    if err := r.reconcileService(ctx, &db); err != nil {
        return ctrl.Result{}, err
    }
    if err := r.reconcileSecret(ctx, &db); err != nil {
        return ctrl.Result{}, err
    }

    // Update status
    if err := r.updateStatus(ctx, &db); err != nil {
        return ctrl.Result{}, err
    }

    // Verbosity 1 keeps the periodic success message out of default logs.
    logger.V(1).Info("Database reconciled")
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

// reconcileStatefulSet makes the cluster's StatefulSet for db carry the spec
// produced by buildStatefulSet. The StatefulSet is created when the lookup
// reports NotFound; otherwise its spec is overwritten and it is updated.
func (r *DatabaseReconciler) reconcileStatefulSet(ctx context.Context, db *databasev1.Database) error {
    want := buildStatefulSet(db)

    // Mark db as the controlling owner of the object about to be created.
    if err := controllerutil.SetControllerReference(db, want, r.Scheme); err != nil {
        return err
    }

    key := types.NamespacedName{Name: want.Name, Namespace: want.Namespace}
    var live appsv1.StatefulSet
    switch err := r.Get(ctx, key, &live); {
    case errors.IsNotFound(err):
        return r.Create(ctx, want)
    case err != nil:
        return err
    }

    // The object exists: replace its spec wholesale and write it back.
    live.Spec = want.Spec
    return r.Update(ctx, &live)
}

Best Practices

Idempotent Reconciliation

Every reconcile should be safe to run multiple times:

// reconcileResource keeps the ConfigMap named after db in step with
// buildConfigMap(db): it creates the ConfigMap when the lookup reports
// NotFound, updates it when the Data differs, and otherwise does nothing.
func (r *Reconciler) reconcileResource(ctx context.Context, db *Database) error {
    key := types.NamespacedName{Name: db.Name, Namespace: db.Namespace}

    // Look the object up first so that repeated runs never create twice.
    var current corev1.ConfigMap
    switch err := r.Get(ctx, key, &current); {
    case errors.IsNotFound(err):
        return r.Create(ctx, buildConfigMap(db))
    case err != nil:
        return err
    }

    // Write only when the stored data is not already what is wanted.
    want := buildConfigMap(db)
    if reflect.DeepEqual(current.Data, want.Data) {
        return nil
    }
    current.Data = want.Data
    return r.Update(ctx, &current)
}

Finalizers for Cleanup

Ensure external resources are cleaned up:

// finalizerName is the finalizer string that handleDeletion checks for and
// removes once external resources have been cleaned up.
const finalizerName = "database.example.com/finalizer"

// handleDeletion runs the cleanup guarded by finalizerName. When the
// finalizer is present it deletes external resources, removes the finalizer
// and persists the object; when it is absent there is nothing to do. The
// returned Result is always empty.
func (r *Reconciler) handleDeletion(ctx context.Context, db *Database) (ctrl.Result, error) {
    if !controllerutil.ContainsFinalizer(db, finalizerName) {
        return ctrl.Result{}, nil
    }

    // Cleanup must succeed before the finalizer is released; a failure
    // returns early and leaves the finalizer in place.
    if err := r.deleteExternalResources(ctx, db); err != nil {
        return ctrl.Result{}, err
    }

    controllerutil.RemoveFinalizer(db, finalizerName)
    return ctrl.Result{}, r.Update(ctx, db)
}

// deleteExternalResources is the cleanup hook called by handleDeletion before
// the finalizer is removed. It is a placeholder here: it does nothing and
// always returns nil.
func (r *Reconciler) deleteExternalResources(ctx context.Context, db *Database) error {
    // Clean up cloud resources, backups, etc.
    return nil
}

Status Conditions

Use conditions for detailed status:

// DatabaseStatus reports the observed state of a Database as a list of
// conditions.
type DatabaseStatus struct {
    // Conditions holds the latest observations, one entry per condition
    // type. The markers and patch tags follow the declaration recommended in
    // the metav1.Condition documentation, so entries are merged by "type"
    // rather than the whole list being replaced.
    // +listType=map
    // +listMapKey=type
    // +patchStrategy=merge
    // +patchMergeKey=type
    // +optional
    Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

// updateCondition sets condition in db's status conditions and then writes
// the status through the status writer, returning that write's error.
func (r *Reconciler) updateCondition(ctx context.Context, db *Database, condition metav1.Condition) error {
    conditions := &db.Status.Conditions
    meta.SetStatusCondition(conditions, condition)

    statusWriter := r.Status()
    return statusWriter.Update(ctx, db)
}

// Usage: set the "Ready" condition to true on db.
// NOTE(review): updateCondition returns an error that this call discards;
// a real caller should check it.
r.updateCondition(ctx, db, metav1.Condition{
    Type:    "Ready",
    Status:  metav1.ConditionTrue,
    Reason:  "DatabaseReady",
    Message: "Database is accepting connections",
})

Event Recording

Surface important events:

// Reconcile shows event recording around a reconcile step: a Normal
// "Reconciled" event on success, a Warning "ReconcileFailed" event carrying
// the error text on failure.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // ... reconciliation logic ...

    err := r.reconcileStatefulSet(ctx, &db)
    if err == nil {
        r.Recorder.Event(&db, corev1.EventTypeNormal, "Reconciled", "Database reconciled successfully")
        return ctrl.Result{}, nil
    }

    // Record the failure on the object, then return the error.
    r.Recorder.Event(&db, corev1.EventTypeWarning, "ReconcileFailed", err.Error())
    return ctrl.Result{}, err
}

Rate Limiting

Cap concurrent reconciles and back off exponentially on repeated failures so the controller does not overwhelm the API server:

// SetupWithManager registers the reconciler with mgr. The controller is built
// for Database objects and for the StatefulSets and Services they own, runs
// at most three reconciles concurrently, and retries failed items with
// exponential backoff.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
    // Backoff starts at a one-second base delay and is capped at five minutes.
    limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute)

    opts := controller.Options{
        MaxConcurrentReconciles: 3,
        RateLimiter:             limiter,
    }

    bldr := ctrl.NewControllerManagedBy(mgr).
        For(&databasev1.Database{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&corev1.Service{})

    return bldr.WithOptions(opts).Complete(r)
}

Testing

Unit Testing Controllers

// TestReconcile runs a single Reconcile pass for a Database stored in a fake
// client and checks that it succeeds and schedules the periodic requeue.
func TestReconcile(t *testing.T) {
    // Register every API group the reconciler reads or writes. The fake
    // client cannot handle unregistered types, and the reconciler manages
    // StatefulSets (apps/v1), so appsv1 is needed alongside the core group.
    scheme := runtime.NewScheme()
    require.NoError(t, databasev1.AddToScheme(scheme))
    require.NoError(t, corev1.AddToScheme(scheme))
    require.NoError(t, appsv1.AddToScheme(scheme))

    db := &databasev1.Database{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-db",
            Namespace: "default",
        },
        Spec: databasev1.DatabaseSpec{
            Engine:   "postgres",
            Version:  "13",
            Replicas: 1,
        },
    }

    // Named fakeClient so the local does not shadow the client package.
    // NOTE(review): newer controller-runtime fake clients may also need
    // WithStatusSubresource(db) for status updates - confirm against the
    // pinned version.
    fakeClient := fake.NewClientBuilder().
        WithScheme(scheme).
        WithObjects(db).
        Build()

    reconciler := &DatabaseReconciler{
        Client: fakeClient,
        Scheme: scheme,
    }

    result, err := reconciler.Reconcile(context.Background(), ctrl.Request{
        NamespacedName: types.NamespacedName{
            Name:      "test-db",
            Namespace: "default",
        },
    })

    require.NoError(t, err)
    assert.False(t, result.Requeue)
    // A successful pass returns RequeueAfter: 30 * time.Second.
    assert.Equal(t, 30*time.Second, result.RequeueAfter)
}

Integration Testing with envtest

// TestIntegration starts an envtest control plane with the project's CRDs,
// runs the DatabaseReconciler inside a manager, and shuts both down cleanly
// when the test ends.
func TestIntegration(t *testing.T) {
    testEnv := &envtest.Environment{
        CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
        // Fail at startup when the CRD directory is wrong instead of running
        // against an API server that does not know the Database kind.
        ErrorIfCRDPathMissing: true,
    }

    cfg, err := testEnv.Start()
    require.NoError(t, err)
    // Registered first so it runs last, after the manager has stopped.
    defer func() { assert.NoError(t, testEnv.Stop()) }()

    // NOTE(review): scheme is not defined in this function; presumably a
    // package-level scheme with the Database types registered - confirm.
    mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
    require.NoError(t, err)

    err = (&DatabaseReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr)
    require.NoError(t, err)

    // Run the manager under a cancellable context, then cancel and wait for
    // it on exit so the goroutine does not outlive the test and its error is
    // reported rather than dropped.
    ctx, cancel := context.WithCancel(context.Background())
    mgrErr := make(chan error, 1)
    go func() { mgrErr <- mgr.Start(ctx) }()
    defer func() {
        cancel()
        assert.NoError(t, <-mgrErr)
    }()

    // Test reconciliation
    // ...
}

Deployment

Operator Lifecycle Manager (OLM)

Package for OLM distribution:

# ClusterServiceVersion: the OLM manifest for version 1.0.0 of the operator.
apiVersion: operators.coreos.com/v1alpha1
kind: ClusterServiceVersion
metadata:
  name: db-operator.v1.0.0
spec:
  displayName: Database Operator
  description: Manages PostgreSQL and MySQL databases
  version: 1.0.0
  # Only the OwnNamespace and SingleNamespace install modes are declared
  # as supported here.
  installModes:
    - type: OwnNamespace
      supported: true
    - type: SingleNamespace
      supported: true
  install:
    # The operator is installed as the deployment(s) listed below.
    strategy: deployment
    spec:
      deployments:
        - name: db-operator
          spec:
            # A single operator replica.
            replicas: 1
            # ... (rest of the deployment spec elided)

Helm Chart

# templates/deployment.yaml
# Deployment that runs the operator's manager container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "db-operator.fullname" . }}
spec:
  replicas: {{ .Values.replicaCount }}
  # apps/v1 requires a selector, and it must match the pod template labels.
  # The release name is included so two releases in one namespace do not
  # select each other's pods.
  selector:
    matchLabels:
      app.kubernetes.io/name: db-operator
      app.kubernetes.io/instance: {{ .Release.Name }}
  template:
    metadata:
      labels:
        app.kubernetes.io/name: db-operator
        app.kubernetes.io/instance: {{ .Release.Name }}
    spec:
      containers:
        - name: manager
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
          args:
            # Leader election keeps one replica active when replicaCount > 1.
            - --leader-elect
            - --metrics-bind-address=:8080
          resources:
            {{- toYaml .Values.resources | nindent 12 }}

Key Takeaways

Operators are powerful when applications need more than just running containers. Build them when manual intervention would otherwise be required.