Skip to content

Commit

Permalink
Merge pull request #32 from moleculemaker/lambert8/upgrade-kubernetes-client
Browse files Browse the repository at this point in the history

feat: upgrade kubernetes client from 11.0.0 -> 24.2.0, implement List+Watch in KubeWatcher
  • Loading branch information
bodom0015 authored Jan 8, 2024
2 parents 37e12df + b3c384c commit def1b47
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
5 changes: 3 additions & 2 deletions chart/values.prod.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ingress:
hostname: jobmgr.mmli1.ncsa.illinois.edu
hostname: jobmgr.platform.moleculemaker.org
tls: true
annotations:
cert-manager.io/cluster-issuer: letsencrypt-production
Expand Down Expand Up @@ -117,7 +117,8 @@ config:
server:
protocol: "https"
## API hostname. Must match the ingress.hostname value.
hostName: "jobmgr.mmli1.ncsa.illinois.edu"
## The hostname suffix must match the CLEAN and MOLLI production hostnames for user auth to work
hostName: "jobmgr.platform.moleculemaker.org"
namespace: "alphasynthesis"
oauth:
userInfoUrl: "http://oauth2-proxy.oauth2-proxy.svc.cluster.local/oauth2/userinfo"
Expand Down
5 changes: 2 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# kubernetes v12.0.0 breaks the kubejob.py code
kubernetes == 11.0.0
kubernetes == 24.2.0
tornado == 6.1
pyjson == 1.3.0
pyyaml == 5.4.1
jinja2 == 3.0.1
requests == 2.26
requests
bcrypt == 3.2.0
pyjwt == 1.7.1
mysql-connector-python-rf == 2.2.2
Expand Down
22 changes: 17 additions & 5 deletions src/kubejob.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sys

import global_vars
from global_vars import config, log
from kubernetes import client, config as kubeconfig
Expand All @@ -13,12 +15,22 @@

## Load Kubernetes cluster config: try the in-cluster config first, then fall back to the configured kubeconfig file. Exits with status 1 if neither can be loaded.
try:
kubeconfig.load_kube_config(config_file=config['server']['kubeconfig'])
except:
kubeconfig.load_incluster_config()
configuration = client.Configuration()
api_batch_v1 = client.BatchV1Api(client.ApiClient(configuration))
api_v1 = client.CoreV1Api(client.ApiClient(configuration))
log.info('Successfully loaded in-cluster config!')
except Exception as e1:
log.error('In-cluster config failed: ', e1)
config_file_path = config['server']['kubeconfig']
log.info('Falling back to provided kubeconfig path: ', config_file_path)
try:
kubeconfig.load_kube_config(config_file=config_file_path)
except Exception as e2:
log.fatal('Failed to get any cluster config: ', e2)
sys.exit(1)
#configuration = client.Configuration()
#api_batch_v1 = client.BatchV1Api(client.ApiClient(configuration))
#api_v1 = client.CoreV1Api(client.ApiClient(configuration))
api_batch_v1 = client.BatchV1Api()
api_v1 = client.CoreV1Api()

def get_namespace():
# When running in a pod, the namespace should be determined automatically,
Expand Down
15 changes: 11 additions & 4 deletions src/kubewatcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import time
import threading

from kubernetes import watch, config as kubeconfig
from kubernetes.client import V1JobList
from kubernetes.client.rest import ApiException
from requests import HTTPError

Expand Down Expand Up @@ -70,11 +72,16 @@ def run(self):
time.sleep(1)
self.logger.info('KubeWatcher is connecting...')
try:
# Resource version is used to keep track of stream progress (in case of resume)
# List all jobs in the watched namespace to get the current resource_version
namespaced_jobs: V1JobList = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
resource_version = namespaced_jobs.metadata.resource_version if namespaced_jobs.metadata.resource_version else resource_version

# Then, watch for new events using the most recent resource_version
# Resource version is used to keep track of stream progress (in case of resume/retry)
k8s_event_stream = w.stream(func=kubejob.api_batch_v1.list_namespaced_job,
namespace=kubejob.get_namespace(),
timeout_seconds=timeout_seconds,
resource_version=resource_version)
resource_version=resource_version,
timeout_seconds=timeout_seconds)

self.logger.info('KubeWatcher connected!')

Expand Down Expand Up @@ -147,7 +154,7 @@ def run(self):
k8s_event_stream = None
if e.status == 410:
# Resource too old
resource_version = None
resource_version = ''
self.logger.warning("Resource too old (410) - reconnecting: " + str(e))
time.sleep(2)
continue
Expand Down

0 comments on commit def1b47

Please sign in to comment.