Kubernetes Async Watches

Recently, I have been busy developing kubeless, a serverless framework on top of Kubernetes. It is an exciting twist in my recent adventures, as I am now getting back to writing applications and prototyping interesting data processing pipelines. I will blog more about Kubeless soon; for now you can watch my KubeCon talk.

One piece of feedback I got was a request to use Kubernetes events as function triggers. That makes total sense if you are writing Kubernetes operators (aka controllers) that react when something happens in your cluster.

Some will argue that Kubernetes does not have proper event management, but each API resource has a Watch endpoint, which means we can get a stream of changes to every API object. For example, you can watch changes to a Pod with:

GET /api/v1/watch/namespaces/{namespace}/pods/{name}
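The response to such a request is a long-lived stream in which each line is one JSON document carrying a `type` (ADDED, MODIFIED, DELETED) and the affected `object`. Here is a minimal sketch of decoding that stream. It runs on hand-written sample lines rather than a live cluster, and both the sample payloads and the `parse_watch_lines` helper are made up for illustration:

```python
import json


def parse_watch_lines(lines):
    """Yield (type, kind, name) for each newline-delimited JSON watch event."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the stream
        event = json.loads(line)
        obj = event["object"]
        yield event["type"], obj["kind"], obj["metadata"]["name"]


# Hand-written sample lines, shaped like a watch response body (illustrative only).
SAMPLE_STREAM = [
    '{"type": "ADDED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}',
    '',
    '{"type": "MODIFIED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}',
    '{"type": "DELETED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}',
]

parsed = list(parse_watch_lines(SAMPLE_STREAM))
for event_type, kind, name in parsed:
    print("Event: %s %s %s" % (event_type, kind, name))
```

In practice the Python client shown in the next section does this decoding for you and hands back typed objects instead of raw dictionaries.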

Doing a Watch with Python

With the incubating Kubernetes Python client, coding a watch is quite quick:

from kubernetes import client, config, watch

config.load_kube_config()

v1 = client.CoreV1Api()

w = watch.Watch()
for event in w.stream(v1.list_pod_for_all_namespaces):
    print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))

However, there are many API resources in k8s, and we would want to write a single program that streams all watch events. Enter the Python library for asynchronous programming: asyncio.

Using asyncio for multiple watches

Let's say you want to watch for Pod and Deployment changes in the same code. You can write two watch functions and launch them as asynchronous tasks in the same event loop with asyncio.

The Python client is great because it is auto-generated from the OpenAPI spec of the k8s API. Each API group and version has its own client class.

  • To access the Pods you instantiate v1=client.CoreV1Api().
  • To access the Deployments you instantiate v1ext=client.ExtensionsV1beta1Api().

Pods will be watched via a call to v1.list_pod_for_all_namespaces, and Deployments via a call to v1ext.list_deployment_for_all_namespaces.

The two async watch functions (aka coroutines) will look like this:

import asyncio
import logging

from kubernetes import client, config, watch

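logger = logging.getLogger('k8s_events')
logger.setLevel(logging.DEBUG)
# Attach a handler; without one, INFO records are never printed.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)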

config.load_kube_config()

v1 = client.CoreV1Api()
v1ext = client.ExtensionsV1beta1Api()

async def pods():
    w = watch.Watch()
    # Stream Pod events from every namespace.
    for event in w.stream(v1.list_pod_for_all_namespaces):
        logger.info("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
        # Hand control back to the event loop so the other watch can run.
        await asyncio.sleep(0)

async def deployments():
    w = watch.Watch()
    # Stream Deployment events from every namespace.
    for event in w.stream(v1ext.list_deployment_for_all_namespaces):
        logger.info("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
        # Hand control back to the event loop so the other watch can run.
        await asyncio.sleep(0)

The await asyncio.sleep(0) call makes each coroutine yield to the other after every event. When an event for a Pod or a Deployment is received, it gets logged. Then you just need to create the asyncio event loop and schedule the tasks:

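# new_event_loop() is used because get_event_loop() without a running loop is deprecated in recent Python versions.
ioloop = asyncio.new_event_loop()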

ioloop.create_task(pods())
ioloop.create_task(deployments())
ioloop.run_forever()

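The result will be something like this (this sample evidently comes from a run that also watched Namespaces, Services and ReplicaSets, which is why those kinds appear):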

$ python3 ./local-events.py
step: loop.run_forever()
2017-04-10 16:56:06,705 - k8s_events - INFO - Event: ADDED Pod python-4172325901-9988g
2017-04-10 16:56:06,718 - k8s_events - INFO - Event: ADDED Namespace kube-system
2017-04-10 16:56:06,732 - k8s_events - INFO - Event: ADDED Service k8s-events
2017-04-10 16:56:06,748 - k8s_events - INFO - Event: ADDED Deployment kubeless-controller
2017-04-10 16:56:06,780 - k8s_events - INFO - Event: ADDED ReplicaSet events-4218266190
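One caveat with the coroutines above: w.stream is a synchronous generator, so while one coroutine waits for its next event, the event loop cannot service the other watch. A possible way around this is to pull each item from the generator in a worker thread. The sketch below shows the idea with a stand-in generator (`blocking_stream` is hypothetical and simply sleeps between items), so it runs without a cluster. It assumes Python 3.7 or later and is not how the original script works:

```python
import asyncio
import time


def blocking_stream(kind, names, delay):
    """Stand-in for w.stream(...): a generator that blocks between events."""
    for name in names:
        time.sleep(delay)  # simulates waiting on the API server
        yield {"type": "ADDED", "kind": kind, "name": name}


async def watch(stream, seen):
    """Drain a blocking generator without stalling the event loop."""
    loop = asyncio.get_running_loop()
    done = object()  # sentinel returned once the generator is exhausted
    while True:
        # next() runs in a worker thread, so other coroutines keep running.
        event = await loop.run_in_executor(None, next, stream, done)
        if event is done:
            break
        seen.append((event["type"], event["kind"], event["name"]))


async def main():
    seen = []
    await asyncio.gather(
        watch(blocking_stream("Pod", ["p1", "p2"], 0.05), seen),
        watch(blocking_stream("Deployment", ["d1"], 0.02), seen),
    )
    return seen


events = asyncio.run(main())
print(events)
```

With a real cluster, the generator passed to `watch` would be something like `w.stream(v1.list_pod_for_all_namespaces)`. Since a live watch may never finish, the sentinel branch would then only trigger when the stream times out or is stopped.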

Tying it to a Message Broker

In Kubeless we use Kafka as a message broker. It is heavily used in enterprise settings and was surprisingly easy to set up with containers and k8s.

Now that I have a Python script to watch all k8s events that I want, I can publish those events onto a message topic in my broker.

I just submitted a PR which does just this, and packages everything in a container. Check it out.
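To give a rough idea of what publishing an event involves, here is a small sketch that serializes an event to JSON and hands it to a producer-like object. It is not the code from the PR: the `InMemoryBroker` class, the `event_to_message` helper and the `k8s-events` topic name are all hypothetical, and a real Kafka producer would take the place of the in-memory stand-in.

```python
import json


def event_to_message(event_type, kind, name):
    """Serialize one watch event as UTF-8 JSON bytes for a broker."""
    payload = {"type": event_type, "kind": kind, "name": name}
    return json.dumps(payload, sort_keys=True).encode("utf-8")


class InMemoryBroker:
    """Hypothetical stand-in for a Kafka producer; keeps messages per topic."""

    def __init__(self):
        self.topics = {}

    def send(self, topic, value):
        self.topics.setdefault(topic, []).append(value)


broker = InMemoryBroker()
# "k8s-events" is an assumed topic name used only for this example.
broker.send("k8s-events", event_to_message("ADDED", "Pod", "python-4172325901-9988g"))
print(broker.topics["k8s-events"][0])
```

Keeping the serialization in its own function means the watch coroutines only need to call `send` for each event, whatever broker client sits behind it.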
