kfx is a Python package with the namespace kfx. It currently provides several sub-packages, including kfx.dsl, kfx.vis, and kfx.vis.vega, which the examples below use.

NOTE: this package is currently in alpha.

Breaking changes are likely. Feature requests are welcome.

Known issues

  • kfx.vis.vega.vega_web_app and KfpArtifact do not work well together (see example) because of CORS: the web app is hosted inside an iframe, which prevents it from accessing the ml-pipeline-ui API server.
  • kfx.vis.vega.vega_web_app is only supported in recent versions of the Kubeflow Pipelines UI, since inline storage is only supported after 0.2.5.


Quick start


pip install kfx


Example: Using ContainerOpTransform to configure the internal k8s properties of kubeflow pipelines tasks.

kfx.dsl.ContainerOpTransform is a helper for modifying the internal k8s properties (e.g. resources, environment variables) of Kubeflow Pipelines tasks.

import kfp.components
import kfp.dsl
import kfx.dsl

transforms = (
    kfx.dsl.ContainerOpTransform()
    .set_resources(cpu="500m", memory=("1G", "4G"))
    .set_env_vars({"ENV": "production"})
    .set_env_var_from_secret("AWS_ACCESS_KEY", secret_name="aws", secret_key="access_key")
    .set_annotations({"": "some-arn"})
)

@kfp.components.func_to_container_op
def echo(text: str) -> str:
    return text

@kfp.dsl.pipeline()
def pipeline(text: str):
    op1 = echo(text)
    op2 = echo("%s-%s" % (text, text))

    # you can apply the transform on op1 only
    # op1.apply(transforms)

    # or apply on all ops in the pipeline
    kfp.dsl.get_pipeline_conf().add_op_transformer(transforms)
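
To make the chaining pattern above concrete, here is a minimal pure-Python sketch: each `set_*` call records a change and returns the same object, and calling the transform applies every recorded change to a task. `SketchTransform`, the dict-based task, and the `team` annotation are hypothetical stand-ins chosen for illustration; this is not how kfx or kfp implement it.

```python
from typing import Any, Callable, Dict, List

Task = Dict[str, Any]  # hypothetical stand-in for a pipeline task


class SketchTransform:
    """Illustrative chainable transform; not the kfx implementation."""

    def __init__(self) -> None:
        self._steps: List[Callable[[Task], None]] = []

    def set_env_vars(self, env: Dict[str, str]) -> "SketchTransform":
        # Record the change; it is applied later, when the transform is called.
        self._steps.append(lambda task: task.setdefault("env", {}).update(env))
        return self  # returning self is what allows chaining

    def set_annotations(self, annotations: Dict[str, str]) -> "SketchTransform":
        self._steps.append(
            lambda task: task.setdefault("annotations", {}).update(annotations)
        )
        return self

    def __call__(self, task: Task) -> Task:
        # Apply every recorded change, in the order it was declared.
        for step in self._steps:
            step(task)
        return task


transform = (
    SketchTransform()
    .set_env_vars({"ENV": "production"})
    .set_annotations({"team": "ml"})
)

# The same transform object can be reused across several tasks.
tasks = [{"name": "op1"}, {"name": "op2"}]
for task in tasks:
    transform(task)
```

Declaring the changes once and applying them later is what lets a single transform be attached either to one task or to every task in a pipeline.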

Example: Using ArtifactLocationHelper and KfpArtifact to determine the uri of your data artifact generated by the kubeflow pipeline task.

kfx.dsl.ArtifactLocationHelper is a helper to modify the kubeflow pipeline task so that you can use kfx.dsl.KfpArtifact to represent the artifact generated inside the task.

import kfp.components
import kfp.dsl
import kfx.dsl

# creates the helper that has the argo configs (tells you how artifacts will be stored)
# see
helper = kfx.dsl.ArtifactLocationHelper(
    scheme="minio", bucket="mlpipeline", key_prefix="artifacts/"
)

@kfp.components.func_to_container_op
def test_op(
    mlpipeline_ui_metadata: kfp.components.OutputTextFile(str),
    markdown_data_file: kfp.components.OutputTextFile(str),
):
    "A test kubeflow pipeline task."

    import json

    import kfx.dsl
    import kfx.vis
    import kfx.vis.vega

    # `KfpArtifact` provides the reference to data artifact created
    # inside this task
    spec = {
        "$schema": "",
        "description": "A simple bar chart",
        "data": {
            "values": [
                {"a": "A", "b": 28},
                {"a": "B", "b": 55},
                {"a": "C", "b": 43},
                {"a": "D", "b": 91},
                {"a": "E", "b": 81},
                {"a": "F", "b": 53},
                {"a": "G", "b": 19},
                {"a": "H", "b": 87},
                {"a": "I", "b": 52},
            ]
        },
        "mark": "bar",
        "encoding": {
            "x": {"field": "a", "type": "ordinal"},
            "y": {"field": "b", "type": "quantitative"},
        },
    }

    # write the markdown to the `markdown-data` artifact
    markdown_data_file.write("### hello world")

    # creates a ui metadata object
    ui_metadata = kfx.vis.kfp_ui_metadata(
        # Describes the vis to generate in the kubeflow pipeline UI.
        [
            # markdown vis from a markdown artifact.
            # `KfpArtifact` provides the reference to data artifact created
            # inside this task
            kfx.vis.markdown(kfx.dsl.KfpArtifact("markdown_data_file")),
            # a vega web app from the vega data artifact.
            kfx.vis.vega.vega_web_app(spec),
        ]
    )

    # writes the ui metadata object as the `mlpipeline-ui-metadata` artifact
    mlpipeline_ui_metadata.write(kfx.vis.asjson(ui_metadata))

    # prints the uri to the markdown artifact
    print(ui_metadata.outputs[0].source)

@kfp.dsl.pipeline()
def test_pipeline():
    "A test kubeflow pipeline"

    op: kfp.dsl.ContainerOp = test_op()

    # modify kfp operator with artifact location metadata through env vars
    op.apply(helper.set_envs())
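
As a rough illustration of how the helper's scheme, bucket, and key prefix could combine into an artifact URI, the sketch below builds one by hand. The layout `<scheme>://<bucket>/<key_prefix><workflow>/<pod>/<artifact>.tgz` is an assumption modelled on Argo's default key format, and `artifact_uri` plus the workflow and pod names are hypothetical. The real URI depends on how the Argo artifact repository is configured.

```python
def artifact_uri(
    scheme: str,
    bucket: str,
    key_prefix: str,
    workflow_name: str,
    pod_name: str,
    artifact_name: str,
) -> str:
    """Compose an artifact URI under an assumed key layout (illustrative only)."""
    # Normalise the prefix so "artifacts/" and "artifacts" behave the same.
    prefix = key_prefix.strip("/")
    parts = [prefix, workflow_name, pod_name, f"{artifact_name}.tgz"]
    key = "/".join(part for part in parts if part)
    return f"{scheme}://{bucket}/{key}"


# Hypothetical workflow and pod names, as they might appear at run time.
uri = artifact_uri(
    scheme="minio",
    bucket="mlpipeline",
    key_prefix="artifacts/",
    workflow_name="test-pipeline-abc12",
    pod_name="test-pipeline-abc12-123456",
    artifact_name="markdown-data",
)
print(uri)
```

Because the workflow and pod names are only known once the pipeline runs, this kind of information has to reach the task at run time, which is consistent with the helper passing it through environment variables.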

Example: Using pydantic data models to generate mlpipeline-metrics.json and mlpipeline-ui-metadata.json.

kfx.vis has helper functions (with corresponding hints) to describe and create mlpipeline-metrics.json and mlpipeline-ui-metadata.json files (required by kubeflow pipeline UI to render any metrics or visualizations).
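
For orientation, the sketch below builds the kind of JSON document that mlpipeline-metrics.json holds, using only the standard library. The `metric` helper is hypothetical and is not part of kfx; the field names (`name`, `numberValue`, `format`) follow the Kubeflow Pipelines v1 metrics format as I understand it, so treat them as an assumption to check against your KFP version.

```python
import json
from typing import Any, Dict


def metric(name: str, value: float, percent: bool = False) -> Dict[str, Any]:
    """Build one metric entry (hypothetical helper, not the kfx API)."""
    return {
        "name": name,
        "numberValue": value,
        # "PERCENTAGE" renders the value as a percent; "RAW" shows it unchanged.
        "format": "PERCENTAGE" if percent else "RAW",
    }


metrics_doc = {
    "metrics": [
        metric("recall-score", 0.9, percent=True),
        metric("raw-score", 123.45),
    ]
}

# Inside a task, this string would be written to the mlpipeline_metrics path.
metrics_json = json.dumps(metrics_doc, indent=2)
print(metrics_json)
```

Typed helpers such as those in kfx.vis add validation on top of this, so a misspelled field or an unsupported format can be caught before the file is written.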

import functools

import kfp.components

# install kfx
kfx_component = functools.partial(kfp.components.func_to_container_op, packages_to_install=["kfx"])

@kfx_component
def some_op(
    # mlpipeline_metrics is a path - i.e. open(mlpipeline_metrics, "w")
    mlpipeline_metrics: kfp.components.OutputPath(str),
    # mlpipeline_ui_metadata is a FileLike obj - i.e. mlpipeline_ui_metadata.write("something")
    mlpipeline_ui_metadata: kfp.components.OutputTextFile(str),
):
    "kfp operator that provides metrics and metadata for visualizations."

    # import inside kfp task
    import kfx.vis

    # output metrics to mlpipeline_metrics path
        # render as percent
        kfx.vis.kfp_metric("recall-score", 0.9, percent=True),
        # override metric format with custom value
        kfx.vis.kfp_metric(name="precision-score", value=0.8, metric_format="PERCENTAGE"),
        # render raw score
        kfx.vis.kfp_metric("raw-score", 123.45),

    # output visualization metadata to mlpipeline_ui_metadata obj
            # creates a confusion matrix vis
                labels=["True", "False"],
            # creates a markdown with inline source
                "# Inline Markdown: [A link](",
            # creates a markdown with a remote source
            # creates a ROC curve with a remote source
            # creates a Table with a remote source
                header=["col1", "col2"],
            # creates a tensorboard viewer
            # creates a custom web app from a remote html file
            # creates a Vega-Lite vis as a web app
                "$schema": "",
                "description": "A simple bar chart with embedded data.",
                "data": {
                    "values": [
                        {"a": "A", "b": 28}, {"a": "B", "b": 55}, {"a": "C", "b": 43},
                        {"a": "D", "b": 91}, {"a": "E", "b": 81}, {"a": "F", "b": 53},
                        {"a": "G", "b": 19}, {"a": "H", "b": 87}, {"a": "I", "b": 52}
                "mark": "bar",
                "encoding": {
                    "x": {"field": "a", "type": "ordinal"},
                    "y": {"field": "b", "type": "quantitative"}

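For comparison, the sketch below writes out by hand the kind of document that mlpipeline-ui-metadata.json holds. It assumes the Kubeflow Pipelines v1 output viewer format (an `outputs` list whose entries carry `type`, `source`, and viewer-specific fields); the `gs://your-bucket/table.csv` path is a hypothetical placeholder, and none of this is the kfx API.

```python
import json

ui_metadata_doc = {
    "outputs": [
        {
            # Markdown rendered directly from the string in "source".
            "type": "markdown",
            "storage": "inline",
            "source": "# Inline Markdown\n\nHello from the task.",
        },
        {
            # Table rendered from a remote CSV file (hypothetical path).
            "type": "table",
            "format": "csv",
            "header": ["col1", "col2"],
            "source": "gs://your-bucket/table.csv",
        },
    ]
}

# Inside a task, this string would be written to the mlpipeline_ui_metadata file.
ui_metadata_json = json.dumps(ui_metadata_doc, indent=2)
print(ui_metadata_json)
```

Each entry in `outputs` becomes one visualization in the UI, which is why the examples above pass a list of visualizations to kfx.vis.kfp_ui_metadata.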
Developer guide

This project uses:

  • isort: to manage import order
  • pylint: to manage general coding best practices
  • flake8: to manage code complexity and coding best practices
  • black: to manage formats and styles
  • pydocstyle: to manage docstring style/format
  • pytest/coverage: to manage unit tests and code coverage
  • bandit: to find common security issues
  • pyenv: to manage dev env: python version (3.6)
  • pipenv: to manage dev env: python packages

By convention, unit tests are suffixed with _test and colocated with the Python module they test, i.e. <module_name>_test.py.
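
One way to check this convention is to list modules that have no colocated test file. The sketch below is a hypothetical helper and not part of this project's tooling; it assumes test files are named `<module_name>_test.py` and it skips `__init__.py`.

```python
import tempfile
from pathlib import Path
from typing import List


def find_untested_modules(root: Path) -> List[Path]:
    """Return modules under root that lack a colocated <module_name>_test.py."""
    missing = []
    for module in sorted(root.rglob("*.py")):
        # Test files and package markers do not need their own tests.
        if module.stem.endswith("_test") or module.name == "__init__.py":
            continue
        if not module.with_name(f"{module.stem}_test.py").exists():
            missing.append(module)
    return missing


# Demonstrate on a throwaway directory with hypothetical module names.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("helpers.py", "helpers_test.py", "models.py"):
        (root / name).write_text("")
    untested = [path.name for path in find_untested_modules(root)]

print(untested)
```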

The package version is read from version.txt, so update that file with the appropriate semantic version (major -> breaking changes, minor -> new features, patch -> bug fixes, postfix -> pre-release/post-release).
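
The bump rules can be sketched as a small function. `bump` is a hypothetical helper and not part of this project; it assumes version.txt holds a `major.minor.patch` string with an optional postfix, and it drops the postfix when bumping.

```python
import re

# major.minor.patch followed by an optional postfix such as "a1" or ".post1".
_VERSION = re.compile(r"^(\d+)\.(\d+)\.(\d+)(.*)$")


def bump(version: str, part: str) -> str:
    """Return the next version for the given part; any postfix is dropped."""
    match = _VERSION.match(version.strip())
    if match is None:
        raise ValueError(f"not a semantic version: {version!r}")
    major, minor, patch = (int(group) for group in match.groups()[:3])
    if part == "major":  # breaking changes
        return f"{major + 1}.0.0"
    if part == "minor":  # new features
        return f"{major}.{minor + 1}.0"
    if part == "patch":  # bug fixes
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


print(bump("0.1.2", "minor"))
print(bump("1.4.9.post1\n", "major"))
```

In practice the input would come from reading version.txt, and the result would be written back before building the distribution.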


# autoformat code with docformatter, isort, and black
make format

# check style, formats, and code complexity
make check

# check style, formats, code complexity, and run unit tests
make test

# test everything including building the package and check the sdist
make test-all

# run unit tests only
make test-only

# generate and update the requirements.txt and requirements-dev.txt
make requirements

# generate the docs with sphinx and autoapi extension
make docs

# generate distributions
make dists

# publish to pypi with twine (twine must be configured)
make publish

Last update: June 9, 2021