Python k8s API

Kubernetes offers API clients that allow you to build integrations under-the-hood of applications. The following is an example Python client for Nautilus that can submit it's own jobs and all the other functions of kubectl The config mounts the Ceph Shared FileSystem as well as the node's local scratch space. Be sure to configure resources appropriately to your application.

import os
from kubernetes import client, config, utils
from kubernetes.client.rest import ApiException
from skimage.transform import resize
from uuid import uuid4
import time
from pathlib import Path

def test():
    try:
        config.load_kube_config()
        v1 = client.CoreV1Api()
        ret = v1.list_namespaced_pod('ncmir-mm')
    except:
        ValueError('Could not connect to kube cluster')

class Constants(object):
    NAMESPACE = YOUR_NAMESPACE

class KubernetesApiClient(object):
    def __init__(self):
        # load 
        print("\n Loading Nautilus Client... \n")
    def create_batch_api_client(self):
        return client.BatchV1Api(client.ApiClient())

    def create_job_object(self, job_name, container_image, args=[],cmd = ['/bin/bash'], 
                          min_cpu=1, min_ram = 4, max_cpu=2, max_ram=12):
            res = client.V1ResourceRequirements(
                requests={"cpu":"1","memory":"8Gi","ephemeral-storage": "4Gi"},
                limits = {"cpu":"4","memory":"24Gi","ephemeral-storage": "16Gi"})
            volume_mount_2 = client.V1VolumeMount(
                mount_path='/ceph',
                name='ceph'
            )
            volume_mount_1 = client.V1VolumeMount(
                mount_path='/mnt/data',
                name='data'
            )
            #env = client.V1EnvVar(name='GOOGLE_APPLICATION_CREDENTIALS',value=google_app_credentials_path)
            container = client.V1Container(
                name=job_name,
                command = cmd,
                image=container_image,
                args=args,
                volume_mounts=[volume_mount_1,volume_mount_2],
                env=[],
                image_pull_policy="Always",
                resources = res)
            volume_1 = client.V1Volume(
                name='data'
            )

            flex_2 = client.V1FlexVolumeSource(
                driver='ceph.rook.io/rook', 
                fs_type='ceph', 
                options = {'fsName': 'nautilusfs',
                            'clusterNamespace': 'rook',
                            'path': 'YOUR_CEPHFS_MOUNT',
                            'mountUser': 'YOUR_NAMESPACE',
                            'mountSecret': 'YOUR_CEPHFS_SECRET'}
            )
            volume_2=client.V1Volume(
                name = 'ceph',
                flex_volume=flex_2
            )

            template = client.V1PodTemplateSpec(
                        metadata=client.V1ObjectMeta(labels={"app": "sample"}),
                        spec=client.V1PodSpec(restart_policy="Never", 
                                              containers=[container],
                                              volumes=[volume_1,volume_2])
            )
            spec = client.V1JobSpec(
                        template=template,
                        backoff_limit=6,
                        ttl_seconds_after_finished=60)
            job = client.V1Job(
                        api_version="batch/v1",
                        kind="Job",
                        metadata=client.V1ObjectMeta(name=job_name),
                        spec=spec)
            return job

def submit_job(jobname, image = 'YOUR_IMAGE',args = []):   
    api_client = KubernetesApiClient()
    job_api_client = api_client.create_batch_api_client()
    job = api_client.create_job_object(jobname, image, args)
    try: 
        api_response = job_api_client.create_namespaced_job(
            namespace=Constants.NAMESPACE,
            body=job)
        print(str(api_response.status))
    except ApiException as e:
        print(e) # Handle the exception.
    return job_api_client