Recently, I have been busy developing Kubeless, a serverless framework on top of Kubernetes. It is an exciting twist in my recent adventures, as I am now getting back to writing applications and prototyping interesting data processing pipelines. I will blog more about Kubeless soon; for now, you can watch my KubeCon talk.
Some of the feedback I got was about the ability to use Kubernetes events as triggers for functions. That makes total sense if you are trying to write Kubernetes operators (aka controllers) that do things when something happens within your cluster.
Some will argue that Kubernetes does not have proper event management, but each API resource has a Watch endpoint, which means we can get a stream of changes to every API object. For example, you can watch changes to Pods by opening a watch on the Pod endpoint.
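To make the "stream of changes" idea concrete: a watch response arrives as a sequence of JSON objects, one per line, each carrying a `type` and the affected `object`. The sketch below parses such lines using only the standard library. The sample lines and the pod name are invented for illustration, and `parse_watch_lines` and `summarize` are hypothetical helpers, not part of any client library.

```python
import json

# Hypothetical sample of a watch response body: one JSON object per line.
# The pod name and the trimmed-down fields are made up for illustration.
SAMPLE_STREAM = """\
{"type": "ADDED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}
{"type": "MODIFIED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}
{"type": "DELETED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}
"""


def parse_watch_lines(lines):
    """Yield (type, kind, name) for each non-empty line of a watch stream."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        obj = event["object"]
        yield event["type"], obj["kind"], obj["metadata"]["name"]


def summarize(stream_text):
    """Return one 'TYPE Kind name' string per event in the stream text."""
    return ["%s %s %s" % parts for parts in parse_watch_lines(stream_text.splitlines())]


if __name__ == "__main__":
    for line in summarize(SAMPLE_STREAM):
        print("Event: " + line)
```

The Python client used in the rest of this post takes care of this parsing and hands back the same `type` and `object` pair for each event.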
Doing a Watch with Python
With the incubating Kubernetes Python client, coding a watch is quite quick:
    from kubernetes import client, config, watch

    config.load_kube_config()

    v1 = client.CoreV1Api()

    w = watch.Watch()
    for event in w.stream(v1.list_pod_for_all_namespaces):
        print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
However, there are many API resources in k8s, and we would want a single program that streams all watch events. Enter asyncio, the Python library for asynchronous programming.
Using asyncio for multiple watches
Let's say you want to watch for Pod and Deployment changes in the same program. You can write two watch functions and launch them as asynchronous tasks in the same event loop with asyncio.
The Python client is great because it is auto-generated using the OpenAPI spec of the k8s API. Each endpoint has its own client class.
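- To access the Pods you instantiate client.CoreV1Api()
- To access the Deployments you instantiate client.ExtensionsV1beta1Api()
Pods will be watched via a call to list_pod_for_all_namespaces on the CoreV1Api instance.
Deployments will be watched via a call to list_deployment_for_all_namespaces on the ExtensionsV1beta1Api instance.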
The two async watch functions (aka coroutines) will look like this:
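    import asyncio
    import logging

    from kubernetes import client, config, watch

    # Added so INFO records are actually printed; the format mirrors the sample output
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger('k8s_events')
    logger.setLevel(logging.DEBUG)

    config.load_kube_config()

    v1 = client.CoreV1Api()
    # Newer client releases expose Deployments through client.AppsV1Api() instead
    v1ext = client.ExtensionsV1beta1Api()


    async def pods():
        w = watch.Watch()
        for event in w.stream(v1.list_pod_for_all_namespaces):
            logger.info("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
            # Hand control back to the event loop so the other watcher can run
            await asyncio.sleep(0)


    async def deployments():
        w = watch.Watch()
        for event in w.stream(v1ext.list_deployment_for_all_namespaces):
            logger.info("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
            # Hand control back to the event loop so the other watcher can run
            await asyncio.sleep(0)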
The await asyncio.sleep(0) call makes each coroutine yield to the other. If an event for a Pod or Deployment is received, it gets logged. Then you just need to create the asyncio event loop and schedule the tasks:
    ioloop = asyncio.get_event_loop()
    ioloop.create_task(pods())
    ioloop.create_task(deployments())
    ioloop.run_forever()
The result will be something like this:
    $ python3 ./local-events.py
    step: loop.run_forever()
    2017-04-10 16:56:06,705 - k8s_events - INFO - Event: ADDED Pod python-4172325901-9988g
    2017-04-10 16:56:06,718 - k8s_events - INFO - Event: ADDED Namespace kube-system
    2017-04-10 16:56:06,732 - k8s_events - INFO - Event: ADDED Service k8s-events
    2017-04-10 16:56:06,748 - k8s_events - INFO - Event: ADDED Deployment kubeless-controller
    2017-04-10 16:56:06,780 - k8s_events - INFO - Event: ADDED ReplicaSet events-4218266190
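If you want to see the `await asyncio.sleep(0)` hand-off in isolation, without a cluster, here is a minimal sketch. It assumes made-up event lists in place of `w.stream(...)`, and it uses `asyncio.run` and `asyncio.gather` (available in more recent Python versions) rather than `run_forever`, so that the demo terminates. The names `watch_events`, `FAKE_PODS` and `FAKE_DEPLOYMENTS` are hypothetical.

```python
import asyncio

# Made-up event lists standing in for w.stream(...); no cluster is contacted.
FAKE_PODS = [("ADDED", "Pod", "web-1"), ("MODIFIED", "Pod", "web-1")]
FAKE_DEPLOYMENTS = [("ADDED", "Deployment", "web"), ("MODIFIED", "Deployment", "web")]


async def watch_events(events, seen):
    """Record each event, then yield control so the other watcher can run."""
    for event_type, kind, name in events:
        seen.append("%s %s %s" % (event_type, kind, name))
        # Same trick as in the coroutines above: give the loop a chance to switch tasks
        await asyncio.sleep(0)


async def main():
    seen = []
    # Run both watchers concurrently on the same event loop
    await asyncio.gather(
        watch_events(FAKE_PODS, seen),
        watch_events(FAKE_DEPLOYMENTS, seen),
    )
    return seen


def run_demo():
    """Run the two fake watchers to completion and return the recorded events."""
    return asyncio.run(main())


if __name__ == "__main__":
    for line in run_demo():
        print("Event:", line)
```

With these in-memory lists, the Pod and Deployment events come out interleaved. Keep in mind that the hand-off only happens after an event has been processed: a stream that blocks while waiting for the next event holds the loop until that event arrives.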
Tying it to a Message Broker
In Kubeless we use Kafka as a message broker. It is heavily used in enterprise settings and was surprisingly easy to set up with containers and k8s.
Now that I have a Python script to watch all k8s events that I want, I can publish those events onto a message topic in my broker.
I just submitted a PR which does just this, and packages everything in a container. Check it out.
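The PR itself is not reproduced here. As a rough sketch of the idea only, the snippet below serializes a watch event to JSON and hands it to a producer object. Everything in it is an assumption for illustration: the topic name, the message layout, and the `InMemoryProducer` stand-in. A real Kafka client such as kafka-python's `KafkaProducer` offers a `send(topic, value)` call of this shape, but wiring one in is left out so the example runs without a broker.

```python
import json

TOPIC = "k8s_events"  # hypothetical topic name


def event_to_message(event_type, kind, name):
    """Serialize one watch event as UTF-8 encoded JSON bytes."""
    payload = {"type": event_type, "kind": kind, "name": name}
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def publish(producer, event_type, kind, name, topic=TOPIC):
    """Send the serialized event through any object exposing send(topic, value)."""
    message = event_to_message(event_type, kind, name)
    producer.send(topic, message)
    return message


class InMemoryProducer:
    """Stand-in for a real broker client; it only collects what was sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


if __name__ == "__main__":
    producer = InMemoryProducer()
    publish(producer, "ADDED", "Pod", "web-1")
    print(producer.sent)
```

In the watch coroutines, a call like `publish(...)` would sit next to the `logger.info(...)` line, so that every logged event also lands on the topic.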