Streamlining Encoder Evaluation Using Dask & Python with MainConcept Codecs
Dask offers great capabilities to run parallelized encodings on any kind of compute cluster. If you are interested in codec evaluation with a single clean Python script, Dask will be your perfect companion. We show how to set up a simple, configurable and scalable codec evaluation infrastructure, and further provide a minimal working example as a starting point for your own experiments.
Introduction
When it comes to codec evaluation and codec comparisons, running many encodings with different sets of parameters is crucial. Typically, this effort requires dedicated compute infrastructure featuring multiple processing nodes to maximize the throughput of the simulations. Conventional compute clusters are often composed of nodes interconnected via a low-latency, high-throughput network and fast, shared storage typically accessed via NFS or CIFS. Software-wise, clusters of this sort are typically powered by job scheduling systems like Slurm, LSF, OGE/SGE, or HTCondor, to name a few. Additionally, the software required for the simulations needs to be synchronized across all of the nodes in the cluster. By design, these "supercomputers" require enterprise-grade hardware and quite a bit of manual system administration. With the rise of containerized workloads and container orchestrators such as Kubernetes in recent years, a comparable infrastructure can be built with low cost and maintenance effort.
Why Dask?
Running compute-intensive workloads such as machine learning or video compression algorithms on Kubernetes at large scale is a very common use case nowadays. There are many different software stacks available, each with its pros and cons, solving specific problems. Quite often, a machine learning or video encoding pipeline is in fact a complex workflow (often modeled as a Directed Acyclic Graph) involving the execution of multiple subsequent or dependent steps (e.g. dataset preparation, training, testing, deployment, or video analysis, encoding, quality control, delivery). For such cases, Python-centric workflow orchestration frameworks such as Airflow or Prefect are prime examples, as they also offer scheduling (e.g. cron) capabilities, resilience (e.g. configurable retry mechanisms) and observability (dashboards, logging, etc.). Dask, on the other hand, is primarily a parallel computing library that scales Python code from a single machine to a cluster. We found that this is the most lightweight and flexible approach for our R&D purposes. The figure below depicts the infrastructure setup. As you can see, all simulation code can be written on a development machine and scaled out to a cluster.
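To give a flavor of the programming model before we dive into the infrastructure, here is a minimal local Dask example (assuming only that dask and distributed are installed; no Kubernetes required). The client.map/client.gather pattern shown here is exactly what we will scale out later in this article:

# Minimal local Dask example: parallelize a toy function with the same
# client.map/client.gather pattern we use for the encodings later on.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

futures = client.map(lambda x: x * x, range(8))
print(client.gather(futures))  # [0, 1, 4, 9, 16, 25, 36, 49]

client.close()
cluster.close()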
In this article, we show how to build a codec evaluation infrastructure on a local KinD Kubernetes cluster. Of course, this setup can be easily transferred to any Kubernetes cluster running in the cloud or on-premises. Thus, the concept generalizes well for various use cases. The article will focus on two main topics:
- How to set up a local Kubernetes cluster for serving Dask clusters using KinD.
- How to run a simple codec comparison between x265 and the MainConcept HEVC/H.265 encoder with Dask, including a look at the results.
Setting up a KinD Kubernetes cluster
Throughout this article, we assume that all code snippets are run on a Linux host. As your system configuration might differ from ours, not every command is guaranteed to succeed. If you run into trouble, do not hesitate to reach out; we would be happy to help you troubleshoot. You will find our email addresses at the end of the article.
To set up a KinD Kubernetes cluster, we need a cluster configuration file. This is required because we need to mount a directory containing our test video sequences into the worker nodes of the cluster. With KinD installed, the following snippet will provide you with a locally running k8s cluster and mount the directory <PATH_TO_YOUR_VIDEO_SEQUENCES> to /sequences on the worker node.
cat << EOF | kind create cluster --config -
apiVersion: kind.x-k8s.io/v1alpha4
kind: Cluster
name: codec-eval-cluster
nodes:
- role: control-plane
  image: kindest/node:v1.29.4@sha256:3abb816a5b1061fb15c6e9e60856ec40d56b7b52bcea5f5f1350bc6e2320b6f8
- role: worker
  image: kindest/node:v1.29.4@sha256:3abb816a5b1061fb15c6e9e60856ec40d56b7b52bcea5f5f1350bc6e2320b6f8
  extraMounts:
  - hostPath: <PATH_TO_YOUR_VIDEO_SEQUENCES>
    containerPath: /sequences
EOF
Now let's get the cluster's KUBECONFIG, store it in our current working directory, and export the KUBECONFIG environment variable:
kind get kubeconfig --name codec-eval-cluster > codec-eval-cluster.yaml
export KUBECONFIG=codec-eval-cluster.yaml
As we want to run our encodings using Dask, we will install the Dask Kubernetes operator into the cluster so that we can create Dask compute clusters on-demand.
Let's stop for a second and clarify some terms. You might wonder what these different kinds of clusters refer to:
- The Kubernetes cluster provides a layer of abstraction for the underlying infrastructure, i.e. Linux-based compute nodes. Its entry point is defined by the Kubernetes API and the underlying resources are hidden from the user.
- The Dask Kubernetes operator accesses the Kubernetes API to spawn Dask compute clusters onto the Kubernetes cluster.
- A Dask cluster is another abstraction layer for the underlying workload orchestration/job scheduling software.
Yes, you read that right: Dask can also work with conventional job schedulers such as Slurm or SGE, through a layer that is transparent to the user, as sketched below. However, as we are running on Kubernetes for this example, we will install the Dask Kubernetes operator to dynamically create Dask compute clusters for our simulations. Owing to the simplicity of the installation, we chose to install the operator using Helm, which can be considered a package manager for Kubernetes.
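For illustration, the following sketch shows how the same client code could be backed by a Slurm cluster via the dask-jobqueue package instead of Kubernetes; the queue name, resources, and walltime are hypothetical placeholders you would adapt to your site:

# Hypothetical sketch: back the Dask client with a Slurm cluster via
# dask-jobqueue instead of Kubernetes. All values are placeholders.
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    queue="compute",       # placeholder Slurm partition
    cores=4,
    memory="4GB",
    walltime="01:00:00",
)
cluster.scale(jobs=4)      # submit four Slurm jobs as Dask workers
client = Client(cluster)
# client.map(...) / client.gather(...) then work exactly as on Kubernetes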
helm repo add dask https://helm.dask.org
helm repo update
helm upgrade -i dask-operator dask/dask-kubernetes-operator --create-namespace -n dask-operator
Now we are ready to interact with the Dask Kubernetes operator from our Python code.
Setting up the simulation
From for-loops to Dask
When evaluating the performance of a specific encoder on a single machine, you would typically set up a for-loop iterating over the parameter space of interest and evaluate the results once all loop iterations have finished. If you want to test various video sequences with different characteristics while also varying encoding parameters such as the target bitrate or the preset, the growing simulation time can quickly hold back your productivity. For this reason, replacing the for-loop with encoding runs distributed over multiple nodes is key to scanning a large parameter space. Dask helps us do this in less than 100 lines of code, as follows.
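To make the transition concrete, here is a schematic contrast between the serial loop and its Dask equivalent; the encode function, the parameters list, and the client are all defined in the following sections:

# Serial baseline: one encode after another on a single machine.
results = [encode(params) for params in parameters]

# Dask equivalent: fan the same calls out across all workers.
futures = client.map(encode, parameters)
results = client.gather(futures)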
First, we need to import some modules. For the sake of conciseness, we will not go through the installation of the required Python packages here.
import os
import subprocess
import json
import time
import itertools
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from dask.distributed import get_worker
Now we define an encode function that takes some inputs. In our case, this is a simple tuple consisting of the codec we want to evaluate, the performance level, and the target bitrate for the encode. Of course, you can be creative and adjust this function to your needs: it defines your own simulation logic, mapping your input parameters to the output you want to measure. As the return statement shows, this simulation measures the encoding speed in terms of enc_fps, the actual bitrate, and the vmaf_score computed with respect to the uncompressed video.
def encode(inputs):
    # change to the working directory of the current dask worker
    # this is where we uploaded our binaries to
    try:
        worker = get_worker()
        os.chdir(worker.local_directory)
        os.chmod("ffmpeg", 0o755)
        os.chmod("sample_enc_hevc_static", 0o755)
    except ValueError:  # get_worker() will throw a ValueError when running locally
        pass
    # unpack the simulation parameters
    codec = inputs[0]
    perf_level = inputs[1]
    target_bitrate = inputs[2]
    # properties of the raw test sequence
    sequence = "/sequences/sinteltrailerScene5_1920x1080_24_8_yuv420p.yuv"
    width = 1920
    height = 1080
    frame_rate = 24
    # number of frames in the 8-bit yuv420p file (1.5 bytes per pixel)
    num_frames = float(os.path.getsize(sequence)) / width / height / 1.5
    if codec == "x265":
        enc_cmd = ["./ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-f", "rawvideo", "-r", str(frame_rate), "-s", f"{width}x{height}", "-pix_fmt", "yuv420p", "-i", sequence, "-c:v", "libx265", "-preset", str(perf_level), "-b:v", str(target_bitrate), "-f", "hevc", "out.hevc"]
    else:
        enc_cmd = ["./sample_enc_hevc_static", "-I420", "-w", str(width), "-h", str(height), "-f", str(frame_rate), "-v", sequence, "-b", str(target_bitrate), "-perf", str(perf_level), "-o", "out.hevc"]
    # run the encode and measure the wall-clock encoding speed
    start = time.time()
    subprocess.run(enc_cmd)
    enc_fps = num_frames / (time.time() - start)
    bitrate = os.path.getsize("out.hevc") * 8 * frame_rate / num_frames
    # compute VMAF of the encoded bitstream against the uncompressed input
    quality_cmd = ["./ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-f", "rawvideo", "-r", str(frame_rate), "-s", f"{width}x{height}", "-pix_fmt", "yuv420p", "-i", sequence, "-f", "hevc", "-i", "out.hevc", "-lavfi", "[0:v]setpts=PTS-STARTPTS[reference];[1:v]setpts=PTS-STARTPTS[distorted];[distorted][reference]libvmaf=log_fmt=json:log_path=vmaf.json:model=version=vmaf_v0.6.1neg", "-f", "null", "-"]
    subprocess.run(quality_cmd)
    with open("vmaf.json", "r") as f:
        vmaf_result = json.load(f)
    vmaf_score = vmaf_result["pooled_metrics"]["vmaf"]["mean"]
    return enc_fps, bitrate, vmaf_score
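Before going distributed, you can sanity-check the function locally; the try/except around get_worker() makes this possible. This sketch assumes the ffmpeg binary (built with libx265 and libvmaf) sits in your current directory and the raw sequence is available under /sequences:

# Local smoke test: x265 at preset index 5 with a 1 Mbit/s target.
enc_fps, bitrate, vmaf_score = encode(("x265", 5, 1000000))
print(f"{enc_fps:.1f} fps, {bitrate / 1000:.0f} kbit/s, VMAF {vmaf_score:.2f}")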
Having defined our simulation logic, we are ready to create a Dask compute cluster on our k8s cluster (see above). If you are familiar with Kubernetes, the cluster spec will look familiar: conceptually, you only need to provide a podSpec in spec.worker. In this example, you can see that we mount the sequences directory into our worker as a hostPath volume; this is required to access the sequences we want to encode from within the worker pod. Furthermore, we upload the encoder binaries to all workers in the cluster so that we can easily call them in our simulation routine: ffmpeg built with x265, and the MainConcept HEVC encoder sample, in order to compare these two software-based HEVC encoder implementations.
# We need to set the KUBECONFIG to our cluster
# Of course you could also export it from outside
os.environ["KUBECONFIG"] = "codec-eval-cluster.yaml"
# Define a spec for our Dask compute cluster
spec = make_cluster_spec(name="codec-eval", image="ghcr.io/dask/dask:2024.6.0-py3.11")
spec["spec"]["worker"]["replicas"] = 1
spec["spec"]["worker"]["spec"]["volumes"] = [
{"name": "sequences", "hostPath": {"path": "/sequences"}},
]
spec["spec"]["worker"]["spec"]["containers"][0]["volumeMounts"] = [
{"mountPath": "/sequences", "name": "sequences"},
]
spec["spec"]["worker"]["spec"]["containers"][0]["args"] += [
"--nworkers",
"1",
"--nthreads",
"1",
spec["spec"]["worker"]["spec"]["containers"][0]["resources"] = {
"requests": {"memory": "4G", "cpu": "4"},
"limits": {"memory": "4G", "cpu": "4"},
}
# Create the cluster and upload the binaries we need for evaluation
cluster = KubeCluster(custom_cluster_spec=spec)
client = cluster.get_client()
client.upload_file("ffmpeg", load=False)
client.upload_file("sample_enc_hevc_static", load=False)
With the Dask cluster running, we can submit our simulation and evaluate the results. The snippet below demonstrates how this could be approached. Of course, there are different ways to handle this, and the example should only be treated as a starting point for your own codec evaluation.
# Define parameters we want to simulate
parameters = list(itertools.product(["mainConcept"], range(7, 31, 2), [500000, 1000000, 2000000, 4000000]))
parameters += list(itertools.product(["x265"], range(0, 10), [500000, 1000000, 2000000, 4000000]))
# Start the computation on the cluster
futures = client.map(encode, parameters)
results = client.gather(futures)
# construct a Pandas DataFrame for the results
df = pd.DataFrame(results, index=pd.MultiIndex.from_tuples(parameters, names=["codec", "perf_level", "target_bitrate"]), columns=["enc_fps", "bitrate", "vmaf"])
# plot the results with seaborn
sns.lineplot(data=df, x="enc_fps", y="vmaf", hue="target_bitrate", style="codec", palette="colorblind")
plt.grid()
plt.show(block=False)
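Once the evaluation is finished, it is good practice to tear down the Dask cluster so that its scheduler and worker pods are released:

# Release the scheduler and worker pods once we are done.
client.close()
cluster.close()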
A look at the results
The figure below shows the results generated via the simulation logic from above.
As the results of this test show, the MainConcept HEVC/H.265 Encoder outperforms x265: for all target bitrates and all encoding speeds, the quality of the encoded video is higher for the MainConcept codec in terms of VMAF score. Also note that the measurements were conducted in a KinD cluster running on a Linux virtual machine on a laptop. The absolute encoding speeds are limited by this setup, and higher speeds can be expected on more powerful hardware.
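If you want to back the plotted curves with numbers, a quick aggregation over the results DataFrame is one way to do it; this is a sketch you can adapt to the metrics you care about:

# Cross-check the plot: mean VMAF per codec and target bitrate.
print(df.groupby(["codec", "target_bitrate"])["vmaf"].mean().unstack())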
Conclusion
Dask in combination with Kubernetes offers great capabilities for efficient codec evaluation, excelling at running parallelized encodings on virtually any kind of compute cluster. As our comparison of the MainConcept HEVC encoder and x265 demonstrates, clear and actionable results can be obtained with little code. That said, there are many further applications, such as optimizing encoder parameters for certain types of content or generating training data for machine learning tasks in the field of video coding. These topics go beyond the scope of this article; we will tackle them in subsequent articles, so stay tuned. In the meantime, reach out to us if you have any questions or comments: jens.schneider@mainconcept.com and max.blaeser@mainconcept.com.
Jens Schneider & Max Bläser
Senior Video Coding Research and Development Engineers
Max focuses on applying machine learning algorithms to encoders and video applications. He previously worked for Germany’s largest private streaming and broadcast company, where he co-developed large-scale transcoding systems for VOD streaming. He received the Dipl.-Ing. degree in electrical and communications engineering and the Dr.-Ing. degree from RWTH Aachen University. During his doctoral studies at the Institut für Nachrichtentechnik (IENT), Max performed research in the area of video coding and actively contributed to the standardization of Versatile Video Coding (VVC).
Jens received his Dr.-Ing. degree in communications engineering from RWTH Aachen University in 2021. His research focused on the link between machine learning and low-level coding tools in combination with higher level video coding concepts such as dynamic resolution coding. Jens joined MainConcept after working as a Software Engineer in the cloud native landscape. In his role at MainConcept, he is currently working on machine learning-based encoder optimizations and cloud native simulation setups.