diff --git a/backend/Dockerfile b/backend/Dockerfile index 05fab08013b..fc5c775ebea 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -44,6 +44,7 @@ WORKDIR /bin COPY --from=builder /opt/app-root/src/backend/src/apiserver/config/ /config COPY --from=builder /bin/apiserver /bin/apiserver +COPY --from=builder /opt/app-root/src/backend/src/apiserver/ilab_pipeline/pipeline.yaml /pipelines/ RUN chmod +x /bin/apiserver diff --git a/backend/src/apiserver/ilab_pipeline/pipeline.yaml b/backend/src/apiserver/ilab_pipeline/pipeline.yaml new file mode 100644 index 00000000000..23b8999c6ea --- /dev/null +++ b/backend/src/apiserver/ilab_pipeline/pipeline.yaml @@ -0,0 +1,2326 @@ +# PIPELINE DEFINITION +# Name: instructlab +# Description: InstructLab pipeline +# Inputs: +# final_eval_batch_size: str [Default: 'auto'] +# final_eval_few_shots: int [Default: 5.0] +# final_eval_max_workers: str [Default: 'auto'] +# final_eval_merge_system_user_message: bool [Default: False] +# k8s_storage_class_name: str [Default: 'standard'] +# mt_bench_max_workers: str [Default: 'auto'] +# mt_bench_merge_system_user_message: bool [Default: False] +# sdg_base_model: str [Default: 's3:///'] +# sdg_max_batch_len: int [Default: 20000.0] +# sdg_pipeline: str [Default: 'simple'] +# sdg_repo_branch: str +# sdg_repo_pr: int +# sdg_repo_url: str [Default: 'https://github.com/instructlab/taxonomy.git'] +# sdg_sample_size: float [Default: 1.0] +# sdg_scale_factor: int [Default: 2.0] +# train_effective_batch_size_phase_1: int [Default: 3840.0] +# train_effective_batch_size_phase_2: int [Default: 3840.0] +# train_learning_rate_phase_1: float [Default: 0.0001] +# train_learning_rate_phase_2: float [Default: 0.0001] +# train_max_batch_len: int [Default: 20000.0] +# train_nnodes: int [Default: 2.0] +# train_nproc_per_node: int [Default: 3.0] +# train_num_epochs_phase_1: int [Default: 2.0] +# train_num_epochs_phase_2: int [Default: 2.0] +# train_num_warmup_steps_phase_1: int [Default: 100.0] +# train_num_warmup_steps_phase_2: int [Default: 100.0] +# train_save_samples: int [Default: 0.0] +# train_seed: int [Default: 42.0] +components: + comp-createpvc: + executorLabel: exec-createpvc + inputDefinitions: + parameters: + access_modes: + description: 'AccessModes to request for the provisioned PVC. May + + be one or more of ``''ReadWriteOnce''``, ``''ReadOnlyMany''``, ``''ReadWriteMany''``, + or + + ``''ReadWriteOncePod''``. Corresponds to `PersistentVolumeClaim.spec.accessModes + `_.' + parameterType: LIST + annotations: + description: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations + `_. + isOptional: true + parameterType: STRUCT + pvc_name: + description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name + `_. + Only one of ``pvc_name`` and ``pvc_name_suffix`` can + + be provided.' + isOptional: true + parameterType: STRING + pvc_name_suffix: + description: 'Prefix to use for a dynamically generated name, which + + will take the form ``-``. Only one + + of ``pvc_name`` and ``pvc_name_suffix`` can be provided.' + isOptional: true + parameterType: STRING + size: + description: The size of storage requested by the PVC that will be provisioned. + For example, ``'5Gi'``. Corresponds to `PersistentVolumeClaim.spec.resources.requests.storage + `_. + parameterType: STRING + storage_class_name: + defaultValue: '' + description: 'Name of StorageClass from which to provision the PV + + to back the PVC. ``None`` indicates to use the cluster''s default + + storage_class_name. Set to ``''''`` for a statically specified PVC.' + isOptional: true + parameterType: STRING + volume_name: + description: 'Pre-existing PersistentVolume that should back the + + provisioned PersistentVolumeClaim. Used for statically + + specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName + `_.' + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + name: + parameterType: STRING + comp-createpvc-2: + executorLabel: exec-createpvc-2 + inputDefinitions: + parameters: + access_modes: + description: 'AccessModes to request for the provisioned PVC. May + + be one or more of ``''ReadWriteOnce''``, ``''ReadOnlyMany''``, ``''ReadWriteMany''``, + or + + ``''ReadWriteOncePod''``. Corresponds to `PersistentVolumeClaim.spec.accessModes + `_.' + parameterType: LIST + annotations: + description: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations + `_. + isOptional: true + parameterType: STRUCT + pvc_name: + description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name + `_. + Only one of ``pvc_name`` and ``pvc_name_suffix`` can + + be provided.' + isOptional: true + parameterType: STRING + pvc_name_suffix: + description: 'Prefix to use for a dynamically generated name, which + + will take the form ``-``. Only one + + of ``pvc_name`` and ``pvc_name_suffix`` can be provided.' + isOptional: true + parameterType: STRING + size: + description: The size of storage requested by the PVC that will be provisioned. + For example, ``'5Gi'``. Corresponds to `PersistentVolumeClaim.spec.resources.requests.storage + `_. + parameterType: STRING + storage_class_name: + defaultValue: '' + description: 'Name of StorageClass from which to provision the PV + + to back the PVC. ``None`` indicates to use the cluster''s default + + storage_class_name. Set to ``''''`` for a statically specified PVC.' + isOptional: true + parameterType: STRING + volume_name: + description: 'Pre-existing PersistentVolume that should back the + + provisioned PersistentVolumeClaim. Used for statically + + specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName + `_.' + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + name: + parameterType: STRING + comp-createpvc-3: + executorLabel: exec-createpvc-3 + inputDefinitions: + parameters: + access_modes: + description: 'AccessModes to request for the provisioned PVC. May + + be one or more of ``''ReadWriteOnce''``, ``''ReadOnlyMany''``, ``''ReadWriteMany''``, + or + + ``''ReadWriteOncePod''``. Corresponds to `PersistentVolumeClaim.spec.accessModes + `_.' + parameterType: LIST + annotations: + description: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations + `_. + isOptional: true + parameterType: STRUCT + pvc_name: + description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name + `_. + Only one of ``pvc_name`` and ``pvc_name_suffix`` can + + be provided.' + isOptional: true + parameterType: STRING + pvc_name_suffix: + description: 'Prefix to use for a dynamically generated name, which + + will take the form ``-``. Only one + + of ``pvc_name`` and ``pvc_name_suffix`` can be provided.' + isOptional: true + parameterType: STRING + size: + description: The size of storage requested by the PVC that will be provisioned. + For example, ``'5Gi'``. Corresponds to `PersistentVolumeClaim.spec.resources.requests.storage + `_. + parameterType: STRING + storage_class_name: + defaultValue: '' + description: 'Name of StorageClass from which to provision the PV + + to back the PVC. ``None`` indicates to use the cluster''s default + + storage_class_name. Set to ``''''`` for a statically specified PVC.' + isOptional: true + parameterType: STRING + volume_name: + description: 'Pre-existing PersistentVolume that should back the + + provisioned PersistentVolumeClaim. Used for statically + + specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName + `_.' + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + name: + parameterType: STRING + comp-data-processing-op: + executorLabel: exec-data-processing-op + inputDefinitions: + parameters: + knowledge_path: + defaultValue: /data/knowledge + isOptional: true + parameterType: STRING + max_batch_len: + defaultValue: 20000.0 + isOptional: true + parameterType: NUMBER_INTEGER + max_seq_len: + defaultValue: 4096.0 + isOptional: true + parameterType: NUMBER_INTEGER + model_path: + defaultValue: /model + isOptional: true + parameterType: STRING + sdg_path: + defaultValue: /data/sdg + isOptional: true + parameterType: STRING + skills_path: + defaultValue: /data/skills + isOptional: true + parameterType: STRING + comp-deletepvc: + executorLabel: exec-deletepvc + inputDefinitions: + parameters: + pvc_name: + description: Name of the PVC to delete. Supports passing a runtime-generated + name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``. + parameterType: STRING + comp-deletepvc-2: + executorLabel: exec-deletepvc-2 + inputDefinitions: + parameters: + pvc_name: + description: Name of the PVC to delete. Supports passing a runtime-generated + name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``. + parameterType: STRING + comp-deletepvc-3: + executorLabel: exec-deletepvc-3 + inputDefinitions: + parameters: + pvc_name: + description: Name of the PVC to delete. Supports passing a runtime-generated + name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``. + parameterType: STRING + comp-git-clone-op: + executorLabel: exec-git-clone-op + inputDefinitions: + parameters: + repo_branch: + parameterType: STRING + repo_pr: + parameterType: NUMBER_INTEGER + repo_url: + parameterType: STRING + taxonomy_path: + defaultValue: /data/taxonomy + isOptional: true + parameterType: STRING + comp-importer: + executorLabel: exec-importer + inputDefinitions: + parameters: + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + comp-knowledge-processed-data-to-artifact-op: + executorLabel: exec-knowledge-processed-data-to-artifact-op + inputDefinitions: + parameters: + pvc_path: + defaultValue: /data/knowledge + isOptional: true + parameterType: STRING + outputDefinitions: + artifacts: + knowledge_processed_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-model-to-pvc-op: + executorLabel: exec-model-to-pvc-op + inputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + pvc_path: + defaultValue: /model + isOptional: true + parameterType: STRING + comp-pvc-to-model-op: + executorLabel: exec-pvc-to-model-op + inputDefinitions: + parameters: + pvc_path: + parameterType: STRING + outputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + comp-pvc-to-mt-bench-op: + executorLabel: exec-pvc-to-mt-bench-op + inputDefinitions: + parameters: + pvc_path: + parameterType: STRING + outputDefinitions: + artifacts: + mt_bench_output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-pytorchjob-manifest-op: + executorLabel: exec-pytorchjob-manifest-op + inputDefinitions: + parameters: + effective_batch_size: + defaultValue: 3840.0 + isOptional: true + parameterType: NUMBER_INTEGER + input_pvc_name: + parameterType: STRING + learning_rate: + defaultValue: 0.0001 + isOptional: true + parameterType: NUMBER_DOUBLE + max_batch_len: + defaultValue: 20000.0 + isOptional: true + parameterType: NUMBER_INTEGER + model_pvc_name: + parameterType: STRING + name_suffix: + parameterType: STRING + nnodes: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + nproc_per_node: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + num_epochs: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + num_warmup_steps: + defaultValue: 800.0 + isOptional: true + parameterType: NUMBER_INTEGER + output_pvc_name: + parameterType: STRING + phase_num: + parameterType: NUMBER_INTEGER + save_samples: + defaultValue: 0.0 + isOptional: true + parameterType: NUMBER_INTEGER + seed: + defaultValue: 42.0 + isOptional: true + parameterType: NUMBER_INTEGER + comp-pytorchjob-manifest-op-2: + executorLabel: exec-pytorchjob-manifest-op-2 + inputDefinitions: + parameters: + effective_batch_size: + defaultValue: 3840.0 + isOptional: true + parameterType: NUMBER_INTEGER + input_pvc_name: + parameterType: STRING + learning_rate: + defaultValue: 0.0001 + isOptional: true + parameterType: NUMBER_DOUBLE + max_batch_len: + defaultValue: 20000.0 + isOptional: true + parameterType: NUMBER_INTEGER + model_pvc_name: + parameterType: STRING + name_suffix: + parameterType: STRING + nnodes: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + nproc_per_node: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + num_epochs: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + num_warmup_steps: + defaultValue: 800.0 + isOptional: true + parameterType: NUMBER_INTEGER + output_pvc_name: + parameterType: STRING + phase_num: + parameterType: NUMBER_INTEGER + save_samples: + defaultValue: 0.0 + isOptional: true + parameterType: NUMBER_INTEGER + seed: + defaultValue: 42.0 + isOptional: true + parameterType: NUMBER_INTEGER + comp-run-final-eval-op: + executorLabel: exec-run-final-eval-op + inputDefinitions: + parameters: + base_branch: + parameterType: STRING + base_model_dir: + parameterType: STRING + batch_size: + parameterType: STRING + candidate_branch: + parameterType: STRING + candidate_model: + isOptional: true + parameterType: STRING + few_shots: + parameterType: NUMBER_INTEGER + max_workers: + parameterType: STRING + merge_system_user_message: + parameterType: BOOLEAN + sdg_path: + defaultValue: /input/sdg + isOptional: true + parameterType: STRING + taxonomy_path: + defaultValue: /input/taxonomy + isOptional: true + parameterType: STRING + outputDefinitions: + artifacts: + mmlu_branch_output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + mt_bench_branch_output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-run-mt-bench-op: + executorLabel: exec-run-mt-bench-op + inputDefinitions: + parameters: + best_score_file: + isOptional: true + parameterType: STRING + max_workers: + parameterType: STRING + merge_system_user_message: + parameterType: BOOLEAN + models_folder: + parameterType: STRING + output_path: + defaultValue: /output/mt_bench_data.json + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + best_model: + parameterType: STRING + best_score: + parameterType: NUMBER_DOUBLE + comp-sdg-op: + executorLabel: exec-sdg-op + inputDefinitions: + parameters: + num_instructions_to_generate: + parameterType: NUMBER_INTEGER + pipeline: + parameterType: STRING + repo_branch: + parameterType: STRING + repo_pr: + parameterType: NUMBER_INTEGER + sdg_path: + defaultValue: /data/sdg + isOptional: true + parameterType: STRING + sdg_sampling_size: + defaultValue: 1.0 + isOptional: true + parameterType: NUMBER_DOUBLE + taxonomy_path: + defaultValue: /data/taxonomy + isOptional: true + parameterType: STRING + comp-sdg-to-artifact-op: + executorLabel: exec-sdg-to-artifact-op + inputDefinitions: + parameters: + pvc_path: + defaultValue: /data/sdg + isOptional: true + parameterType: STRING + outputDefinitions: + artifacts: + sdg: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-skills-processed-data-to-artifact-op: + executorLabel: exec-skills-processed-data-to-artifact-op + inputDefinitions: + parameters: + pvc_path: + defaultValue: /data/skills + isOptional: true + parameterType: STRING + outputDefinitions: + artifacts: + skills_processed_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-taxonomy-to-artifact-op: + executorLabel: exec-taxonomy-to-artifact-op + inputDefinitions: + parameters: + pvc_path: + defaultValue: /data/taxonomy + isOptional: true + parameterType: STRING + outputDefinitions: + artifacts: + taxonomy: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-createpvc: + container: + image: argostub/createpvc + exec-createpvc-2: + container: + image: argostub/createpvc + exec-createpvc-3: + container: + image: argostub/createpvc + exec-data-processing-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - data_processing_op + command: + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef data_processing_op(\n model_path: str = \"/model\",\n sdg_path:\ + \ str = \"/data/sdg\",\n skills_path: str = \"/data/skills\",\n knowledge_path:\ + \ str = \"/data/knowledge\",\n max_seq_len: Optional[int] = 4096,\n \ + \ max_batch_len: Optional[int] = 20000,\n):\n import os\n\n import\ + \ instructlab.training.data_process as dp\n from instructlab.training\ + \ import (\n DataProcessArgs,\n TrainingArgs,\n )\n\n \ + \ # define training-specific arguments\n skill_training_args = TrainingArgs(\n\ + \ # define data-specific arguments\n model_path=model_path,\n\ + \ data_path=f\"{sdg_path}/skills_train_msgs*.jsonl\",\n data_output_dir=skills_path,\n\ + \ # define model-trianing parameters\n max_seq_len=max_seq_len,\n\ + \ max_batch_len=max_batch_len,\n # XXX(shanand): We don't\ + \ need the following arguments\n # for data processing. Added them\ + \ for now to avoid\n # Pydantic validation errors for TrainingArgs\n\ + \ ckpt_output_dir=\"data/saved_checkpoints\",\n num_epochs=2,\n\ + \ effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n\ + \ warmup_steps=800,\n is_padding_free=True,\n )\n\n \ + \ knowledge_training_args = TrainingArgs(\n # define data-specific\ + \ arguments\n model_path=model_path,\n data_path=f\"{sdg_path}/knowledge_train_msgs*.jsonl\"\ + ,\n data_output_dir=knowledge_path,\n # define model-trianing\ + \ parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n\ + \ # XXX(shanand): We don't need the following arguments\n \ + \ # for data processing. Added them for now to avoid\n # Pydantic\ + \ validation errors for TrainingArgs\n ckpt_output_dir=\"data/saved_checkpoints\"\ + ,\n num_epochs=2,\n effective_batch_size=3840,\n save_samples=0,\n\ + \ learning_rate=2e-6,\n warmup_steps=800,\n is_padding_free=True,\n\ + \ )\n\n def data_processing(train_args: TrainingArgs) -> None:\n \ + \ # early validation logic here\n if train_args.max_batch_len\ + \ < train_args.max_seq_len:\n raise ValueError(\n \ + \ f\"the 'max_batch_len' cannot be less than 'max_seq_len': {train_args.max_batch_len=}\ + \ < {train_args.max_seq_len=}\"\n )\n\n # process\ + \ the training data\n if not os.path.exists(train_args.data_output_dir):\n\ + \ os.makedirs(train_args.data_output_dir, exist_ok=True)\n \ + \ dp.main(\n DataProcessArgs(\n # XXX(osilkin):\ + \ make a decision here, either:\n # 1. the CLI is fully\ + \ responsible for managing where the data is written\n #\ + \ 2. we never cache it and simply write it to a tmp file every time.\n\ + \ #\n # An important reason for why #1 would\ + \ be preferable is in the case of OpenShift/SELinux\n # where\ + \ the user has a defined place for new temporary data to be written.\n \ + \ data_output_path=train_args.data_output_dir,\n \ + \ model_path=train_args.model_path,\n data_path=train_args.data_path,\n\ + \ max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n\ + \ )\n )\n\n data_processing(train_args=skill_training_args)\n\ + \ data_processing(train_args=knowledge_training_args)\n\n" + image: quay.io/redhat-et/ilab:1.2 + exec-deletepvc: + container: + image: argostub/deletepvc + exec-deletepvc-2: + container: + image: argostub/deletepvc + exec-deletepvc-3: + container: + image: argostub/deletepvc + exec-git-clone-op: + container: + args: + - 'git clone {{$.inputs.parameters[''repo_url'']}} {{$.inputs.parameters[''taxonomy_path'']}} + && cd {{$.inputs.parameters[''taxonomy_path'']}} && if [ -n "{{$.inputs.parameters[''repo_branch'']}}" + ]; then git fetch origin {{$.inputs.parameters[''repo_branch'']}} && git + checkout {{$.inputs.parameters[''repo_branch'']}}; elif [ -n "{{$.inputs.parameters[''repo_pr'']}}" + ] && [ {{$.inputs.parameters[''repo_pr'']}} -gt 0 ]; then git fetch origin + pull/{{$.inputs.parameters[''repo_pr'']}}/head:{{$.inputs.parameters[''repo_pr'']}} + && git checkout {{$.inputs.parameters[''repo_pr'']}}; fi ' + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-importer: + importer: + artifactUri: + runtimeParameter: uri + typeSchema: + schemaTitle: system.Model + schemaVersion: 0.0.1 + exec-knowledge-processed-data-to-artifact-op: + container: + args: + - cp -r {{$.inputs.parameters['pvc_path']}} {{$.outputs.artifacts['knowledge_processed_data'].path}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-model-to-pvc-op: + container: + args: + - cp -r {{$.inputs.artifacts['model'].path}}/* {{$.inputs.parameters['pvc_path']}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-pvc-to-model-op: + container: + args: + - cp -r {{$.inputs.parameters['pvc_path']}} {{$.outputs.artifacts['model'].path}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-pvc-to-mt-bench-op: + container: + args: + - cp -r {{$.inputs.parameters['pvc_path']}} {{$.outputs.artifacts['mt_bench_output'].path}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-pytorchjob-manifest-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - pytorchjob_manifest_op + command: + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ + \ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\ + \ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\ + \ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\ + \ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \ + \ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\ + \ = 42,\n):\n import inspect\n import os\n import time\n\n import\ + \ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\ + \ model_dir = \"/output/phase_1/model/hf_format\"\n models\ + \ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\ + {model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\ + \ )[-1]\n newest_model = models[newest_idx]\n return\ + \ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \ + \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\ + \ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\ + \ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\ + Unsupported value of {phase_num=}\")\n\n image = \"quay.io/redhat-et/ilab:1.2\"\ + \n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\ + \ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \ + \ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\ + \"\n pytorchReplicaSpecs:\n Master:\n replicas:\ + \ 1\n restartPolicy: OnFailure\n template:\n \ + \ metadata:\n annotations:\n \ + \ sidecar.istio.io/inject: 'false'\n spec:\n \ + \ containers:\n - args:\n \ + \ - |\n echo \"Running phase {phase_num}\"\ + \n echo \"Using {path_to_model} model for training\"\ + \n echo \"Using {path_to_data} data for training\"\ + \n mkdir -p /output/phase_{phase_num}/model;\n\ + \ mkdir -p /output/data;\n \ + \ torchrun --nnodes {nnodes} \\\n --nproc_per_node\ + \ {nproc_per_node} \\\n --node_rank \\$(RANK)\ + \ \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ + $(MASTER_PORT) \\\n -m instructlab.training.main_ds\ + \ \\\n --model_name_or_path={path_to_model}\ + \ \\\n --data_path={path_to_data} \\\n \ + \ --output_dir=/output/phase_{phase_num}/model\ + \ \\\n --num_epochs={num_epochs} \\\n \ + \ --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n\ + \ --num_warmup_steps={num_warmup_steps} \\\n\ + \ --save_samples={save_samples} \\\n \ + \ --log_level=INFO \\\n \ + \ --max_batch_len={max_batch_len} \\\n \ + \ --seed={seed} \\\n --cpu_offload_optimizer\ + \ \\\n --cpu_offload_params \\\n \ + \ --distributed_training_framework fsdp \\\n \ + \ --is_granite \\\n --checkpoint_at_epoch\n\ + \ command:\n - /bin/bash\n \ + \ - '-c'\n - '--'\n \ + \ image: {image}\n name: pytorch\n \ + \ volumeMounts:\n - mountPath:\ + \ /input_data\n name: input-data\n \ + \ readOnly: true\n - mountPath: /input_model\n\ + \ name: model\n readOnly:\ + \ true\n - mountPath: /output\n \ + \ name: output\n env:\n \ + \ - name: NNODES\n value: \\\"{nnodes}\\\"\n\ + \ - name: NPROC_PER_NODE\n \ + \ value: \\\"{nproc_per_node}\\\"\n - name: XDG_CACHE_HOME\n\ + \ value: /tmp\n - name:\ + \ TRITON_CACHE_DIR\n value: /tmp\n \ + \ - name: HF_HOME\n value: /tmp\n \ + \ - name: TRANSFORMERS_CACHE\n \ + \ value: /tmp\n resources:\n \ + \ requests:\n cpu: 8\n \ + \ \"nvidia.com/gpu\": {nproc_per_node}\n limits:\n\ + \ cpu: 8\n \"nvidia.com/gpu\"\ + : {nproc_per_node}\n volumes:\n - name:\ + \ input-data\n persistentVolumeClaim:\n \ + \ claimName: {input_pvc_name}\n - name: model\n\ + \ persistentVolumeClaim:\n claimName:\ + \ {model_pvc_name}\n - name: output\n \ + \ persistentVolumeClaim:\n claimName: {output_pvc_name}\n\ + \ Worker:\n replicas: {nnodes-1}\n \ + \ restartPolicy: OnFailure\n template:\n metadata:\n\ + \ annotations:\n sidecar.istio.io/inject:\ + \ 'false'\n spec:\n containers:\n \ + \ - args:\n - |\n \ + \ echo \"Running phase {phase_num}\"\n echo\ + \ \"Using {path_to_model} model for training\"\n \ + \ echo \"Using {path_to_data} data for training\"\n \ + \ mkdir -p /tmp/model;\n torchrun --nnodes\ + \ {nnodes} \\\n --nproc_per_node {nproc_per_node}\ + \ \\\n --node_rank \\$(RANK) \\\n \ + \ --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT) \\\n\ + \ -m instructlab.training.main_ds \\\n \ + \ --model_name_or_path={path_to_model} \\\n \ + \ --data_path={path_to_data} \\\n \ + \ --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ + \ \\\n --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n \ + \ --num_warmup_steps={num_warmup_steps} \\\n \ + \ --save_samples={save_samples} \\\n \ + \ --log_level=INFO \\\n --max_batch_len={max_batch_len}\ + \ \\\n --seed={seed} \\\n \ + \ --cpu_offload_optimizer \\\n --cpu_offload_params\ + \ \\\n --distributed_training_framework fsdp\ + \ \\\n --is_granite \\\n \ + \ --checkpoint_at_epoch\n command:\n \ + \ - /bin/bash\n - '-c'\n \ + \ - '--'\n image: {image}\n \ + \ name: pytorch\n volumeMounts:\n \ + \ - mountPath: /input_data\n \ + \ name: input-data\n readOnly: true\n \ + \ - mountPath: /input_model\n name:\ + \ model\n readOnly: true\n \ + \ - mountPath: /output\n name: output\n \ + \ readOnly: true\n env:\n \ + \ - name: NNODES\n value: \\\ + \"{nnodes}\\\"\n - name: NPROC_PER_NODE\n \ + \ value: \\\"{nproc_per_node}\\\"\n \ + \ - name: XDG_CACHE_HOME\n value: /tmp\n \ + \ - name: TRITON_CACHE_DIR\n \ + \ value: /tmp\n - name: HF_HOME\n \ + \ value: /tmp\n - name: TRANSFORMERS_CACHE\n\ + \ value: /tmp\n resources:\n\ + \ requests:\n cpu: 8\n \ + \ \"nvidia.com/gpu\": {nproc_per_node}\n \ + \ limits:\n cpu: 8\n \ + \ \"nvidia.com/gpu\": {nproc_per_node}\n \ + \ volumes:\n - name: input-data\n \ + \ persistentVolumeClaim:\n claimName: {input_pvc_name}\n\ + \ - name: model\n persistentVolumeClaim:\n\ + \ claimName: {model_pvc_name}\n \ + \ - name: output\n persistentVolumeClaim:\n \ + \ claimName: {output_pvc_name}\n \"\"\"\n )\n\ + \n try:\n manifest_yaml = yaml.safe_load(manifest)\n except\ + \ yaml.YAMLError as exc:\n raise RuntimeError(f\"Error parsing manifest:\ + \ {exc}\") from exc\n\n # Discover the namespace in which the pod is\ + \ running\n with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ + , \"r\", encoding=\"utf-8\"\n ) as f:\n namespace = f.read().strip()\n\ + \ print(f\"The pod is running in the namespace: {namespace}\")\n\n\ + \ try:\n kubernetes.config.load_kube_config()\n print(\"\ + Loaded kube config\")\n except kubernetes.config.ConfigException:\n \ + \ print(\"Failed to load kube config. Trying in-cluster config\")\n\ + \ kubernetes.config.load_incluster_config()\n\n api = kubernetes.client.CustomObjectsApi()\n\ + \ try:\n api.create_namespaced_custom_object(\n group=\"\ + kubeflow.org\",\n version=\"v1\",\n namespace=namespace,\n\ + \ plural=\"pytorchjobs\",\n body=manifest_yaml,\n\ + \ )\n except kubernetes.client.rest.ApiException as exc:\n \ + \ if exc.status == 409:\n print(\n \"{} '{}/{}'\ + \ already exists.\".format(\n manifest_yaml[\"kind\"\ + ],\n namespace,\n manifest_yaml[\"\ + metadata\"][\"name\"],\n )\n )\n else:\n\ + \ raise\n\n # Get the CR status and wait for it to be completed\n\ + \ w = kubernetes.watch.Watch()\n exit_flag = False\n start_time\ + \ = time.time()\n timeout_seconds = 24 * 60 * 60 # 24 hours\n\n while\ + \ not exit_flag: # Keep the watch active\n if time.time() - start_time\ + \ > timeout_seconds:\n raise RuntimeError(\n \"\ + Timeout (24h) reached waiting for the PytorchJob to complete.\"\n \ + \ )\n\n try:\n print(\"Watching for PytorchJob\"\ + )\n for event in w.stream(\n api.list_namespaced_custom_object,\n\ + \ group=\"kubeflow.org\",\n version=\"v1\"\ + ,\n namespace=namespace,\n plural=\"pytorchjobs\"\ + ,\n timeout_seconds=60, # Timeout after 1 minute\n \ + \ ):\n pytorchjob_event = event[\"object\"]\n \ + \ if (\n pytorchjob_event[\"metadata\"][\"\ + name\"]\n != manifest_yaml[\"metadata\"][\"name\"]\n\ + \ ):\n continue\n pytorchjob_name\ + \ = pytorchjob_event[\"metadata\"][\"name\"]\n\n if (\n \ + \ \"status\" not in pytorchjob_event\n \ + \ or \"conditions\" not in pytorchjob_event[\"status\"]\n \ + \ ):\n continue\n print(\n \ + \ f\"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions',\ + \ 'No conditions yet')}\"\n )\n for job_condition\ + \ in reversed(pytorchjob_event[\"status\"][\"conditions\"]):\n \ + \ if job_condition[\"type\"] == \"Succeeded\":\n \ + \ print(\n f\"PytorchJob '{pytorchjob_name}'\ + \ completed successfully: {job_condition['reason']}\"\n \ + \ )\n print(f\"Training phase {phase_num}\ + \ completed.\")\n w.stop()\n \ + \ exit_flag = True\n # Break here to avoid going\ + \ into other conditions, we are done\n break\n \ + \ elif job_condition[\"type\"] == \"Failed\":\n \ + \ print(\n f\"PytorchJob '{pytorchjob_name}'\ + \ failed: {job_condition['reason']}\"\n )\n \ + \ w.stop()\n raise RuntimeError(\"\ + Job failed.\")\n except kubernetes.client.exceptions.ApiException\ + \ as e:\n print(f\"API exception occurred: {str(e)}\")\n \ + \ time.sleep(5) # Backoff before retrying\n # Catches the\ + \ following error:\n # urllib3.exceptions.ProtocolError: (\"Connection\ + \ broken: InvalidChunkLength\n except urllib3.exceptions.ProtocolError\ + \ as e:\n print(f\"Connection broken reconnecting the watcher\ + \ {str(e)}\")\n time.sleep(5) # Backoff before retrying\n \ + \ finally:\n w.stop()\n\n" + image: quay.io/modh/odh-generic-data-science-notebook:v3-2024b-20241111 + exec-pytorchjob-manifest-op-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - pytorchjob_manifest_op + command: + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ + \ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\ + \ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\ + \ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\ + \ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \ + \ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\ + \ = 42,\n):\n import inspect\n import os\n import time\n\n import\ + \ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\ + \ model_dir = \"/output/phase_1/model/hf_format\"\n models\ + \ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\ + {model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\ + \ )[-1]\n newest_model = models[newest_idx]\n return\ + \ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \ + \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\ + \ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\ + \ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\ + Unsupported value of {phase_num=}\")\n\n image = \"quay.io/redhat-et/ilab:1.2\"\ + \n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\ + \ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \ + \ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\ + \"\n pytorchReplicaSpecs:\n Master:\n replicas:\ + \ 1\n restartPolicy: OnFailure\n template:\n \ + \ metadata:\n annotations:\n \ + \ sidecar.istio.io/inject: 'false'\n spec:\n \ + \ containers:\n - args:\n \ + \ - |\n echo \"Running phase {phase_num}\"\ + \n echo \"Using {path_to_model} model for training\"\ + \n echo \"Using {path_to_data} data for training\"\ + \n mkdir -p /output/phase_{phase_num}/model;\n\ + \ mkdir -p /output/data;\n \ + \ torchrun --nnodes {nnodes} \\\n --nproc_per_node\ + \ {nproc_per_node} \\\n --node_rank \\$(RANK)\ + \ \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ + $(MASTER_PORT) \\\n -m instructlab.training.main_ds\ + \ \\\n --model_name_or_path={path_to_model}\ + \ \\\n --data_path={path_to_data} \\\n \ + \ --output_dir=/output/phase_{phase_num}/model\ + \ \\\n --num_epochs={num_epochs} \\\n \ + \ --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n\ + \ --num_warmup_steps={num_warmup_steps} \\\n\ + \ --save_samples={save_samples} \\\n \ + \ --log_level=INFO \\\n \ + \ --max_batch_len={max_batch_len} \\\n \ + \ --seed={seed} \\\n --cpu_offload_optimizer\ + \ \\\n --cpu_offload_params \\\n \ + \ --distributed_training_framework fsdp \\\n \ + \ --is_granite \\\n --checkpoint_at_epoch\n\ + \ command:\n - /bin/bash\n \ + \ - '-c'\n - '--'\n \ + \ image: {image}\n name: pytorch\n \ + \ volumeMounts:\n - mountPath:\ + \ /input_data\n name: input-data\n \ + \ readOnly: true\n - mountPath: /input_model\n\ + \ name: model\n readOnly:\ + \ true\n - mountPath: /output\n \ + \ name: output\n env:\n \ + \ - name: NNODES\n value: \\\"{nnodes}\\\"\n\ + \ - name: NPROC_PER_NODE\n \ + \ value: \\\"{nproc_per_node}\\\"\n - name: XDG_CACHE_HOME\n\ + \ value: /tmp\n - name:\ + \ TRITON_CACHE_DIR\n value: /tmp\n \ + \ - name: HF_HOME\n value: /tmp\n \ + \ - name: TRANSFORMERS_CACHE\n \ + \ value: /tmp\n resources:\n \ + \ requests:\n cpu: 8\n \ + \ \"nvidia.com/gpu\": {nproc_per_node}\n limits:\n\ + \ cpu: 8\n \"nvidia.com/gpu\"\ + : {nproc_per_node}\n volumes:\n - name:\ + \ input-data\n persistentVolumeClaim:\n \ + \ claimName: {input_pvc_name}\n - name: model\n\ + \ persistentVolumeClaim:\n claimName:\ + \ {model_pvc_name}\n - name: output\n \ + \ persistentVolumeClaim:\n claimName: {output_pvc_name}\n\ + \ Worker:\n replicas: {nnodes-1}\n \ + \ restartPolicy: OnFailure\n template:\n metadata:\n\ + \ annotations:\n sidecar.istio.io/inject:\ + \ 'false'\n spec:\n containers:\n \ + \ - args:\n - |\n \ + \ echo \"Running phase {phase_num}\"\n echo\ + \ \"Using {path_to_model} model for training\"\n \ + \ echo \"Using {path_to_data} data for training\"\n \ + \ mkdir -p /tmp/model;\n torchrun --nnodes\ + \ {nnodes} \\\n --nproc_per_node {nproc_per_node}\ + \ \\\n --node_rank \\$(RANK) \\\n \ + \ --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT) \\\n\ + \ -m instructlab.training.main_ds \\\n \ + \ --model_name_or_path={path_to_model} \\\n \ + \ --data_path={path_to_data} \\\n \ + \ --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ + \ \\\n --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n \ + \ --num_warmup_steps={num_warmup_steps} \\\n \ + \ --save_samples={save_samples} \\\n \ + \ --log_level=INFO \\\n --max_batch_len={max_batch_len}\ + \ \\\n --seed={seed} \\\n \ + \ --cpu_offload_optimizer \\\n --cpu_offload_params\ + \ \\\n --distributed_training_framework fsdp\ + \ \\\n --is_granite \\\n \ + \ --checkpoint_at_epoch\n command:\n \ + \ - /bin/bash\n - '-c'\n \ + \ - '--'\n image: {image}\n \ + \ name: pytorch\n volumeMounts:\n \ + \ - mountPath: /input_data\n \ + \ name: input-data\n readOnly: true\n \ + \ - mountPath: /input_model\n name:\ + \ model\n readOnly: true\n \ + \ - mountPath: /output\n name: output\n \ + \ readOnly: true\n env:\n \ + \ - name: NNODES\n value: \\\ + \"{nnodes}\\\"\n - name: NPROC_PER_NODE\n \ + \ value: \\\"{nproc_per_node}\\\"\n \ + \ - name: XDG_CACHE_HOME\n value: /tmp\n \ + \ - name: TRITON_CACHE_DIR\n \ + \ value: /tmp\n - name: HF_HOME\n \ + \ value: /tmp\n - name: TRANSFORMERS_CACHE\n\ + \ value: /tmp\n resources:\n\ + \ requests:\n cpu: 8\n \ + \ \"nvidia.com/gpu\": {nproc_per_node}\n \ + \ limits:\n cpu: 8\n \ + \ \"nvidia.com/gpu\": {nproc_per_node}\n \ + \ volumes:\n - name: input-data\n \ + \ persistentVolumeClaim:\n claimName: {input_pvc_name}\n\ + \ - name: model\n persistentVolumeClaim:\n\ + \ claimName: {model_pvc_name}\n \ + \ - name: output\n persistentVolumeClaim:\n \ + \ claimName: {output_pvc_name}\n \"\"\"\n )\n\ + \n try:\n manifest_yaml = yaml.safe_load(manifest)\n except\ + \ yaml.YAMLError as exc:\n raise RuntimeError(f\"Error parsing manifest:\ + \ {exc}\") from exc\n\n # Discover the namespace in which the pod is\ + \ running\n with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ + , \"r\", encoding=\"utf-8\"\n ) as f:\n namespace = f.read().strip()\n\ + \ print(f\"The pod is running in the namespace: {namespace}\")\n\n\ + \ try:\n kubernetes.config.load_kube_config()\n print(\"\ + Loaded kube config\")\n except kubernetes.config.ConfigException:\n \ + \ print(\"Failed to load kube config. Trying in-cluster config\")\n\ + \ kubernetes.config.load_incluster_config()\n\n api = kubernetes.client.CustomObjectsApi()\n\ + \ try:\n api.create_namespaced_custom_object(\n group=\"\ + kubeflow.org\",\n version=\"v1\",\n namespace=namespace,\n\ + \ plural=\"pytorchjobs\",\n body=manifest_yaml,\n\ + \ )\n except kubernetes.client.rest.ApiException as exc:\n \ + \ if exc.status == 409:\n print(\n \"{} '{}/{}'\ + \ already exists.\".format(\n manifest_yaml[\"kind\"\ + ],\n namespace,\n manifest_yaml[\"\ + metadata\"][\"name\"],\n )\n )\n else:\n\ + \ raise\n\n # Get the CR status and wait for it to be completed\n\ + \ w = kubernetes.watch.Watch()\n exit_flag = False\n start_time\ + \ = time.time()\n timeout_seconds = 24 * 60 * 60 # 24 hours\n\n while\ + \ not exit_flag: # Keep the watch active\n if time.time() - start_time\ + \ > timeout_seconds:\n raise RuntimeError(\n \"\ + Timeout (24h) reached waiting for the PytorchJob to complete.\"\n \ + \ )\n\n try:\n print(\"Watching for PytorchJob\"\ + )\n for event in w.stream(\n api.list_namespaced_custom_object,\n\ + \ group=\"kubeflow.org\",\n version=\"v1\"\ + ,\n namespace=namespace,\n plural=\"pytorchjobs\"\ + ,\n timeout_seconds=60, # Timeout after 1 minute\n \ + \ ):\n pytorchjob_event = event[\"object\"]\n \ + \ if (\n pytorchjob_event[\"metadata\"][\"\ + name\"]\n != manifest_yaml[\"metadata\"][\"name\"]\n\ + \ ):\n continue\n pytorchjob_name\ + \ = pytorchjob_event[\"metadata\"][\"name\"]\n\n if (\n \ + \ \"status\" not in pytorchjob_event\n \ + \ or \"conditions\" not in pytorchjob_event[\"status\"]\n \ + \ ):\n continue\n print(\n \ + \ f\"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions',\ + \ 'No conditions yet')}\"\n )\n for job_condition\ + \ in reversed(pytorchjob_event[\"status\"][\"conditions\"]):\n \ + \ if job_condition[\"type\"] == \"Succeeded\":\n \ + \ print(\n f\"PytorchJob '{pytorchjob_name}'\ + \ completed successfully: {job_condition['reason']}\"\n \ + \ )\n print(f\"Training phase {phase_num}\ + \ completed.\")\n w.stop()\n \ + \ exit_flag = True\n # Break here to avoid going\ + \ into other conditions, we are done\n break\n \ + \ elif job_condition[\"type\"] == \"Failed\":\n \ + \ print(\n f\"PytorchJob '{pytorchjob_name}'\ + \ failed: {job_condition['reason']}\"\n )\n \ + \ w.stop()\n raise RuntimeError(\"\ + Job failed.\")\n except kubernetes.client.exceptions.ApiException\ + \ as e:\n print(f\"API exception occurred: {str(e)}\")\n \ + \ time.sleep(5) # Backoff before retrying\n # Catches the\ + \ following error:\n # urllib3.exceptions.ProtocolError: (\"Connection\ + \ broken: InvalidChunkLength\n except urllib3.exceptions.ProtocolError\ + \ as e:\n print(f\"Connection broken reconnecting the watcher\ + \ {str(e)}\")\n time.sleep(5) # Backoff before retrying\n \ + \ finally:\n w.stop()\n\n" + image: quay.io/modh/odh-generic-data-science-notebook:v3-2024b-20241111 + exec-run-final-eval-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - run_final_eval_op + command: + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef run_final_eval_op(\n mmlu_branch_output: Output[Artifact],\n\ + \ mt_bench_branch_output: Output[Artifact],\n base_model_dir: str,\n\ + \ base_branch: str,\n candidate_branch: str,\n max_workers: str,\n\ + \ few_shots: int,\n batch_size: str,\n merge_system_user_message:\ + \ bool,\n candidate_model: str = None,\n taxonomy_path: str = \"/input/taxonomy\"\ + ,\n sdg_path: str = \"/input/sdg\",\n):\n import json\n import\ + \ os\n import subprocess\n\n import torch\n from instructlab.eval.mmlu\ + \ import MMLUBranchEvaluator\n from instructlab.eval.mt_bench import\ + \ MTBenchBranchEvaluator\n from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores,\ + \ sort_score\n\n if judge_ca_cert := os.getenv(\"JUDGE_CA_CERT_PATH\"\ + ):\n import httpx\n import openai\n\n # Create a custom\ + \ HTTP client\n class CustomHttpClient(httpx.Client):\n \ + \ def __init__(self, *args, **kwargs):\n # Use the custom\ + \ CA certificate\n kwargs.setdefault(\"verify\", judge_ca_cert)\n\ + \ super().__init__(*args, **kwargs)\n\n # Create a\ + \ new OpenAI class that uses the custom HTTP client\n class CustomOpenAI(openai.OpenAI):\n\ + \ def __init__(self, *args, **kwargs):\n custom_client\ + \ = CustomHttpClient()\n super().__init__(http_client=custom_client,\ + \ *args, **kwargs)\n\n # Monkey patch the OpenAI class in the openai\ + \ module, so that the eval lib can use it\n openai.OpenAI = CustomOpenAI\n\ + \n print(\"Starting Final Eval...\")\n\n def launch_vllm(\n \ + \ model_path: str, gpu_count: int, retries: int = 120, delay: int = 10\n\ + \ ) -> tuple:\n import subprocess\n import sys\n \ + \ import time\n\n import requests\n from instructlab.model.backends.common\ + \ import free_tcp_ipv4_port\n\n free_port = free_tcp_ipv4_port(\"\ + 127.0.0.1\")\n port = str(free_port)\n vllm_server = f\"http://127.0.0.1:{port}/v1\"\ + \n\n command = [\n sys.executable,\n \"-m\"\ + ,\n \"vllm.entrypoints.openai.api_server\",\n \"--port\"\ + ,\n port,\n \"--model\",\n model_path,\n\ + \ ]\n if gpu_count > 0:\n command += [\n \ + \ \"--tensor-parallel-size\",\n str(gpu_count),\n\ + \ ]\n\n process = subprocess.Popen(args=command)\n\n \ + \ print(f\"Waiting for vLLM server to start at {vllm_server}...\"\ + )\n\n for attempt in range(retries):\n try:\n \ + \ response = requests.get(f\"{vllm_server}/models\", timeout=10)\n\ + \ if response.status_code == 200:\n print(f\"\ + vLLM server is up and running at {vllm_server}.\")\n \ + \ return process, vllm_server\n except requests.ConnectionError:\n\ + \ pass\n\n print(\n f\"Server not\ + \ available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...\"\ + \n )\n time.sleep(delay)\n\n raise RuntimeError(\n\ + \ f\"Failed to start vLLM server at {vllm_server} after {retries}\ + \ retries.\"\n )\n\n def shutdown_vllm(process: subprocess.Popen,\ + \ timeout: int = 20):\n import subprocess\n\n from instructlab.model.backends.vllm\ + \ import wait_for_stable_vram\n\n try:\n process.terminate()\n\ + \ process.wait(timeout=timeout)\n\n if process.poll()\ + \ is None:\n print(f\"Forcefully killing vLLM server process\ + \ with PID: {process.pid}\")\n process.kill()\n\n \ + \ print(f\"Successfully stopped vLLM server with PID: {process.pid}\"\ + )\n\n except subprocess.TimeoutExpired:\n print(\n \ + \ f\"Timeout expired. Forcefully killing vLLM server with PID:\ + \ {process.pid}\"\n )\n process.kill() # Force kill\ + \ the process if over timeout\n\n # Note from instructlab/model/backends/vllm.py\n\ + \ # vLLM relies on stable VRAM, residual reclamation activity\n\ + \ # can lead to crashes on restart. To prevent this add a\n \ + \ # short delay (typically ~ 10 seconds, max 30) to verify stability.\n\ + \ wait_for_stable_vram(30)\n\n # For standalone mode\n if candidate_model\ + \ is None:\n # logic to get the best model from the models folder\ + \ and results\n pass\n\n ######################################################################\n\ + \ # branch_eval_summary_to_json creates a json object from output of\ + \ instructlab/eval\n # TODO: Add this to the instructlab/eval or instructlab/instructlab\ + \ repository\n def branch_eval_summary_to_json(\n improvements:\ + \ list[tuple[str, float, float, float]],\n regressions: list[tuple[str,\ + \ float, float, float]],\n no_changes: list[tuple[str, float]],\n\ + \ new=None,\n ) -> str:\n # Generates a JSON object from\ + \ the _branch benchmark evaluations\n\n import json\n\n summary\ + \ = {\"improvements\": [], \"regressions\": [], \"no_changes\": [], \"new\"\ + : []}\n\n if len(improvements) > 0:\n improvements.sort(key=sort_score,\ + \ reverse=True)\n for improvement in improvements:\n \ + \ task, delta, base_score, new_score = improvement\n \ + \ summary[\"improvements\"].append(\n {\n \ + \ \"task\": task,\n \"base_score\"\ + : round(base_score, 2),\n \"new_score\": round(new_score,\ + \ 2),\n \"delta\": delta,\n }\n\ + \ )\n\n if len(regressions) > 0:\n regressions.sort(key=sort_score)\n\ + \ for regression in regressions:\n task, delta,\ + \ base_score, new_score = regression\n summary[\"regressions\"\ + ].append(\n {\n \"task\": task,\n\ + \ \"base_score\": round(base_score, 2),\n \ + \ \"new_score\": round(new_score, 2),\n \ + \ \"delta\": delta,\n }\n )\n\n\ + \ if len(no_changes) > 0:\n for entry in no_changes:\n\ + \ task, avg_score = entry\n summary[\"no_changes\"\ + ].append(\n {\"task\": task, \"average_score\": round(avg_score,\ + \ 2)}\n )\n\n if new is not None and len(new) > 0:\n\ + \ for entry in new:\n _, avg_score = entry\n \ + \ summary[\"new\"].append(\n {\"qna\":\ + \ qna, \"average_score\": round(avg_score, 2)}\n )\n\n \ + \ return json.dumps(summary, indent=4)\n\n ######################################################################\n\ + \ print(\"Checking GPUs...\")\n gpu_available = torch.cuda.is_available()\n\ + \ gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n\ + \ if gpu_available\n else \"No GPU available\"\n )\n \ + \ gpu_count = torch.cuda.device_count() if gpu_available else 0\n\n \ + \ print(f\"GPU Available: {gpu_available}, Using: {gpu_name}\")\n\n if\ + \ batch_size.isdigit():\n batch_size = int(batch_size)\n\n # MMLU_BRANCH\n\ + \n # This is very specific to 'ilab generate', necessary because the\ + \ data generation and\n # model evaluation are taking place in separate\ + \ environments.\n def update_test_lines_in_files(base_dir):\n \ + \ import os\n\n import yaml\n\n for root, _, files in os.walk(base_dir):\n\ + \ for file_name in files:\n if file_name.startswith(\"\ + knowledge_\") and file_name.endswith(\n \"_task.yaml\"\ + \n ):\n file_path = os.path.join(root,\ + \ file_name)\n\n with open(file_path, \"r\", encoding=\"\ + utf-8\") as file:\n task_yaml = yaml.load(file, Loader=yaml.Loader)\n\ + \n current_test_file_path = task_yaml[\"dataset_kwargs\"\ + ][\"data_files\"][\n \"test\"\n \ + \ ]\n current_test_file_path_parts = current_test_file_path.split(\"\ + /\")\n new_test_file_path = f\"{root}/{current_test_file_path_parts[-1]}\"\ + \n task_yaml[\"dataset_kwargs\"][\"data_files\"][\"test\"\ + ] = (\n new_test_file_path\n )\n\ + \ with open(file_path, \"w\", encoding=\"utf-8\") as\ + \ file:\n yaml.dump(task_yaml, file)\n\n # find_node_dataset_directories\ + \ to find sdg output node_datasets_*\n def find_node_dataset_directories(base_dir:\ + \ str):\n import os\n import re\n\n # This is specific\ + \ to ilab/eval output\n pattern = r\"node_datasets_\"\n matching_dirs\ + \ = []\n regex = re.compile(pattern)\n\n for root, dirs, _\ + \ in os.walk(base_dir):\n for directory in dirs:\n \ + \ if regex.search(directory):\n matching_dirs.append(os.path.join(root,\ + \ directory))\n\n # From 'ilab sdg' the knowledge_*_task.yaml files\ + \ have a line that references where the SDG took place.\n # This\ + \ needs to be updated to run elsewhere.\n # The line is:\n \ + \ # test: /path/to/where/sdg/occured/node_datasets_*\n # TODO:\ + \ update sdg repo: https://github.com/instructlab/sdg/blob/366814b3e89e28c98c0d2a276ad0759c567d2798/src/instructlab/sdg/eval_data.py#L84-%23L114\n\ + \ update_test_lines_in_files(base_dir)\n return matching_dirs\n\ + \n print(\"Starting MMLU_Branch...\")\n\n mmlu_tasks = [\"mmlu_pr\"\ + ]\n\n node_dataset_dirs = find_node_dataset_directories(sdg_path)\n\n\ + \ # This assumes generated filesystem from ilab sdg, which\n # generates\ + \ a node_datasets_ directory for MMLU custom tasks data\n if node_dataset_dirs:\n\ + \ tasks_dir = node_dataset_dirs[0]\n\n mmlu_branch_evaluators\ + \ = [\n MMLUBranchEvaluator(\n model_path=candidate_model,\n\ + \ tasks_dir=tasks_dir,\n tasks=mmlu_tasks,\n\ + \ few_shots=few_shots,\n batch_size=batch_size,\n\ + \ ),\n MMLUBranchEvaluator(\n model_path=base_model_dir,\n\ + \ tasks_dir=tasks_dir,\n tasks=mmlu_tasks,\n\ + \ few_shots=few_shots,\n batch_size=batch_size,\n\ + \ ),\n ]\n m_paths = [candidate_model, base_model_dir]\n\ + \ overall_scores = []\n individual_scores_list = []\n \ + \ for i, evaluator in enumerate(mmlu_branch_evaluators):\n \ + \ m_path = m_paths[i]\n print(\"Launching Vllm...\")\n \ + \ vllm_process, vllm_server = launch_vllm(m_path, gpu_count)\n \ + \ overall_score, individual_scores = evaluator.run(vllm_server)\n\ + \ overall_scores.append(overall_score)\n individual_scores_list.append(individual_scores)\n\ + \ print(\"Stopping Vllm\")\n shutdown_vllm(vllm_process)\n\ + \n # TODO: update instructlab/instructlab model/evaluate.py\n \ + \ # so this logic can be imported outside of the CLI\n overall_score\ + \ = overall_scores[0]\n base_overall_score = overall_scores[1]\n\ + \ individual_scores = individual_scores_list[0]\n base_individual_scores\ + \ = individual_scores_list[1]\n\n improvements, regressions, no_changes\ + \ = [], [], []\n for task, score in individual_scores.items():\n\ + \ base_score = base_individual_scores[task]\n s =\ + \ score[\"score\"]\n b_s = base_score[\"score\"]\n \ + \ d = round(s - b_s, 2)\n if s > b_s:\n improvements.append((task,\ + \ d, b_s, s))\n elif b_s > s:\n regressions.append((task,\ + \ d, b_s, s))\n else:\n no_changes.append((task,\ + \ s))\n\n summary = branch_eval_summary_to_json(\n improvements,\n\ + \ regressions,\n no_changes,\n )\n\n \ + \ mmlu_branch_data = {\n \"report_title\": \"KNOWLEDGE EVALUATION\ + \ REPORT\",\n \"max_score\": \"1.0\",\n \"model\"\ + : candidate_model,\n \"model_score\": round(overall_score, 2),\n\ + \ \"base_model\": base_model_dir,\n \"base_model_score\"\ + : round(base_overall_score, 2),\n \"summary\": summary,\n \ + \ }\n\n with open(mmlu_branch_output.path, \"w\", encoding=\"\ + utf-8\") as f:\n json.dump(mmlu_branch_data, f, indent=4)\n \ + \ else:\n print(\"No MMLU tasks directories found, skipping MMLU_branch\ + \ evaluation.\")\n\n # MT_BENCH_BRANCH\n\n print(\"Starting MT_BENCH_BRANCH\ + \ ...\")\n\n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n \ + \ judge_model_name = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"\ + JUDGE_ENDPOINT\")\n\n output_dir = \"/tmp/eval_output\"\n\n # TODO:\ + \ candidate_branch must be in same repo, not a fork, or, can compare main\ + \ branch against candidate, base models\n base_branch = base_branch or\ + \ \"main\"\n candidate_branch = candidate_branch or \"main\"\n\n ######################################################################\n\ + \ # TODO: Update ilab/model/evaluate evaluate def logic to allow for\ + \ external judge model\n # and when that happens, much of this logic\ + \ can be imported from the 'evaluate' definition:\n # https://github.com/instructlab/instructlab/blob/83ca501ecdd858677380046e2a56da5b2f3f14e7/src/instructlab/model/evaluate.py#L504\n\ + \ #\n # With instructlab, model_name is synonomous with model_path\n\ + \ mt_bench_evaluators = [\n MTBenchBranchEvaluator(\n \ + \ model_name=candidate_model,\n judge_model_name=judge_model_name,\n\ + \ taxonomy_git_repo_path=taxonomy_path,\n branch=candidate_branch,\n\ + \ output_dir=output_dir,\n merge_system_user_message=merge_system_user_message,\n\ + \ ),\n MTBenchBranchEvaluator(\n model_name=base_model_dir,\n\ + \ judge_model_name=judge_model_name,\n taxonomy_git_repo_path=taxonomy_path,\n\ + \ branch=base_branch,\n output_dir=output_dir,\n \ + \ merge_system_user_message=merge_system_user_message,\n \ + \ ),\n ]\n\n # ilab/evaluate uses a magic word for its mt_bench\ + \ evaluator - 'auto'\n # with 'auto', number of gpus allocated for serving\ + \ is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ + \ if max_workers == \"auto\":\n try:\n usable_cpu_count\ + \ = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n\ + \ import multiprocessing\n\n usable_cpu_count = multiprocessing.cpu_count()\ + \ // 2\n max_workers = usable_cpu_count\n\n branches = [candidate_branch,\ + \ base_branch]\n m_paths = [candidate_model, base_model_dir]\n qa_pairs_and_errors\ + \ = []\n for i, evaluator in enumerate(mt_bench_evaluators):\n \ + \ branch = branches[i]\n m_path = m_paths[i]\n\n print(\n\ + \ f\"Generating questions and reference answers from qna files\ + \ for branch {branch}...\"\n )\n vllm_process, vllm_server\ + \ = launch_vllm(m_path, gpu_count)\n\n evaluator.gen_answers(\n \ + \ server_url=vllm_server,\n serving_gpus=gpu_count,\n\ + \ max_workers=max_workers,\n )\n\n shutdown_vllm(vllm_process)\n\ + \n print(f\"Evaluating answers for branch {branch}...\")\n \ + \ overall_score, qa_pairs, error_rate = evaluator.judge_answers(\n \ + \ server_url=judge_endpoint,\n api_key=judge_api_key,\n\ + \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ + \ )\n\n qa_pairs_and_errors.append((overall_score, qa_pairs,\ + \ error_rate))\n\n overall_score, qa_pairs, error_rate = qa_pairs_and_errors[0]\n\ + \ base_overall_score, base_qa_pairs, base_error_rate = qa_pairs_and_errors[1]\n\ + \n qna_to_avg_scores = qa_pairs_to_qna_to_avg_scores(qa_pairs)\n base_qna_to_avg_scores\ + \ = qa_pairs_to_qna_to_avg_scores(base_qa_pairs)\n\n improvements, regressions,\ + \ no_changes, new_qnas = [], [], [], []\n\n for qna, avg_score in qna_to_avg_scores.items():\n\ + \ base_avg_score = base_qna_to_avg_scores.get(qna)\n if base_avg_score\ + \ is not None:\n if avg_score > base_avg_score:\n \ + \ improvements.append(\n (\n \ + \ qna,\n round(avg_score - base_avg_score, 2),\n\ + \ base_avg_score,\n avg_score,\n\ + \ )\n )\n elif avg_score ==\ + \ base_avg_score:\n no_changes.append((qna, avg_score))\n\ + \ else:\n regressions.append(\n \ + \ (\n qna,\n round(avg_score\ + \ - base_avg_score, 2),\n base_avg_score,\n \ + \ avg_score,\n )\n )\n\ + \ else:\n new_qnas.append((qna, avg_score))\n\n error_rate\ + \ = (error_rate + base_error_rate) / 2\n if error_rate > 0:\n \ + \ error_rate = round(error_rate, 2)\n\n summary = branch_eval_summary_to_json(\n\ + \ improvements,\n regressions,\n no_changes,\n \ + \ new_qnas,\n )\n\n mt_bench_branch_data = {\n \"report_title\"\ + : \"SKILLS EVALUATION REPORT\",\n \"model\": candidate_model,\n \ + \ \"judge_model\": judge_model_name,\n \"max_score\": \"10.0\"\ + ,\n \"overall_score\": overall_score,\n \"base_overall_score\"\ + : base_overall_score,\n \"error_rate\": error_rate,\n \"summary\"\ + : summary,\n }\n\n with open(mt_bench_branch_output.path, \"w\", encoding=\"\ + utf-8\") as f:\n json.dump(mt_bench_branch_data, f, indent=4)\n\n" + env: + - name: HOME + value: /tmp + - name: HF_HOME + value: /tmp + image: quay.io/redhat-et/ilab:1.2 + resources: + accelerator: + count: '1' + type: nvidia.com/gpu + exec-run-mt-bench-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - run_mt_bench_op + command: + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef run_mt_bench_op(\n merge_system_user_message: bool,\n #\ + \ generate_answers,judgment uses a magic word for its mt_bench evaluator\ + \ - 'auto'\n # with 'auto', number of gpus allocated for serving is\ + \ calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ + \ max_workers: str,\n models_folder: str,\n output_path: str =\ + \ \"/output/mt_bench_data.json\",\n best_score_file: Optional[str] =\ + \ None,\n) -> NamedTuple(\"outputs\", best_model=str, best_score=float):\n\ + \ import json\n import os\n import subprocess\n\n import torch\n\ + \ from instructlab.eval.mt_bench import MTBenchEvaluator\n\n if judge_ca_cert\ + \ := os.getenv(\"JUDGE_CA_CERT_PATH\"):\n import httpx\n import\ + \ openai\n\n # Create a custom HTTP client\n class CustomHttpClient(httpx.Client):\n\ + \ def __init__(self, *args, **kwargs):\n # Use\ + \ the custom CA certificate\n kwargs.setdefault(\"verify\"\ + , judge_ca_cert)\n super().__init__(*args, **kwargs)\n\n\ + \ # Create a new OpenAI class that uses the custom HTTP client\n\ + \ class CustomOpenAI(openai.OpenAI):\n def __init__(self,\ + \ *args, **kwargs):\n custom_client = CustomHttpClient()\n\ + \ super().__init__(http_client=custom_client, *args, **kwargs)\n\ + \n # Monkey patch the OpenAI class in the openai module, so that\ + \ the eval lib can use it\n openai.OpenAI = CustomOpenAI\n\n def\ + \ launch_vllm(\n model_path: str, gpu_count: int, retries: int =\ + \ 120, delay: int = 10\n ) -> tuple:\n import subprocess\n \ + \ import sys\n import time\n\n import requests\n \ + \ from instructlab.model.backends.common import free_tcp_ipv4_port\n\n\ + \ free_port = free_tcp_ipv4_port(\"127.0.0.1\")\n port = str(free_port)\n\ + \ vllm_server = f\"http://127.0.0.1:{port}/v1\"\n\n command\ + \ = [\n sys.executable,\n \"-m\",\n \"\ + vllm.entrypoints.openai.api_server\",\n \"--port\",\n \ + \ port,\n \"--model\",\n model_path,\n \ + \ ]\n if gpu_count > 0:\n command += [\n \ + \ \"--tensor-parallel-size\",\n str(gpu_count),\n \ + \ ]\n\n process = subprocess.Popen(args=command)\n\n \ + \ print(f\"Waiting for vLLM server to start at {vllm_server}...\")\n\n\ + \ for attempt in range(retries):\n try:\n \ + \ response = requests.get(f\"{vllm_server}/models\")\n \ + \ if response.status_code == 200:\n print(f\"vLLM server\ + \ is up and running at {vllm_server}.\")\n return process,\ + \ vllm_server\n except requests.ConnectionError:\n \ + \ pass\n\n print(\n f\"Server not available\ + \ yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...\"\ + \n )\n time.sleep(delay)\n\n raise RuntimeError(\n\ + \ f\"Failed to start vLLM server at {vllm_server} after {retries}\ + \ retries.\"\n )\n\n def shutdown_vllm(process: subprocess.Popen,\ + \ timeout: int = 20):\n import subprocess\n\n from instructlab.model.backends.vllm\ + \ import wait_for_stable_vram\n\n try:\n process.terminate()\n\ + \ process.wait(timeout=timeout)\n\n if process.poll()\ + \ is None:\n print(f\"Forcefully killing vLLM server process\ + \ with PID: {process.pid}\")\n process.kill()\n\n \ + \ print(f\"Successfully stopped vLLM server with PID: {process.pid}\"\ + )\n\n except subprocess.TimeoutExpired:\n print(\n \ + \ f\"Timeout expired. Forcefully killing vLLM server with PID:\ + \ {process.pid}\"\n )\n process.kill() # Force kill\ + \ the process if over timeout\n except Exception as e:\n \ + \ print(f\"Failed to stop process with PID {process.pid}. Error: {e}\"\ + )\n # Note from instructlab/model/backends/vllm.py\n # vLLM\ + \ relies on stable VRAM, residual reclamation activity\n # can lead\ + \ to crashes on restart. To prevent this add a\n # short delay (typically\ + \ ~ 10 seconds, max 30) to verify stability.\n wait_for_stable_vram(30)\n\ + \n gpu_available = torch.cuda.is_available()\n gpu_name = (\n \ + \ torch.cuda.get_device_name(torch.cuda.current_device())\n if\ + \ gpu_available\n else \"No GPU available\"\n )\n gpu_count\ + \ = torch.cuda.device_count() if gpu_available else 0\n\n print(f\"GPU\ + \ Available: {gpu_available}, {gpu_name}\")\n\n models_list = os.listdir(models_folder)\n\ + \n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ + \ = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\"\ + )\n\n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment\ + \ uses a magic word for its mt_bench evaluator - 'auto'\n # with 'auto',\ + \ number of gpus allocated for serving is calculated based on environment\n\ + \ # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ + \ if max_workers == \"auto\":\n try:\n usable_cpu_count\ + \ = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n\ + \ import multiprocessing\n\n usable_cpu_count = multiprocessing.cpu_count()\ + \ // 2\n max_workers = usable_cpu_count\n\n # modify model_list\ + \ to ignore any jsonl files present in the directory\n models_list =\ + \ [model for model in models_list if not model.endswith(\".jsonl\")]\n \ + \ for model_name in models_list:\n print(f\"Serving candidate model:\ + \ {model_name}\")\n model_path = f\"{models_folder}/{model_name}\"\ + \n\n vllm_process, vllm_server = launch_vllm(model_path, gpu_count)\n\ + \n # model ID is the model_path value in vLLM\n evaluator\ + \ = MTBenchEvaluator(\n model_name=model_path,\n judge_model_name=judge_model_name,\n\ + \ output_dir=\"/tmp/eval_output\",\n merge_system_user_message=merge_system_user_message,\n\ + \ )\n\n evaluator.gen_answers(\n server_url=vllm_server,\n\ + \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ + \ )\n\n shutdown_vllm(vllm_process)\n\n overall_score,\ + \ qa_pairs, turn_scores, error_rate = evaluator.judge_answers(\n \ + \ server_url=judge_endpoint,\n api_key=judge_api_key,\n \ + \ serving_gpus=gpu_count,\n max_workers=max_workers,\n\ + \ )\n\n mt_bench_data = {\n \"report_title\": \"\ + SKILLS EVALUATION REPORT\",\n \"model\": model_path,\n \ + \ \"judge_model\": judge_model_name,\n \"overall_score\"\ + : overall_score,\n \"turn_scores\": turn_scores,\n \ + \ \"qa_scores\": qa_pairs,\n \"error_rate\": error_rate,\n \ + \ }\n\n all_mt_bench_data.append(mt_bench_data)\n scores[model_path]\ + \ = overall_score\n\n with open(output_path, \"w\", encoding=\"utf-8\"\ + ) as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n outputs\ + \ = NamedTuple(\"outputs\", best_model=str, best_score=float)\n best_model\ + \ = max(scores, key=scores.get)\n best_score = scores[best_model]\n \ + \ if best_score_file:\n with open(best_score_file, \"w\", encoding=\"\ + utf-8\") as f:\n json.dump({\"best_model\": best_model, \"best_score\"\ + : best_score}, f, indent=4)\n\n # Rename the best model directory to\ + \ \"candidate_model\" for the next step\n # So we know which model to\ + \ use for the final evaluation\n if os.path.exists(os.path.join(models_folder,\ + \ \"candidate_model\")):\n print(\"candidate_model already exists.\ + \ Skipping renaming\")\n else:\n os.rename(\n os.path.join(models_folder,\ + \ best_model),\n os.path.join(models_folder, \"candidate_model\"\ + ),\n )\n\n return outputs(best_model=best_model, best_score=best_score)\n\ + \n" + env: + - name: HOME + value: /tmp + - name: HF_HOME + value: /tmp + image: quay.io/redhat-et/ilab:1.2 + resources: + accelerator: + count: '1' + type: nvidia.com/gpu + exec-sdg-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - sdg_op + command: + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef sdg_op(\n num_instructions_to_generate: int,\n pipeline:\ + \ str,\n repo_branch: Optional[str],\n repo_pr: Optional[int],\n \ + \ taxonomy_path: str = \"/data/taxonomy\",\n sdg_path: str = \"/data/sdg\"\ + ,\n sdg_sampling_size: float = 1.0,\n):\n from os import getenv, path\n\ + \n import openai\n import yaml\n from instructlab.sdg import generate_data\n\ + \ from instructlab.sdg.utils.taxonomy import read_taxonomy\n\n def\ + \ set_precomputed_skills_data_ratio(sampling_size: float):\n skills_recipe\ + \ = \"/usr/share/instructlab/sdg/default_data_recipes/skills.yaml\"\n \ + \ if path.exists(skills_recipe):\n with open(skills_recipe,\ + \ \"r\") as file:\n skills_yaml = yaml.load(file, Loader=yaml.Loader)\n\ + \n skills_yaml[\"datasets\"][0][\"sampling_size\"] = sampling_size\n\ + \n with open(skills_recipe, \"w\", encoding=\"utf-8\") as file:\n\ + \ yaml.dump(skills_yaml, file)\n\n api_key = getenv(\"\ + api_key\")\n model = getenv(\"model\")\n endpoint = getenv(\"endpoint\"\ + )\n\n if sdg_ca_cert := getenv(\"SDG_CA_CERT_PATH\"):\n import\ + \ httpx\n\n custom_http_client = httpx.Client(verify=sdg_ca_cert)\n\ + \ client = openai.OpenAI(\n base_url=endpoint, api_key=api_key,\ + \ http_client=custom_http_client\n )\n else:\n client =\ + \ openai.OpenAI(base_url=endpoint, api_key=api_key)\n\n taxonomy_base\ + \ = \"main\" if repo_branch or (repo_pr and int(repo_pr) > 0) else \"empty\"\ + \n\n print(\"Generating synthetic dataset for:\")\n print()\n print(read_taxonomy(taxonomy_path,\ + \ taxonomy_base))\n\n set_precomputed_skills_data_ratio(sampling_size=sdg_sampling_size)\n\ + \n # generate_data has a magic word for its taxonomy_base argument -\ + \ 'empty'\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n\ + \ generate_data(\n client=client,\n num_instructions_to_generate=num_instructions_to_generate,\n\ + \ output_dir=sdg_path,\n taxonomy=taxonomy_path,\n \ + \ taxonomy_base=taxonomy_base,\n model_name=model,\n pipeline=pipeline,\n\ + \ chunk_word_count=1000,\n server_ctx_size=4096,\n )\n\n" + env: + - name: HOME + value: /tmp + - name: HF_HOME + value: /tmp + image: quay.io/redhat-et/ilab:1.2 + exec-sdg-to-artifact-op: + container: + args: + - cp -r {{$.inputs.parameters['pvc_path']}} {{$.outputs.artifacts['sdg'].path}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-skills-processed-data-to-artifact-op: + container: + args: + - cp -r {{$.inputs.parameters['pvc_path']}} {{$.outputs.artifacts['skills_processed_data'].path}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox + exec-taxonomy-to-artifact-op: + container: + args: + - cp -r {{$.inputs.parameters['pvc_path']}} {{$.outputs.artifacts['taxonomy'].path}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox +pipelineInfo: + description: InstructLab pipeline + displayName: InstructLab + name: instructlab +root: + dag: + tasks: + createpvc: + cachingOptions: + enableCache: true + componentRef: + name: comp-createpvc + inputs: + parameters: + access_modes: + runtimeValue: + constant: + - ReadWriteMany + pvc_name_suffix: + runtimeValue: + constant: -sdg + size: + runtimeValue: + constant: 10Gi + storage_class_name: + componentInputParameter: k8s_storage_class_name + taskInfo: + name: createpvc + createpvc-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-createpvc-2 + inputs: + parameters: + access_modes: + runtimeValue: + constant: + - ReadWriteMany + pvc_name_suffix: + runtimeValue: + constant: -model-cache + size: + runtimeValue: + constant: 100Gi + storage_class_name: + componentInputParameter: k8s_storage_class_name + taskInfo: + name: createpvc-2 + createpvc-3: + cachingOptions: + enableCache: true + componentRef: + name: comp-createpvc-3 + inputs: + parameters: + access_modes: + runtimeValue: + constant: + - ReadWriteMany + pvc_name_suffix: + runtimeValue: + constant: -output + size: + runtimeValue: + constant: 100Gi + storage_class_name: + componentInputParameter: k8s_storage_class_name + taskInfo: + name: createpvc-3 + data-processing-op: + cachingOptions: {} + componentRef: + name: comp-data-processing-op + dependentTasks: + - createpvc + - createpvc-2 + - model-to-pvc-op + - sdg-op + inputs: + parameters: + max_batch_len: + componentInputParameter: sdg_max_batch_len + taskInfo: + name: data-processing-op + deletepvc: + cachingOptions: + enableCache: true + componentRef: + name: comp-deletepvc + dependentTasks: + - createpvc-3 + - pvc-to-model-op + - pvc-to-mt-bench-op + - run-final-eval-op + inputs: + parameters: + pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + taskInfo: + name: deletepvc + deletepvc-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-deletepvc-2 + dependentTasks: + - createpvc + - run-final-eval-op + inputs: + parameters: + pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + taskInfo: + name: deletepvc-2 + deletepvc-3: + cachingOptions: + enableCache: true + componentRef: + name: comp-deletepvc-3 + dependentTasks: + - createpvc-2 + - run-final-eval-op + inputs: + parameters: + pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 + taskInfo: + name: deletepvc-3 + git-clone-op: + cachingOptions: {} + componentRef: + name: comp-git-clone-op + dependentTasks: + - createpvc + inputs: + parameters: + repo_branch: + componentInputParameter: sdg_repo_branch + repo_pr: + componentInputParameter: sdg_repo_pr + repo_url: + componentInputParameter: sdg_repo_url + taskInfo: + name: git-clone-op + importer: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer + inputs: + parameters: + uri: + componentInputParameter: sdg_base_model + taskInfo: + name: importer + knowledge-processed-data-to-artifact-op: + cachingOptions: {} + componentRef: + name: comp-knowledge-processed-data-to-artifact-op + dependentTasks: + - createpvc + - data-processing-op + taskInfo: + name: knowledge-processed-data-to-artifact-op + model-to-pvc-op: + cachingOptions: {} + componentRef: + name: comp-model-to-pvc-op + dependentTasks: + - createpvc-2 + - importer + inputs: + artifacts: + model: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer + taskInfo: + name: model-to-pvc-op + pvc-to-model-op: + cachingOptions: {} + componentRef: + name: comp-pvc-to-model-op + dependentTasks: + - createpvc-3 + - run-mt-bench-op + inputs: + parameters: + pvc_path: + runtimeValue: + constant: /output/phase_2/model/hf_format/candidate_model + taskInfo: + name: pvc-to-model-op + pvc-to-mt-bench-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-pvc-to-mt-bench-op + dependentTasks: + - createpvc-3 + - run-mt-bench-op + inputs: + parameters: + pvc_path: + runtimeValue: + constant: /output/mt_bench_data.json + taskInfo: + name: pvc-to-mt-bench-op + pytorchjob-manifest-op: + cachingOptions: {} + componentRef: + name: comp-pytorchjob-manifest-op + dependentTasks: + - createpvc + - createpvc-2 + - createpvc-3 + - data-processing-op + - model-to-pvc-op + inputs: + parameters: + effective_batch_size: + componentInputParameter: train_effective_batch_size_phase_1 + input_pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + learning_rate: + componentInputParameter: train_learning_rate_phase_1 + max_batch_len: + componentInputParameter: train_max_batch_len + model_pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 + name_suffix: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + nnodes: + componentInputParameter: train_nnodes + nproc_per_node: + componentInputParameter: train_nproc_per_node + num_epochs: + componentInputParameter: train_num_epochs_phase_1 + num_warmup_steps: + componentInputParameter: train_num_warmup_steps_phase_1 + output_pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + phase_num: + runtimeValue: + constant: 1.0 + save_samples: + componentInputParameter: train_save_samples + seed: + componentInputParameter: train_seed + taskInfo: + name: pytorchjob-manifest-op + pytorchjob-manifest-op-2: + cachingOptions: {} + componentRef: + name: comp-pytorchjob-manifest-op-2 + dependentTasks: + - createpvc + - createpvc-2 + - createpvc-3 + - pytorchjob-manifest-op + inputs: + parameters: + effective_batch_size: + componentInputParameter: train_effective_batch_size_phase_2 + input_pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + learning_rate: + componentInputParameter: train_learning_rate_phase_2 + max_batch_len: + componentInputParameter: train_max_batch_len + model_pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 + name_suffix: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + nnodes: + componentInputParameter: train_nnodes + nproc_per_node: + componentInputParameter: train_nproc_per_node + num_epochs: + componentInputParameter: train_num_epochs_phase_2 + num_warmup_steps: + componentInputParameter: train_num_warmup_steps_phase_2 + output_pvc_name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + phase_num: + runtimeValue: + constant: 2.0 + save_samples: + componentInputParameter: train_save_samples + seed: + componentInputParameter: train_seed + taskInfo: + name: pytorchjob-manifest-op-2 + run-final-eval-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-run-final-eval-op + dependentTasks: + - createpvc + - createpvc-2 + - createpvc-3 + - run-mt-bench-op + inputs: + parameters: + base_branch: + componentInputParameter: sdg_repo_branch + base_model_dir: + runtimeValue: + constant: /model/ + batch_size: + componentInputParameter: final_eval_batch_size + candidate_branch: + componentInputParameter: sdg_repo_branch + candidate_model: + runtimeValue: + constant: /output/phase_2/model/hf_format/candidate_model + few_shots: + componentInputParameter: final_eval_few_shots + max_workers: + componentInputParameter: final_eval_max_workers + merge_system_user_message: + componentInputParameter: final_eval_merge_system_user_message + taskInfo: + name: run-final-eval-op + run-mt-bench-op: + cachingOptions: {} + componentRef: + name: comp-run-mt-bench-op + dependentTasks: + - createpvc-3 + - pytorchjob-manifest-op-2 + inputs: + parameters: + max_workers: + componentInputParameter: mt_bench_max_workers + merge_system_user_message: + componentInputParameter: mt_bench_merge_system_user_message + models_folder: + runtimeValue: + constant: /output/phase_2/model/hf_format + taskInfo: + name: run-mt-bench-op + sdg-op: + cachingOptions: {} + componentRef: + name: comp-sdg-op + dependentTasks: + - createpvc + - git-clone-op + inputs: + parameters: + num_instructions_to_generate: + componentInputParameter: sdg_scale_factor + pipeline: + componentInputParameter: sdg_pipeline + repo_branch: + componentInputParameter: sdg_repo_branch + repo_pr: + componentInputParameter: sdg_repo_pr + sdg_sampling_size: + componentInputParameter: sdg_sample_size + taskInfo: + name: sdg-op + sdg-to-artifact-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-sdg-to-artifact-op + dependentTasks: + - createpvc + - git-clone-op + - sdg-op + taskInfo: + name: sdg-to-artifact-op + skills-processed-data-to-artifact-op: + cachingOptions: {} + componentRef: + name: comp-skills-processed-data-to-artifact-op + dependentTasks: + - createpvc + - data-processing-op + taskInfo: + name: skills-processed-data-to-artifact-op + taxonomy-to-artifact-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-taxonomy-to-artifact-op + dependentTasks: + - createpvc + - git-clone-op + - sdg-op + taskInfo: + name: taxonomy-to-artifact-op + inputDefinitions: + parameters: + final_eval_batch_size: + defaultValue: auto + description: Final model evaluation parameter for MMLU. Batch size for evaluation. + Valid values are a positive integer or 'auto' to select the largest batch + size that will fit in memory. + isOptional: true + parameterType: STRING + final_eval_few_shots: + defaultValue: 5.0 + description: Final model evaluation parameter for MMLU. Number of question-answer + pairs provided in the context preceding the question used for evaluation. + isOptional: true + parameterType: NUMBER_INTEGER + final_eval_max_workers: + defaultValue: auto + description: Final model evaluation parameter for MT Bench Branch. Number + of workers to use for evaluation with mt_bench or mt_bench_branch. Must + be a positive integer or 'auto'. + isOptional: true + parameterType: STRING + final_eval_merge_system_user_message: + defaultValue: false + description: Final model evaluation parameter for MT Bench Branch. Boolean + indicating whether to merge system and user messages (required for Mistral + based judges) + isOptional: true + parameterType: BOOLEAN + k8s_storage_class_name: + defaultValue: standard + description: A Kubernetes StorageClass name for persistent volumes. Selected + StorageClass must support RWX PersistentVolumes. + isOptional: true + parameterType: STRING + mt_bench_max_workers: + defaultValue: auto + description: MT Bench parameter. Number of workers to use for evaluation with + mt_bench or mt_bench_branch. Must be a positive integer or 'auto'. + isOptional: true + parameterType: STRING + mt_bench_merge_system_user_message: + defaultValue: false + description: MT Bench parameter. Boolean indicating whether to merge system + and user messages (required for Mistral based judges) + isOptional: true + parameterType: BOOLEAN + sdg_base_model: + defaultValue: s3:/// + description: SDG parameter. LLM model used to generate the synthetic dataset + isOptional: true + parameterType: STRING + sdg_max_batch_len: + defaultValue: 20000.0 + description: SDG parameter. Maximum tokens per gpu for each batch that will + be handled in a single step. + isOptional: true + parameterType: NUMBER_INTEGER + sdg_pipeline: + defaultValue: simple + description: 'SDG parameter. Data generation pipeline to use. Available: ''simple'', + ''full'', or a valid path to a directory of pipeline workflow YAML files. + Note that ''full'' requires a larger teacher model, Mixtral-8x7b.' + isOptional: true + parameterType: STRING + sdg_repo_branch: + description: SDG parameter. Points to a branch within the taxonomy git repository. + If set, has priority over sdg_repo_pr + isOptional: true + parameterType: STRING + sdg_repo_pr: + description: SDG parameter. Points to a pull request against the taxonomy + git repository + isOptional: true + parameterType: NUMBER_INTEGER + sdg_repo_url: + defaultValue: https://github.com/instructlab/taxonomy.git + description: SDG parameter. Points to a taxonomy git repository + isOptional: true + parameterType: STRING + sdg_sample_size: + defaultValue: 1.0 + description: SDG parameter. Represents the sdg skills recipe sampling size + as percentage in decimal form. + isOptional: true + parameterType: NUMBER_DOUBLE + sdg_scale_factor: + defaultValue: 2.0 + description: SDG parameter. The total number of instructions to be generated. + isOptional: true + parameterType: NUMBER_INTEGER + train_effective_batch_size_phase_1: + defaultValue: 3840.0 + description: Training parameter for in Phase 1. The number of samples in a + batch that the model should see before its parameters are updated. + isOptional: true + parameterType: NUMBER_INTEGER + train_effective_batch_size_phase_2: + defaultValue: 3840.0 + description: Training parameter for in Phase 2. The number of samples in a + batch that the model should see before its parameters are updated. + isOptional: true + parameterType: NUMBER_INTEGER + train_learning_rate_phase_1: + defaultValue: 0.0001 + description: Training parameter for in Phase 1. How fast we optimize the weights + during gradient descent. Higher values may lead to unstable learning performance. + It's generally recommended to have a low learning rate with a high effective + batch size. + isOptional: true + parameterType: NUMBER_DOUBLE + train_learning_rate_phase_2: + defaultValue: 0.0001 + description: Training parameter for in Phase 2. How fast we optimize the weights + during gradient descent. Higher values may lead to unstable learning performance. + It's generally recommended to have a low learning rate with a high effective + batch size. + isOptional: true + parameterType: NUMBER_DOUBLE + train_max_batch_len: + defaultValue: 20000.0 + description: Training parameter. Maximum tokens per gpu for each batch that + will be handled in a single step. + isOptional: true + parameterType: NUMBER_INTEGER + train_nnodes: + defaultValue: 2.0 + description: Training parameter. Number of nodes/workers to train on. + isOptional: true + parameterType: NUMBER_INTEGER + train_nproc_per_node: + defaultValue: 3.0 + description: Training parameter. Number of GPUs per each node/worker to use + for training. + isOptional: true + parameterType: NUMBER_INTEGER + train_num_epochs_phase_1: + defaultValue: 2.0 + description: Training parameter for in Phase 1. Number of epochs to run training. + isOptional: true + parameterType: NUMBER_INTEGER + train_num_epochs_phase_2: + defaultValue: 2.0 + description: Training parameter for in Phase 2. Number of epochs to run training. + isOptional: true + parameterType: NUMBER_INTEGER + train_num_warmup_steps_phase_1: + defaultValue: 100.0 + description: Training parameter for in Phase 1. The number of steps a model + should go through before reaching the full learning rate. We start at 0 + and linearly climb up to train_learning_rate. + isOptional: true + parameterType: NUMBER_INTEGER + train_num_warmup_steps_phase_2: + defaultValue: 100.0 + description: Training parameter for in Phase 2. The number of steps a model + should go through before reaching the full learning rate. We start at 0 + and linearly climb up to train_learning_rate. + isOptional: true + parameterType: NUMBER_INTEGER + train_save_samples: + defaultValue: 0.0 + description: Training parameter. Number of samples the model should see before + saving a checkpoint. + isOptional: true + parameterType: NUMBER_INTEGER + train_seed: + defaultValue: 42.0 + description: Training parameter. Random seed for initializing training. + isOptional: true + parameterType: NUMBER_INTEGER +schemaVersion: 2.1.0 +sdkVersion: kfp-2.9.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-data-processing-op: + imagePullSecret: + - secretName: redhat-et-ilab-botty-pull-secret + pvcMount: + - mountPath: /model + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + exec-git-clone-op: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + exec-knowledge-processed-data-to-artifact-op: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + exec-model-to-pvc-op: + pvcMount: + - mountPath: /model + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 + exec-pvc-to-model-op: + pvcMount: + - mountPath: /output + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + exec-pvc-to-mt-bench-op: + pvcMount: + - mountPath: /output + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + exec-pytorchjob-manifest-op-2: + pvcMount: + - mountPath: /output + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + exec-run-final-eval-op: + configMapAsEnv: + - configMapName: judge-server + keyToEnv: + - configMapKey: endpoint + envVar: JUDGE_ENDPOINT + - configMapKey: model + envVar: JUDGE_NAME + imagePullSecret: + - secretName: redhat-et-ilab-botty-pull-secret + pvcMount: + - mountPath: /output + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + - mountPath: /input + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + - mountPath: /model + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 + secretAsEnv: + - keyToEnv: + - envVar: JUDGE_API_KEY + secretKey: api_key + secretName: judge-server + exec-run-mt-bench-op: + configMapAsEnv: + - configMapName: judge-server + keyToEnv: + - configMapKey: endpoint + envVar: JUDGE_ENDPOINT + - configMapKey: model + envVar: JUDGE_NAME + imagePullSecret: + - secretName: redhat-et-ilab-botty-pull-secret + pvcMount: + - mountPath: /output + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-3 + secretAsEnv: + - keyToEnv: + - envVar: JUDGE_API_KEY + secretKey: api_key + secretName: judge-server + exec-sdg-op: + configMapAsEnv: + - configMapName: teacher-server + keyToEnv: + - configMapKey: endpoint + envVar: endpoint + - configMapKey: model + envVar: model + imagePullSecret: + - secretName: redhat-et-ilab-botty-pull-secret + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + secretAsEnv: + - keyToEnv: + - envVar: api_key + secretKey: api_key + secretName: teacher-server + exec-sdg-to-artifact-op: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + exec-skills-processed-data-to-artifact-op: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + exec-taxonomy-to-artifact-op: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc