Monitoring Kubernetes with Informers and Home Assistant



I’m running HomeAssistant along with a number of other services using microk8s and half-a-dozen Raspberry Pi4’s. The Kubernetes API provides a means for us to see the status of nodes, deployments, statefulsets and pods. I started off polling the API for updates but recently updated the code to use Informers. I couldn’t find much documentation out there so hopefully this post is useful for someone…

image

The code below is still a prototype and could do with some tidying up but seems to work :-)

Accessing the Kubernetes API from nodejs

Kubernetes provides a client library for javascript:

const k8s = require('@kubernetes/client-node');

Buried in the examples is code to get started with an informer. It’s typescript so required a couple of minor tweaks for javascript but forms the basis of my script

import * as k8s from '@kubernetes/client-node';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const k8sApi = kc.makeApiClient(k8s.CoreV1Api);

const listFn = () => k8sApi.listNamespacedPod('default');

const informer = k8s.makeInformer(kc, '/api/v1/namespaces/default/pods', listFn);

informer.on('add', (obj: k8s.V1Pod) => {
    console.log(`Added: ${obj.metadata!.name}`);
});
informer.on('update', (obj: k8s.V1Pod) => {
    console.log(`Updated: ${obj.metadata!.name}`);
});
informer.on('delete', (obj: k8s.V1Pod) => {
    console.log(`Deleted: ${obj.metadata!.name}`);
});
informer.on('error', (err: k8s.V1Pod) => {
    console.error(err);
    // Restart informer after 5sec
    setTimeout(() => {
        informer.start();
    }, 5000);
});

informer.start();

Watching Nodes

The example above watches pods for updates. To watch nodes we make a few changes:

/*
NODE
https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/stateful-set-v1/#list-list-or-watch-objects-of-kind-statefulset-1
----------
*/
const nodeListFn = () => k8sApi.listNode();
let nodeInformer = null;

function createNodeInformer()
{
	nodeInformer = k8s.makeInformer(kc, '/api/v1/nodes', nodeListFn);
	nodeInformer.on('add', (obj /* : k8s.V1Pod */) => {
			console.log(`Added: ${obj.metadata.name}`);
			// Node conditions
			// console.log(obj.status.conditions);
			getNodeStatus(obj);
			showDashboardCodeInConsole();
		});
	nodeInformer.on('update', (obj) => {
			console.log(`Updated: ${obj.metadata.name}`);
			//console.log(obj)
			getNodeStatus(obj);
	});
	nodeInformer.on('delete', (obj) => {
			console.log(`Deleted: ${obj.metadata.name}`);
	});
	nodeInformer.on('error', (err) => {
			console.error(err);
			// Restart informer after 5sec
			setTimeout(() => {
				nodeInformer.start();
			}, 5000);
	});
	nodeInformer.start();
}

async function destroyNodeInformer()
{
	await nodeInformer.stop();
	nodeInformer = null
}

createNodeInformer();

Watching deployments

Similarly for deployments:

/*
DEPLOYMENT
https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/deployment-v1/#list-list-or-watch-objects-of-kind-deployment-1
----------
*/
const deploymentListFn = () => k8sAppApi.listDeploymentForAllNamespaces();
let deploymentInformer = null;

function getDeploymentStatus(d)
{
	let statusUpdate = [];
	let s = {};
	let clusterActionNeededMsg = "";

	s.attributes = {};
	s.entity = "sensor." + "deployment_" + d.metadata.name.replaceAll('-', '_');
	s.state = 'off';
	s.attributes.friendly_name = "Deployment:" + d.metadata.name;
	s.attributes.short_name = d.metadata.name;
	s.attributes.readyReplicas = d.status.readyReplicas
	if (typeof(s.attributes.readyReplicas) == 'undefined') {
		s.attributes.readyReplicas=0;
	}
	s.attributes.specReplicas = d.spec.replicas
	if (s.attributes.readyReplicas == s.attributes.specReplicas) {
		s.state = 'on';
	} else {
		clusterActionNeededMsg = "Deployment not ready: " + d.metadata.name + "\n"
	}
	s.attributes.last_updated = new Date();
	statusUpdate.push(s);
	hassUpdate.postHassUpdate(statusUpdate);
	let t = {}
	t.attributes = s.attributes
	t.state = s.state;
	t.msg = clusterActionNeededMsg;
	k8sMasterList[s.entity] = t;
	checkK8sMasterList();
}

function createDeploymentInformer()
{
	deploymentInformer = k8s.makeInformer(kc, '/apis/apps/v1/deployments', deploymentListFn);
	deploymentInformer.on('add', (obj /* : k8s.V1Pod */) => {
		console.log(`Added: ${obj.metadata.name}. Status: ${obj.status.readyReplicas} / ${obj.spec.replicas}`);
		// Node conditions
		// console.log(obj.status.conditions);
		getDeploymentStatus(obj);
		showDashboardCodeInConsole();
	});
	deploymentInformer.on('update', (obj) => {
		console.log(`Updated: ${obj.metadata.name}, Status: ${obj.status.readyReplicas} / ${obj.spec.replicas}`);
		//console.log(obj)
		getDeploymentStatus(obj);
	});
	deploymentInformer.on('delete', (obj) => {
		console.log(`Deleted: ${obj.metadata.name}, Status: ${obj.status.readyReplicas} / ${obj.spec.replicas}`);
	});
	deploymentInformer.on('error', (err) => {
		console.error(err);
		// Restart informer after 5sec
		setTimeout(() => {
			deploymentInformer.start();
		}, 5000);
	});
	deploymentInformer.start();
}

async function destroyDeploymentInformer()
{
	await deploymentInformer.stop();
	deploymentInformer = null
}

createDeploymentInformer();

Watching StatefulSets

And again for StatefulSets:

/*
STATEFULSET
https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/stateful-set-v1/#list-list-or-watch-objects-of-kind-statefulset-1
----------
*/
const statusfulSetInformerListFn = () => k8sAppApi.listStatefulSetForAllNamespaces();
let statusfulSetInformer = null;

function getStatusfulSetStatus(ss)
{
	let statusUpdate = [];
	let s = {};
	let clusterActionNeededMsg = "";

	s.attributes = {};
	s.entity = "sensor." + "statefulset_" + ss.metadata.name.replaceAll('-', '_');
	s.state = 'off';
	s.attributes.friendly_name = "StatefulSet:" + ss.metadata.name;
	s.attributes.short_name = ss.metadata.name;
	s.attributes.readyReplicas = ss.status.readyReplicas
	if (typeof(s.attributes.readyReplicas) == 'undefined') {
		s.attributes.readyReplicas=0;
	}
	s.attributes.specReplicas = ss.spec.replicas
	if (s.attributes.readyReplicas == s.attributes.specReplicas) {
		s.state = 'on';
	} else {
		clusterActionNeededMsg = "StatefulSet not ready: " + ss.metadata.name + "\n"
	}
	s.attributes.last_updated = new Date();
	statusUpdate.push(s);
	hassUpdate.postHassUpdate(statusUpdate);
	let t = {}
	t.attributes = s.attributes
	t.state = s.state;
	t.msg = clusterActionNeededMsg;
	k8sMasterList[s.entity] = t;
	checkK8sMasterList();
}

function createStatusfulSetInformer()
{
	statusfulSetInformer = k8s.makeInformer(kc, '/apis/apps/v1/statefulsets', statusfulSetInformerListFn);
	statusfulSetInformer.on('add', (obj /* : k8s.V1Pod */) => {
			console.log(`Added: ${obj.metadata.name}. Status: ${obj.status.readyReplicas} / ${obj.spec.replicas}`);
			// Node conditions
			// console.log(obj.status.conditions);
			getStatusfulSetStatus(obj);
			showDashboardCodeInConsole();
		});
	statusfulSetInformer.on('update', (obj) => {
			console.log(`Updated: ${obj.metadata.name}, Status: ${obj.status.readyReplicas} / ${obj.spec.replicas}`);
			//console.log(obj)
			getStatusfulSetStatus(obj);
	});
	statusfulSetInformer.on('delete', (obj) => {
			console.log(`Deleted: ${obj.metadata.name}, Status: ${obj.status.readyReplicas} / ${obj.spec.replicas}`);
	});
	statusfulSetInformer.on('error', (err) => {
			console.error(err);
			// Restart informer after 5sec
			setTimeout(() => {
				statusfulSetInformer.start();
			}, 5000);
	});
	statusfulSetInformer.start();
}

async function destroyStatusfulSetInformer()
{
	await statusfulSetInformer.stop();
	statusfulSetInformer = null
}

createStatusfulSetInformer();

Updating Home Assistant Sensors

The Home Assistant REST API makes it really easy to create a sensor for each or our nodes, deployments, statefulsets (and pods if you want). All we need is a sensor name, state and optional attributes:

const axios = require('axios');
const rateLimit = require('axios-rate-limit');

const HASS_URL = "<< HOME ASSISTANT URL >>";
const HASS_TOKEN = "<< LONG LIVED TOKEN >>";
function postHassUpdate(statusUpdate)
{
	const axiosRateLimited = rateLimit(axios.create(), { maxRequests: 5, perMilliseconds: 100, maxRPS: 2 })
	for(const s of statusUpdate)
	{
		let url = HASS_URL + s.entity;
		axiosRateLimited.post(url, {
			'state': s.state,
			'attributes': s.attributes
		}, {
			headers: {
				'Content-Type' : 'application/json',
				'Authorization': "Bearer " + HASS_TOKEN
			}
		})
		.then(function (response) {
			logger.info('Updated: ' + url);
			logger.info('Response: ' + response.status + ' ' + response.statusText)
		})
		.catch(function (error) {
			logger.error('Error updating: ' + url);
			logger.error(error);
		});
	}
}

Overall cluster status

So far, we’ve created sensors for individual nodes, deployments and statefulsets. I wanted an alert if anything changes on the cluster but didn’t want to set up conditions for every sensor.

Instead, I created an overall sensor called ‘microk8sClusterActionNeeded’ and made this a superset of the individual sensors. If any of the other sensors changes to ‘off’, this sensor will change to ‘off’. Only if everything is healthy will this sensor be ‘on’.

function checkK8sMasterList()
{
	let clusterActionNeeded = false;
	let clusterActionNeededMsg = '';

	// console.log(k8sMasterList);
	for (i in k8sMasterList)
	{
		let t = k8sMasterList[i]
		if (t.state != 'on')
		{
			clusterActionNeeded = true;
			clusterActionNeededMsg += t.msg;
		}
	}
	// Finally add the clusterActionNeeded state
	let s = {};
	let statusUpdate = [];

	s.attributes = {};
	s.entity = "sensor." + "microk8sClusterActionNeeded";
	s.state = 'off';
	s.attributes.clusterActionNeededMsg = ''
	if (clusterActionNeeded == true) {
		s.state = 'on';
		s.attributes.clusterActionNeededMsg = clusterActionNeededMsg
		console.log('ClusterActionNeeded')
		console.log(clusterActionNeededMsg)
	}
	s.attributes.last_updated = new Date();
	statusUpdate.push(s);
	hassUpdate.postHassUpdate(statusUpdate);
}

Restarting Informers from Home Assistant

The code above starts the informers when our watcher script begins. But what if homeassistant is restarted - the states will become stale. I couldn’t find any documentation on restarting informers.. There are a few open issues on github, but this code seems to work:

async function restartInformers()
{
	await destroyDeploymentInformer();
	createDeploymentInformer();

	await destroyStatusfulSetInformer();
	createStatusfulSetInformer();

	await destroyNodeInformer();
	createNodeInformer();
}

To trigger the informer restart we can set up a simple web url:

const express = require('express');

server.get("/restart", (request, response) => {
	restartInformers();
	response.send("restarted informers");
});

server.listen(3000, () => {
	console.log("Listen on the port 3000...");
});

Now we can restart the informers when Home Asistant restarts using the rest_command plugin:

Add this to configuration.yml

rest_command:
  restart_hass_script_monitor:
    url: "https://informer-address/restart"

Then set up an automation script:

alias: StartUp-RestartHassScriptMonitor
description: ""
trigger:
  - platform: homeassistant
    event: start
condition: []
action:
  - service: rest_command.restart_hass_script_monitor
    data: {}
mode: single

Now the informers will restart and provide a full status update just after Home Assistant Starts!

Comments