7 changes: 0 additions & 7 deletions README.md
@@ -86,13 +86,6 @@ kubectl apply -f https://raw.githubusercontent.com/abahmed/kwatch/v0.10.4/deploy
| `app.logFormatter` | used for setting custom formatter when app prints logs: text, json (default: text) |


### 📊 Telemetry (Not Released)

| Parameter | Description |
|:------------------------------|:------------------------------------------- |
| `telemetry.enabled` | If set to true, anonymous telemetry data (cluster ID and version) is sent on first run to help track kwatch usage (default: false) |


### 💓 Health Check (Not Released)

| Parameter | Description |
14 changes: 4 additions & 10 deletions config/config.go
@@ -17,9 +17,6 @@ type Config struct {
// NodeMonitor configuration
NodeMonitor NodeMonitor `yaml:"nodeMonitor"`

// Telemetry configuration
Telemetry Telemetry `yaml:"telemetry"`

// HealthCheck configuration
HealthCheck HealthCheck `yaml:"healthCheck"`

@@ -79,6 +76,10 @@ type Config struct {
IgnoreNodeReasons []string `yaml:"ignoreNodeReasons"`
// IgnoreNodeMessages is an optional list of node messages for which alerting should be skipped
IgnoreNodeMessages []string `yaml:"ignoreNodeMessages"`

// ResyncSeconds is the interval (in seconds) for periodic informer resyncs.
// If 0, no periodic resync occurs (event-driven only).
ResyncSeconds int `yaml:"resyncSeconds"`
}

// App config struct
@@ -129,13 +130,6 @@ type NodeMonitor struct {
Enabled bool `yaml:"enabled"`
}

// Telemetry config struct
type Telemetry struct {
// Enabled if set to true, it will send anonymous telemetry events
// By default, this value is false
Enabled bool `yaml:"enabled"`
}

// HealthCheck config struct
type HealthCheck struct {
// Enabled if set to true, it will enable health check endpoint
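The new `resyncSeconds` field is a top-level key in kwatch's config file. A minimal sketch of how it might be set (the key name follows the yaml tag above; the value is illustrative):

```yaml
# resync informer caches every 5 minutes; 0 (the default) stays event-driven only
resyncSeconds: 300
```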
57 changes: 0 additions & 57 deletions config/config_test.go
@@ -179,60 +179,3 @@ func TestIgnoreNodeReasonsSpecialChars(t *testing.T) {
assert.NotNil(cfg)
assert.Equal([]string{"reason-1", "reason_2", "reason.with.dot", "reason/with/slash"}, cfg.IgnoreNodeReasons)
}

func TestTelemetryEnabled(t *testing.T) {
assert := assert.New(t)

defer os.Unsetenv("CONFIG_FILE")
defer os.RemoveAll("config.yaml")

os.Setenv("CONFIG_FILE", "config.yaml")

n := Config{
Telemetry: Telemetry{
Enabled: true,
},
}
yamlData, _ := yaml.Marshal(&n)
os.WriteFile("config.yaml", yamlData, 0644)

cfg, _ := LoadConfig()
assert.NotNil(cfg)
assert.True(cfg.Telemetry.Enabled)
}

func TestTelemetryDisabled(t *testing.T) {
assert := assert.New(t)

defer os.Unsetenv("CONFIG_FILE")
defer os.RemoveAll("config.yaml")

os.Setenv("CONFIG_FILE", "config.yaml")

n := Config{
Telemetry: Telemetry{
Enabled: false,
},
}
yamlData, _ := yaml.Marshal(&n)
os.WriteFile("config.yaml", yamlData, 0644)

cfg, _ := LoadConfig()
assert.NotNil(cfg)
assert.False(cfg.Telemetry.Enabled)
}

func TestTelemetryDefault(t *testing.T) {
assert := assert.New(t)

defer os.Unsetenv("CONFIG_FILE")
defer os.RemoveAll("config.yaml")

os.Setenv("CONFIG_FILE", "config.yaml")

os.WriteFile("config.yaml", []byte{}, 0644)

cfg, _ := LoadConfig()
assert.NotNil(cfg)
assert.False(cfg.Telemetry.Enabled)
}
3 changes: 0 additions & 3 deletions config/defaultConfig.go
@@ -17,9 +17,6 @@ func DefaultConfig() *Config {
Upgrader: Upgrader{
DisableUpdateCheck: false,
},
Telemetry: Telemetry{
Enabled: false,
},
HealthCheck: HealthCheck{
Enabled: false,
Port: 8060,
2 changes: 0 additions & 2 deletions constant/constant.go
@@ -16,5 +16,3 @@ const (
DefaultLogs = "No logs captured"
DefaultEvents = "No events captured"
)

const TelemetryAPIURL = "https://telemetry.kwatch.dev/api/v1/events"
223 changes: 223 additions & 0 deletions controller/controller.go
@@ -0,0 +1,223 @@
package controller

import (
"context"
"fmt"
"time"

"github.com/abahmed/kwatch/config"
"github.com/abahmed/kwatch/handler"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

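// Controller watches pods (and, when node monitoring is enabled, nodes)
// through shared informers and feeds object keys to its handler through
// rate-limited workqueues.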
type Controller struct {
handler handler.Handler
podQueue workqueue.TypedRateLimitingInterface[string]
nodeQueue workqueue.TypedRateLimitingInterface[string]
podLister corev1lister.PodLister
podsSynced cache.InformerSynced
nodeLister corev1lister.NodeLister
nodesSynced cache.InformerSynced
}

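// New builds the shared informer factory, listers, and workqueues, and
// returns the controller together with a cleanup function that stops the
// informers.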
func New(
client kubernetes.Interface,
cfg *config.Config,
h handler.Handler,
) (*Controller, func()) {
var opts []informers.SharedInformerOption
if len(cfg.AllowedNamespaces) == 1 {
opts = append(opts, informers.WithNamespace(cfg.AllowedNamespaces[0]))
}

resync := time.Duration(cfg.ResyncSeconds) * time.Second

factory := informers.NewSharedInformerFactoryWithOptions(
client,
resync,
opts...,
)

podInformer := factory.Core().V1().Pods().Informer()
podLister := factory.Core().V1().Pods().Lister()

c := &Controller{
handler: h,
podQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "pods"},
),
nodeQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "nodes"},
),
podLister: podLister,
podsSynced: podInformer.HasSynced,
}

h.SetPodLister(podLister)

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueuePod,
UpdateFunc: func(old, new interface{}) { c.enqueuePod(new) },
DeleteFunc: c.enqueuePod,
})

if cfg.NodeMonitor.Enabled {
nodeInformer := factory.Core().V1().Nodes().Informer()
nodeLister := factory.Core().V1().Nodes().Lister()

c.nodeLister = nodeLister
c.nodesSynced = nodeInformer.HasSynced

h.SetNodeLister(nodeLister)

nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueNode,
UpdateFunc: func(old, new interface{}) { c.enqueueNode(new) },
DeleteFunc: c.enqueueNode,
})
}

stopCh := make(chan struct{})
factory.Start(stopCh)

cleanup := func() {
close(stopCh)
factory.Shutdown()
}

return c, cleanup
}

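// enqueuePod converts an informer event object into a namespace/name key
// and adds it to the pod queue.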
func (c *Controller) enqueuePod(obj interface{}) {
// use the deletion-handling variant, as enqueueNode does, so that
// DeletedFinalStateUnknown tombstones from DeleteFunc still yield a valid key
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.podQueue.Add(key)
}

func (c *Controller) enqueueNode(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.nodeQueue.Add(key)
}

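// Run waits for the informer caches to sync, starts the worker goroutines,
// and blocks until the context is cancelled.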
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.podQueue.ShutDown()
defer c.nodeQueue.ShutDown()

logrus.Info("starting controller")

logrus.Info("waiting for informer caches to sync")
syncFns := []cache.InformerSynced{c.podsSynced}
if c.nodesSynced != nil {
syncFns = append(syncFns, c.nodesSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), syncFns...) {
return fmt.Errorf("failed to wait for caches to sync")
}

logrus.Info("starting workers")
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runPodWorker, time.Second)
if c.nodesSynced != nil {
go wait.UntilWithContext(ctx, c.runNodeWorker, time.Second)
}
}

<-ctx.Done()
logrus.Info("shutting down workers")
return nil
}

func (c *Controller) runPodWorker(ctx context.Context) {
for c.processNextPodItem() {
}
}

func (c *Controller) runNodeWorker(ctx context.Context) {
for c.processNextNodeItem() {
}
}

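// processNextPodItem handles a single key from the pod queue: failed syncs
// are requeued with rate limiting, successful ones are forgotten.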
func (c *Controller) processNextPodItem() bool {
key, quit := c.podQueue.Get()
if quit {
return false
}
defer c.podQueue.Done(key)

if err := c.syncPod(key); err != nil {
c.podQueue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("error syncing pod %q: %s, requeuing", key, err.Error()))
return true
}

c.podQueue.Forget(key)
return true
}

func (c *Controller) processNextNodeItem() bool {
key, quit := c.nodeQueue.Get()
if quit {
return false
}
defer c.nodeQueue.Done(key)

if err := c.syncNode(key); err != nil {
c.nodeQueue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("error syncing node %q: %s, requeuing", key, err.Error()))
return true
}

c.nodeQueue.Forget(key)
return true
}

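// syncPod looks the pod up in the lister cache and hands the key to the
// handler, marking it deleted if it is no longer present.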
func (c *Controller) syncPod(key string) error {
deleted := false
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}

_, err = c.podLister.Pods(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
deleted = true
} else {
return err
}
}

return c.handler.ProcessPod(key, deleted)
}

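// syncNode does the same for nodes; nodes are cluster-scoped, so the key is
// simply the node name.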
func (c *Controller) syncNode(key string) error {
deleted := false
_, err := c.nodeLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
deleted = true
} else {
return err
}
}

return c.handler.ProcessNode(key, deleted)
}
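For context, here is a minimal wiring sketch for the new controller, assuming an in-cluster client; `newMyHandler` stands in for a concrete `handler.Handler` implementation and is not part of this change:

```go
package main

import (
	"context"

	"github.com/abahmed/kwatch/config"
	"github.com/abahmed/kwatch/controller"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// In-cluster client config; outside a cluster, clientcmd could be used instead.
	restCfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(restCfg)

	cfg, err := config.LoadConfig()
	if err != nil {
		panic(err)
	}

	// newMyHandler is hypothetical; any handler.Handler implementation works.
	h := newMyHandler(cfg)

	c, cleanup := controller.New(client, cfg, h)
	defer cleanup()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Two workers per queue; Run blocks until ctx is cancelled.
	if err := c.Run(ctx, 2); err != nil {
		panic(err)
	}
}
```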