From 481bad6fea7a1b8bc198439703a26a018135d267 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Wed, 20 Aug 2025 12:39:29 -0400 Subject: [PATCH] Add doc.go and ARCHITECTURE.md to client-go Kubernetes-commit: accdd9e27e74706f63e06ff5cb0476098b377b1e --- ARCHITECTURE.md | 161 ++++++++++++++++++ doc.go | 93 +++++++++++ example_test.go | 429 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 683 insertions(+) create mode 100644 ARCHITECTURE.md create mode 100644 example_test.go diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md new file mode 100644 index 000000000..731f37903 --- /dev/null +++ b/ARCHITECTURE.md @@ -0,0 +1,161 @@ +# `client-go` Architecture + +This document explains the internal architecture of `client-go` for contributors. It describes the +major components, how they interact, and the key design decisions that shape the library. + +## Client Configuration + +There is an architectural separation between loading client configuration and using it. The +`rest.Config` object is the in-memory representation of this configuration. The +`tools/clientcmd` package is the standard factory for producing it. `clientcmd` handles the +complex logic of parsing `kubeconfig` files, merging contexts, and handling external +authentication providers (e.g., OIDC). + +## REST Client + +The `rest.Client` is the foundational HTTP client that underpins all other clients. It separates +the low-level concerns of HTTP transport, serialization, and error handling from higher-level, +Kubernetes-specific object logic. + +The `rest.Config` object is used to build the underlying HTTP transport, which is typically a +chain of `http.RoundTripper` objects. Each element in the chain is responsible for a specific +task, such as adding an `Authorization` header. This is the mechanism by which all authentication +is injected into requests. + +The client uses a builder pattern for requests (e.g., `.Verb()`, `.Resource()`), deferring +response processing until a method like `.Into(&pod)` is called. This separation is key to +supporting different client models from a common base. + +### Endpoint Interactions + +* **Content Negotiation:** The client uses HTTP `Accept` headers to negotiate the wire format + (JSON or Protobuf). A key performance optimization using this mechanism is the ability to + request metadata-only lists via the `PartialObjectMetadataList` media type, which returns + objects containing only `TypeMeta` and `ObjectMeta`. +* **Subresources:** The client can target standard subresources like `/status` or `/scale` for + object mutations, and it can also handle action-oriented subresources like `/logs` or + `/exec`, which often involve streaming data. +* **List Pagination:** For `LIST` requests, the client can specify a `limit`. The server will + return up to that many items and, if more exist, a `continue` token. The client is + responsible for passing this token in a subsequent request to retrieve the next page. + Higher-level tools like the `Reflector`'s `ListerWatcher` handle this logic automatically. +* **Streaming Watches:** A `WATCH` request returns a `watch.Interface` (from + `k8s.io/apimachinery/pkg/watch`), which provides a channel of structured `watch.Event` + objects (`ADDED`, `MODIFIED`, `DELETED`, `BOOKMARK`). This decouples the watch consumer from + the underlying streaming protocol. + +### Errors, Warnings, and Rate Limiting + +* **Structured Errors:** The client deserializes non-2xx responses into a structured + `errors.StatusError`, enabling programmatic error handling (e.g., `errors.IsNotFound(err)`). +* **Warnings:** It processes non-fatal `Warning` headers from the API server via a + `WarningHandler`. +* **Client-Side Rate Limiting:** The `QPS` and `Burst` settings in `rest.Config` are the + client's half of the contract with the server's API Priority and Fairness system. +* **Server-Side Throttling:** The client's default transport automatically handles HTTP `429` + responses by reading the `Retry-After` header, waiting, and retrying the request. + +## Typed and Dynamic Clients + +To handle the extensible nature of the Kubernetes API, `client-go` provides two primary client +models. + +The **`kubernetes.Clientset`** provides compile-time, type-safe access to core, built-in APIs. + +The **`dynamic.DynamicClient`** represents all objects as `unstructured.Unstructured`, allowing it +to interact with any API resource, including CRDs. It relies on two discovery mechanisms: +1. The **`discovery.DiscoveryClient`** determines *what* resources exist. The + **`CachedDiscoveryClient`** is a critical optimization that caches this data on disk to solve + the severe N+1 request performance bottleneck that can occur during discovery. +2. The **OpenAPI schema** (fetched from `/openapi/v3`) describes the *structure* of those + resources, providing the schema awareness needed by the dynamic client. + +## Code Generation + +A core architectural principle of `client-go` is the use of code generation to provide a +strongly-typed, compile-time-safe interface for specific API GroupVersions. This makes +controller code more robust and easier to maintain. The tools in `k8s.io/code-generator` produce +several key components: + +* **Typed Clientsets:** The primary interface for interacting with a specific GroupVersion. +* **Typed Listers:** The read-only, cached accessors used by controllers. +* **Typed Informers:** The machinery for populating the cache for a specific type. +* **Apply Configurations:** The type-safe builders for Server-Side Apply. + +A contributor modifying a built-in API type **must** run the code generation scripts to update all +of these dependent components. + +## Controller Infrastructure + +The `tools/cache` package provides the core infrastructure for controllers, replacing a high-load, +request-based pattern with a low-load, event-driven, cached model. + +The data flow is as follows: + +```mermaid +graph TD + subgraph "Kubernetes API" + API_Server[API Server] + end + + subgraph "client-go: Informer Mechanism" + Reflector("1. Reflector") + DeltaFIFO("2. DeltaFIFO") + Indexer["3. Indexer (Cache)"] + EventHandlers("4. Event Handlers") + end + + subgraph "User Code" + WorkQueue["5. Work Queue"] + Controller("6. Controller") + end + + API_Server -- LIST/WATCH --> Reflector + Reflector -- Puts changes into --> DeltaFIFO + DeltaFIFO -- Is popped by internal loop, which updates --> Indexer + Indexer -- Update triggers --> EventHandlers + EventHandlers -- Adds key to --> WorkQueue + WorkQueue -- Is processed by --> Controller + Controller -- Reads from cache via Lister --> Indexer +``` + +A **`Reflector`** performs a `LIST` to get a consistent snapshot of a resource, identified by a +`resourceVersion`. It then starts a `WATCH` from that `resourceVersion` to receive a continuous +stream of subsequent changes. The `Reflector`'s relist/rewatch loop is designed to solve the +**"too old" `resourceVersion` error** by re-listing. To make this recovery more efficient, the +`Reflector` consumes **watch bookmarks** from the server, which provide a more recent +`resourceVersion` to restart from. + +The **`Lister`** is the primary, read-only, thread-safe interface for a controller's business +logic to access the `Indexer`'s cache. + +## Controller Patterns + +The controller infrastructure is architecturally decoupled from the controller's business logic to +ensure resiliency. + +The **`util/workqueue`** creates a critical boundary between event detection (the informer's job) +and reconciliation (the controller's job). Informer event handlers only add an object's key to the +work queue. This allows the controller to retry failed operations with exponential backoff without +blocking the informer's watch stream. + +For high availability, the **`tools/leaderelection`** package provides the standard architectural +solution to ensure single-writer semantics by having replicas compete to acquire a lock on a +shared `Lease` object. + +## Server-Side Apply + +`client-go` provides a distinct architectural pattern for object mutation that aligns with the +server's declarative model. This is a separate workflow from the traditional `get-modify-update` +model that allows multiple controllers to safely co-manage the same object. The +**`applyconfigurations`** package provides the generated, type-safe builder API used to +construct the declarative patch. + +## Versioning and Compatibility + +`client-go` has a strict versioning relationship with the main Kubernetes repository. A `client-go` +version `v0.X.Y` corresponds to the Kubernetes version `v1.X.Y`. + +The Kubernetes API has strong backward compatibility guarantees: a client built with an older +version of `client-go` will work with a newer API server. However, the reverse is not guaranteed. +A contributor must not break compatibility with supported versions of the Kubernetes API server. diff --git a/doc.go b/doc.go index d0f766a7e..27e92a265 100644 --- a/doc.go +++ b/doc.go @@ -14,4 +14,97 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package clientgo is the official Go client for the Kubernetes API. It provides +// a standard set of clients and tools for building applications, controllers, +// and operators that communicate with a Kubernetes cluster. +// +// # Key Packages +// +// - kubernetes: Contains the typed Clientset for interacting with built-in, +// versioned API objects (e.g., Pods, Deployments). This is the most common +// starting point. +// +// - dynamic: Provides a dynamic client that can perform operations on any +// Kubernetes object, including Custom Resources (CRs). It is essential for +// building controllers that work with CRDs. +// +// - discovery: Used to discover the API groups, versions, and resources +// supported by a Kubernetes cluster. +// +// - tools/cache: The foundation of the controller pattern. This package provides +// efficient caching and synchronization mechanisms (Informers and Listers) +// for building controllers. +// +// - tools/clientcmd: Provides methods for loading client configuration from +// kubeconfig files. This is essential for out-of-cluster applications and CLI tools. +// +// - rest: Provides a lower-level RESTClient that manages the details of +// communicating with the Kubernetes API server. It is useful for advanced +// use cases that require fine-grained control over requests, such as working +// with non-standard REST verbs. +// +// # Connecting to the API +// +// There are two primary ways to configure a client to connect to the API server: +// +// 1. In-Cluster Configuration: For applications running inside a Kubernetes pod, +// the `rest.InClusterConfig()` function provides a straightforward way to +// configure the client. It automatically uses the pod's service account for +// authentication and is the recommended approach for controllers and operators. +// +// 2. Out-of-Cluster Configuration: For local development or command-line tools, +// the `clientcmd` package is used to load configuration from a +// kubeconfig file. +// +// The `rest.Config` object allows for fine-grained control over client-side +// performance and reliability. Key settings include: +// +// - QPS: The maximum number of queries per second to the API server. +// - Burst: The maximum number of queries that can be issued in a single burst. +// - Timeout: The timeout for individual requests. +// +// # Interacting with API Objects +// +// Once configured, a client can be used to interact with objects in the cluster. +// +// - The Typed Clientset (`kubernetes` package) provides a strongly typed +// interface for working with built-in Kubernetes objects. +// +// - The Dynamic Client (`dynamic` package) can work with any object, including +// Custom Resources, using `unstructured.Unstructured` types. +// +// - For Custom Resources (CRDs), the `k8s.io/code-generator` repository +// contains the tools to generate typed clients, informers, and listers. The +// `sample-controller` is the canonical example of this pattern. +// +// - Server-Side Apply is a patching strategy that allows multiple actors to +// share management of an object by tracking field ownership. This prevents +// actors from inadvertently overwriting each other's changes and provides +// a mechanism for resolving conflicts. The `applyconfigurations` package +// provides the necessary tools for this declarative approach. +// +// # Handling API Errors +// +// Robust error handling is essential when interacting with the API. The +// `k8s.io/apimachinery/pkg/api/errors` package provides functions to inspect +// errors and check for common conditions, such as whether a resource was not +// found or already exists. This allows controllers to implement robust, +// idempotent reconciliation logic. +// +// # Building Controllers +// +// The controller pattern is central to Kubernetes. A controller observes the +// state of the cluster and works to bring it to the desired state. +// +// - The `tools/cache` package provides the building blocks for this pattern. +// Informers watch the API server and maintain a local cache, Listers provide +// read-only access to the cache, and Workqueues decouple event detection +// from processing. +// +// - In a high-availability deployment where multiple instances of a controller +// are running, leader election (`tools/leaderelection`) is used to ensure +// that only one instance is active at a time. +// +// - Client-side feature gates allow for enabling or disabling experimental +// features in `client-go`. They can be configured via the `rest.Config` object. package clientgo diff --git a/example_test.go b/example_test.go new file mode 100644 index 000000000..8845cbe5c --- /dev/null +++ b/example_test.go @@ -0,0 +1,429 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clientgo_test + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" + corev1ac "k8s.io/client-go/applyconfigurations/core/v1" + metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +func Example_inClusterConfiguration() { + // This example demonstrates how to create a clientset for an application + // running inside a Kubernetes cluster. It uses the pod's service account + // for authentication. + + // rest.InClusterConfig() returns a configuration object that can be used to + // create a clientset. It is the recommended way to configure a client for + // in-cluster applications. + config, err := rest.InClusterConfig() + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Create the clientset. + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Use the clientset to interact with the API. + +pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + fmt.Printf("There are %d pods in the default namespace\n", len(pods.Items)) +} + +func Example_outOfClusterConfiguration() { + // This example demonstrates how to create a clientset for an application + // running outside of a Kubernetes cluster, using a kubeconfig file. This is + // the standard approach for local development and command-line tools. + + // The default location for the kubeconfig file is in the user's home directory. + var kubeconfig string + if home := os.Getenv("HOME"); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + + // Create the client configuration from the kubeconfig file. + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Configure client-side rate limiting. + config.QPS = 50 + config.Burst = 100 + + // A clientset contains clients for all the API groups and versions supported + // by the cluster. + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Use the clientset to interact with the API. + +pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + fmt.Printf("There are %d pods in the default namespace\n", len(pods.Items)) +} + +func Example_handlingAPIErrors() { + // This example demonstrates how to handle common API errors. + // Create a NotFound error to simulate a failed API request. + err := errors.NewNotFound(schema.GroupResource{Group: "v1", Resource: "pods"}, "my-pod") + + // The errors.IsNotFound() function checks if an error is a NotFound error. + if errors.IsNotFound(err) { + fmt.Println("Pod not found") + } + + // The errors package provides functions for other common API errors, such as: + // - errors.IsAlreadyExists(err) + // - errors.IsConflict(err) + // - errors.IsServerTimeout(err) + + // Output: + // Pod not found +} + +func Example_usingDynamicClient() { + // This example demonstrates how to create and use a dynamic client to work + // with Custom Resources or other objects without needing their Go type + // definitions. + + // Configure the client (out-of-cluster for this example). + var kubeconfig string + if home := os.Getenv("HOME"); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Create a dynamic client. + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Define the GroupVersionResource for the object you want to access. + // For Pods, this is {Group: "", Version: "v1", Resource: "pods"}. + gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + + // Use the dynamic client to list all pods in the "default" namespace. + // The result is an UnstructuredList. + list, err := dynamicClient.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Iterate over the list and print the name of each pod. + for _, item := range list.Items { + name, found, err := unstructured.NestedString(item.Object, "metadata", "name") + if err != nil || !found { + fmt.Printf("Could not find name for pod: %v\n", err) + continue + } + fmt.Printf("Pod Name: %s\n", name) + } +} + +func Example_usingInformers() { + // This example demonstrates the basic pattern for using an informer to watch + // for changes to Pods. This is a conceptual example; a real controller would + // have more robust logic and a workqueue. + + // Configure the client (out-of-cluster for this example). + var kubeconfig string + if home := os.Getenv("HOME"); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // A SharedInformerFactory provides a shared cache for multiple informers, + // which reduces memory and network overhead. + factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute) + podInformer := factory.Core().V1().Pods().Informer() + + _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + log.Printf("Pod ADDED: %s", key) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err == nil { + log.Printf("Pod UPDATED: %s", key) + } + }, + DeleteFunc: func(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + log.Printf("Pod DELETED: %s", key) + } + }, + }) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Graceful shutdown requires a two-channel pattern. + // + // The first channel, `sigCh`, is used by the `signal` package to send us + // OS signals (e.g., Ctrl+C). This channel must be of type `chan os.Signal`. + // + // The second channel, `stopCh`, is used to tell the informer factory to + // stop. The informer factory's `Start` method expects a channel of type + // `<-chan struct{}`. It will stop when this channel is closed. + // + // The goroutine below is the "translator" that connects these two channels. + // It waits for a signal on `sigCh`, and when it receives one, it closes + // `stopCh`, which in turn tells the informer factory to shut down. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + stopCh := make(chan struct{}) + go func() { + <-sigCh + close(stopCh) + }() + + // Start the informer. + factory.Start(stopCh) + + // Wait for the initial cache sync. + if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) { + log.Println("Timed out waiting for caches to sync") + return + } + + log.Println("Informer has synced. Watching for Pod events...") + + // Wait for the stop signal. + <-stopCh + log.Println("Shutting down...") +} + +func Example_serverSideApply() { + // This example demonstrates how to use Server-Side Apply to declaratively + // manage a Deployment object. Server-Side Apply is the recommended approach + // for controllers and operators to manage objects. + + // Configure the client (out-of-cluster for this example). + var kubeconfig string + if home := os.Getenv("HOME"); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + // Define the desired state of the Deployment using the applyconfigurations package. + // This provides a typed, structured way to build the patch. + deploymentName := "my-app" + replicas := int32(2) + image := "nginx:1.14.2" + + // The FieldManager is a required field that identifies the controller managing + // this object's state. + fieldManager := "my-controller" + + // Build the apply configuration. + deploymentApplyConfig := appsv1ac.Deployment(deploymentName, "default"). + WithSpec(appsv1ac.DeploymentSpec(). + WithReplicas(replicas). + WithSelector(metav1ac.LabelSelector().WithMatchLabels(map[string]string{"app": "my-app"})). + WithTemplate(corev1ac.PodTemplateSpec(). + WithLabels(map[string]string{"app": "my-app"}). + WithSpec(corev1ac.PodSpec(). + WithContainers(corev1ac.Container(). + WithName("nginx"). + WithImage(image), + ), + ), + ), + ) + + // Perform the Server-Side Apply patch. The PatchType must be types.ApplyPatchType. + // The context, name, apply configuration, and patch options are required. + result, err := clientset.AppsV1().Deployments("default").Apply( + context.TODO(), + deploymentApplyConfig, + metav1.ApplyOptions{FieldManager: fieldManager}, + ) + + if err != nil { + fmt.Printf("Error encountered: %v\n", err) + return + } + + fmt.Printf("Deployment %q applied successfully.\n", result.Name) +} + +func Example_leaderElection() { + // This example demonstrates the leader election pattern. A controller running + // multiple replicas uses leader election to ensure that only one replica is + // active at a time. + + // Configure the client (out-of-cluster for this example). + var kubeconfig string + if home := os.Getenv("HOME"); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + fmt.Printf("Error building kubeconfig: %s\n", err.Error()) + return + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Printf("Error building clientset: %s\n", err.Error()) + return + } + + // The unique name of the controller writing to the Lease object. + id := "my-controller" + + // The namespace and name of the Lease object. + leaseNamespace := "default" + leaseName := "my-controller-lease" + + // Create a context that can be cancelled to stop the leader election. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up a signal handler to cancel the context on termination. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + log.Println("Received termination signal, shutting down...") + cancel() + }() + + // Create the lock object. + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: leaseName, + Namespace: leaseNamespace, + }, + Client: clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + } + + // Create the leader elector. + elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // This function is called when the controller becomes the leader. + // You would start your controller's main logic here. + log.Println("Became leader, starting controller.") + // This is a simple placeholder for the controller's work. + for { + select { + case <-time.After(5 * time.Second): + log.Println("Doing controller work...") + case <-ctx.Done(): + log.Println("Controller stopped.") + return + } + } + }, + OnStoppedLeading: func() { + // This function is called when the controller loses leadership. + // You should stop any active work and gracefully shut down. + log.Printf("Lost leadership, shutting down.") + }, + OnNewLeader: func(identity string) { + // This function is called when a new leader is elected. + if identity != id { + log.Printf("New leader elected: %s", identity) + } + }, + }, + }) + if err != nil { + fmt.Printf("Error creating leader elector: %v\n", err) + return + } + + // Start the leader election loop. + elector.Run(ctx) +}