diff --git a/apis/inference/v1alpha1/elasticbatchjob_types.go b/apis/inference/v1alpha1/elasticbatchjob_types.go index c62c320d..b47ca209 100644 --- a/apis/inference/v1alpha1/elasticbatchjob_types.go +++ b/apis/inference/v1alpha1/elasticbatchjob_types.go @@ -41,6 +41,9 @@ type ElasticBatchJobSpec struct { // "Worker": ReplicaSpec, // } ElasticBatchReplicaSpecs map[common.ReplicaType]*common.ReplicaSpec `json:"elasticBatchReplicaSpecs"` + + // NetworkMode defines network mode for intra job communicating. + NetworkMode *common.NetworkMode `json:"networkmode,omitempty"` } // +genclient diff --git a/apis/inference/v1alpha1/zz_generated.deepcopy.go b/apis/inference/v1alpha1/zz_generated.deepcopy.go index 410e5352..7c645dd1 100644 --- a/apis/inference/v1alpha1/zz_generated.deepcopy.go +++ b/apis/inference/v1alpha1/zz_generated.deepcopy.go @@ -109,6 +109,11 @@ func (in *ElasticBatchJobSpec) DeepCopyInto(out *ElasticBatchJobSpec) { (*out)[key] = outVal } } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ElasticBatchJobSpec. diff --git a/apis/training/v1alpha1/elasticdljob_types.go b/apis/training/v1alpha1/elasticdljob_types.go index 38170ebe..97a723dc 100644 --- a/apis/training/v1alpha1/elasticdljob_types.go +++ b/apis/training/v1alpha1/elasticdljob_types.go @@ -35,6 +35,9 @@ type ElasticDLJobSpec struct { // "Master": ElasticDLReplicaSpec, // } ElasticDLReplicaSpecs map[common.ReplicaType]*common.ReplicaSpec `json:"elasticdlReplicaSpecs"` + + // NetworkMode defines network mode for intra job communicating. 
+ NetworkMode *common.NetworkMode `json:"networkmode,omitempty"` } // +genclient diff --git a/apis/training/v1alpha1/marsjob_types.go b/apis/training/v1alpha1/marsjob_types.go index ebdda44d..61f75bac 100644 --- a/apis/training/v1alpha1/marsjob_types.go +++ b/apis/training/v1alpha1/marsjob_types.go @@ -40,6 +40,9 @@ type MarsJobSpec struct { // MarsReplicaSpecs is a map of MarsReplicaType(key) to ReplicaSpec(value), // specifying replicas and template of each type. MarsReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec `json:"marsReplicaSpecs"` + + // NetworkMode defines network mode for intra job communicating. + NetworkMode *commonv1.NetworkMode `json:"networkmode,omitempty"` } // MarsJobStatus defines the observed state of MarsJob diff --git a/apis/training/v1alpha1/mpijob_types.go b/apis/training/v1alpha1/mpijob_types.go index 79ef7bb9..70a319de 100644 --- a/apis/training/v1alpha1/mpijob_types.go +++ b/apis/training/v1alpha1/mpijob_types.go @@ -46,6 +46,9 @@ type MPIJobSpec struct { // LegacySpec reserves the deprecated fields for backward compatibility. *MPIJobLegacySpec `json:",inline"` + + // NetworkMode defines network mode for intra job communicating. + NetworkMode *apiv1.NetworkMode `json:"networkmode,omitempty"` } // MPIJobLegacySpec is a collection of legacy fields that were used in v1alpha1/v1alpha2 but diff --git a/apis/training/v1alpha1/pytorchjob_types.go b/apis/training/v1alpha1/pytorchjob_types.go index 4b3f2787..562aae25 100644 --- a/apis/training/v1alpha1/pytorchjob_types.go +++ b/apis/training/v1alpha1/pytorchjob_types.go @@ -46,6 +46,9 @@ type PyTorchJobSpec struct { // CacheBackend is used to configure the cache engine for job // +optional CacheBackend *cachev1alpha1.CacheBackendSpec `json:"cacheBackend"` + + // NetworkMode defines network mode for intra job communicating. 
+ NetworkMode *common.NetworkMode `json:"networkmode,omitempty"` } // PyTorchJobStatus defines the observed state of PyTorchJob diff --git a/apis/training/v1alpha1/tfjob_types.go b/apis/training/v1alpha1/tfjob_types.go index 2fc1abbe..91417540 100644 --- a/apis/training/v1alpha1/tfjob_types.go +++ b/apis/training/v1alpha1/tfjob_types.go @@ -54,6 +54,9 @@ type TFJobSpec struct { // CacheBackend is used to configure the cache engine for job // +optional CacheBackend *cachev1alpha1.CacheBackendSpec `json:"cacheBackend"` + + // NetworkMode defines network mode for intra job communicating. + NetworkMode *commonv1.NetworkMode `json:"networkmode,omitempty"` } // +genclient diff --git a/apis/training/v1alpha1/xdljob_types.go b/apis/training/v1alpha1/xdljob_types.go index fe5de381..2c83a793 100644 --- a/apis/training/v1alpha1/xdljob_types.go +++ b/apis/training/v1alpha1/xdljob_types.go @@ -48,6 +48,9 @@ type XDLJobSpec struct { // MinFinishWorkPercentage takes precedence over MinFinishWorkerNum if both are // specified. MinFinishWorkerPercentage *int32 `json:"minFinishWorkRate,omitempty"` + + // NetworkMode defines network mode for intra job communicating. + NetworkMode *v1.NetworkMode `json:"networkmode,omitempty"` } // XDLJobStatus defines the observed state of XDLJob diff --git a/apis/training/v1alpha1/xgboostjob_types.go b/apis/training/v1alpha1/xgboostjob_types.go index 08c9fe3e..5f7a23e0 100644 --- a/apis/training/v1alpha1/xgboostjob_types.go +++ b/apis/training/v1alpha1/xgboostjob_types.go @@ -37,6 +37,9 @@ type XGBoostJobSpec struct { // "Worker": ReplicaSpec, // } XGBReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec `json:"xgbReplicaSpecs"` + + // NetworkMode defines network mode for intra job communicating. 
+ NetworkMode *commonv1.NetworkMode `json:"networkmode,omitempty"` } // XGBoostJobStatus defines the observed state of XGBoostJob diff --git a/apis/training/v1alpha1/zz_generated.deepcopy.go b/apis/training/v1alpha1/zz_generated.deepcopy.go index 5785441b..c6bc07fb 100644 --- a/apis/training/v1alpha1/zz_generated.deepcopy.go +++ b/apis/training/v1alpha1/zz_generated.deepcopy.go @@ -106,6 +106,11 @@ func (in *ElasticDLJobSpec) DeepCopyInto(out *ElasticDLJobSpec) { (*out)[key] = outVal } } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ElasticDLJobSpec. @@ -297,6 +302,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(MPIJobLegacySpec) (*in).DeepCopyInto(*out) } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MPIJobSpec. @@ -397,6 +407,11 @@ func (in *MarsJobSpec) DeepCopyInto(out *MarsJobSpec) { (*out)[key] = outVal } } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MarsJobSpec. @@ -558,6 +573,11 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { *out = new(cachev1alpha1.CacheBackendSpec) (*in).DeepCopyInto(*out) } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PyTorchJobSpec. 
@@ -678,6 +698,11 @@ func (in *TFJobSpec) DeepCopyInto(out *TFJobSpec) { *out = new(cachev1alpha1.CacheBackendSpec) (*in).DeepCopyInto(*out) } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TFJobSpec. @@ -778,6 +803,11 @@ func (in *XDLJobSpec) DeepCopyInto(out *XDLJobSpec) { *out = new(int32) **out = **in } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new XDLJobSpec. @@ -883,6 +913,11 @@ func (in *XGBoostJobSpec) DeepCopyInto(out *XGBoostJobSpec) { (*out)[key] = outVal } } + if in.NetworkMode != nil { + in, out := &in.NetworkMode, &out.NetworkMode + *out = new(v1.NetworkMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new XGBoostJobSpec. 
diff --git a/config/crd/bases/inference.kubedl.io_elasticbatchjobs.yaml b/config/crd/bases/inference.kubedl.io_elasticbatchjobs.yaml index cc3fb179..62023d6f 100644 --- a/config/crd/bases/inference.kubedl.io_elasticbatchjobs.yaml +++ b/config/crd/bases/inference.kubedl.io_elasticbatchjobs.yaml @@ -3069,6 +3069,8 @@ spec: type: object type: object type: object + networkmode: + type: string schedulingPolicy: properties: minAvailable: diff --git a/config/crd/bases/training.kubedl.io_elasticdljobs.yaml b/config/crd/bases/training.kubedl.io_elasticdljobs.yaml index effd16fb..0a6e1c37 100644 --- a/config/crd/bases/training.kubedl.io_elasticdljobs.yaml +++ b/config/crd/bases/training.kubedl.io_elasticdljobs.yaml @@ -3063,6 +3063,8 @@ spec: type: object type: object type: object + networkmode: + type: string schedulingPolicy: properties: minAvailable: diff --git a/config/crd/bases/training.kubedl.io_marsjobs.yaml b/config/crd/bases/training.kubedl.io_marsjobs.yaml index 73fbf03c..64c31ba6 100644 --- a/config/crd/bases/training.kubedl.io_marsjobs.yaml +++ b/config/crd/bases/training.kubedl.io_marsjobs.yaml @@ -3063,6 +3063,8 @@ spec: type: object type: object type: object + networkmode: + type: string schedulingPolicy: properties: minAvailable: diff --git a/config/crd/bases/training.kubedl.io_mpijobs.yaml b/config/crd/bases/training.kubedl.io_mpijobs.yaml index 878a6b6c..d4fec8d7 100644 --- a/config/crd/bases/training.kubedl.io_mpijobs.yaml +++ b/config/crd/bases/training.kubedl.io_mpijobs.yaml @@ -3075,6 +3075,8 @@ spec: type: object type: object type: object + networkmode: + type: string processingResourceType: type: string processingUnits: diff --git a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml index c12b7dda..5ef88faa 100644 --- a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml +++ b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml @@ -161,6 +161,8 @@ spec: type: object type: object type: object + 
networkmode: + type: string pytorchReplicaSpecs: additionalProperties: properties: diff --git a/config/crd/bases/training.kubedl.io_tfjobs.yaml b/config/crd/bases/training.kubedl.io_tfjobs.yaml index 38b72489..45cc6284 100644 --- a/config/crd/bases/training.kubedl.io_tfjobs.yaml +++ b/config/crd/bases/training.kubedl.io_tfjobs.yaml @@ -161,6 +161,8 @@ spec: type: object type: object type: object + networkmode: + type: string schedulingPolicy: properties: minAvailable: diff --git a/config/crd/bases/training.kubedl.io_xdljobs.yaml b/config/crd/bases/training.kubedl.io_xdljobs.yaml index ff3a405b..2ec93ab3 100644 --- a/config/crd/bases/training.kubedl.io_xdljobs.yaml +++ b/config/crd/bases/training.kubedl.io_xdljobs.yaml @@ -72,6 +72,8 @@ spec: minFinishWorkRate: format: int32 type: integer + networkmode: + type: string schedulingPolicy: properties: minAvailable: diff --git a/config/crd/bases/training.kubedl.io_xgboostjobs.yaml b/config/crd/bases/training.kubedl.io_xgboostjobs.yaml index b248367b..91cbaa0d 100644 --- a/config/crd/bases/training.kubedl.io_xgboostjobs.yaml +++ b/config/crd/bases/training.kubedl.io_xgboostjobs.yaml @@ -66,6 +66,8 @@ spec: required: - schedule type: object + networkmode: + type: string schedulingPolicy: properties: minAvailable: diff --git a/controllers/elasticbatch/elasticbatchjob_controller.go b/controllers/elasticbatch/elasticbatchjob_controller.go index 65186f11..2a32718d 100644 --- a/controllers/elasticbatch/elasticbatchjob_controller.go +++ b/controllers/elasticbatch/elasticbatchjob_controller.go @@ -130,7 +130,7 @@ func (r *ElasticBatchJobReconciler) Reconcile(_ context.Context, req ctrl.Reques // Set default properties for elasticbatch job. 
r.scheme.Default(elasticbatchJob) - result, err := r.ctrl.ReconcileJobs(elasticbatchJob, elasticbatchJob.Spec.ElasticBatchReplicaSpecs, elasticbatchJob.Status, &elasticbatchJob.Spec.RunPolicy, nil, nil) + result, err := r.ctrl.ReconcileJobs(elasticbatchJob, elasticbatchJob.Spec.ElasticBatchReplicaSpecs, elasticbatchJob.Status, &elasticbatchJob.Spec.RunPolicy, nil, nil, elasticbatchJob.Spec.NetworkMode) if err != nil { log.Error(err, "elasticbatch job reconcile failed") return result, err diff --git a/controllers/elasticdl/elasticdljob_controller.go b/controllers/elasticdl/elasticdljob_controller.go index 9d34fa9a..65a86b02 100644 --- a/controllers/elasticdl/elasticdljob_controller.go +++ b/controllers/elasticdl/elasticdljob_controller.go @@ -112,7 +112,7 @@ func (r *ElasticDLJobReconciler) Reconcile(_ context.Context, req ctrl.Request) // Set default properties for elasicdl job. r.scheme.Default(elasticdlJob) - result, err := r.ctrl.ReconcileJobs(elasticdlJob, elasticdlJob.Spec.ElasticDLReplicaSpecs, elasticdlJob.Status, &elasticdlJob.Spec.RunPolicy, nil, nil) + result, err := r.ctrl.ReconcileJobs(elasticdlJob, elasticdlJob.Spec.ElasticDLReplicaSpecs, elasticdlJob.Status, &elasticdlJob.Spec.RunPolicy, nil, nil, elasticdlJob.Spec.NetworkMode) if err != nil { log.Error(err, "elasticdl job reconcile failed") return result, err diff --git a/controllers/mars/marsjob_controller.go b/controllers/mars/marsjob_controller.go index ff9993fa..d0a7a98a 100644 --- a/controllers/mars/marsjob_controller.go +++ b/controllers/mars/marsjob_controller.go @@ -118,7 +118,7 @@ func (r *MarsJobReconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl r.scheme.Default(marsJob) - result, err := r.ctrl.ReconcileJobs(marsJob, marsJob.Spec.MarsReplicaSpecs, marsJob.Status.JobStatus, &marsJob.Spec.RunPolicy, nil, nil) + result, err := r.ctrl.ReconcileJobs(marsJob, marsJob.Spec.MarsReplicaSpecs, marsJob.Status.JobStatus, &marsJob.Spec.RunPolicy, nil, nil, marsJob.Spec.NetworkMode) if err 
!= nil { log.Error(err, "mars job reconcile failed") return result, err diff --git a/controllers/mpi/mpijob_controller.go b/controllers/mpi/mpijob_controller.go index 2756e316..f6eba5e4 100644 --- a/controllers/mpi/mpijob_controller.go +++ b/controllers/mpi/mpijob_controller.go @@ -152,7 +152,7 @@ func (r *MPIJobReconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl. // Set default properties for tensorflow job. r.scheme.Default(mpiJob) - result, err = r.ctrl.ReconcileJobs(mpiJob, mpiJob.Spec.MPIReplicaSpecs, mpiJob.Status, &mpiJob.Spec.RunPolicy, nil, nil) + result, err = r.ctrl.ReconcileJobs(mpiJob, mpiJob.Spec.MPIReplicaSpecs, mpiJob.Status, &mpiJob.Spec.RunPolicy, nil, nil, mpiJob.Spec.NetworkMode) if err != nil { log.Error(err, "mpi job reconcile failed") return result, err diff --git a/controllers/pytorch/pytorchjob_controller.go b/controllers/pytorch/pytorchjob_controller.go index e3d58ff0..006d04ea 100644 --- a/controllers/pytorch/pytorchjob_controller.go +++ b/controllers/pytorch/pytorchjob_controller.go @@ -145,7 +145,7 @@ func (r *PytorchJobReconciler) Reconcile(_ context.Context, req ctrl.Request) (c // Set default properties for pytorch job. 
r.scheme.Default(pytorchJob) - result, err := r.ctrl.ReconcileJobs(pytorchJob, pytorchJob.Spec.PyTorchReplicaSpecs, pytorchJob.Status, &pytorchJob.Spec.RunPolicy, pytorchJob.Spec.ModelVersion, pytorchJob.Spec.CacheBackend) + result, err := r.ctrl.ReconcileJobs(pytorchJob, pytorchJob.Spec.PyTorchReplicaSpecs, pytorchJob.Status, &pytorchJob.Spec.RunPolicy, pytorchJob.Spec.ModelVersion, pytorchJob.Spec.CacheBackend, pytorchJob.Spec.NetworkMode) if err != nil { log.Error(err, "pytorch job reconcile failed") return result, err @@ -241,7 +241,7 @@ func (r *PytorchJobReconciler) SetClusterSpec(ctx context.Context, job interface } masterRole := rtype == strings.ToLower(string(training.PyTorchReplicaTypeMaster)) - if masterHostPort, ok := job_controller.GetHostNetworkPortFromContext(ctx, "master", "0"); job_controller.EnableHostNetwork(pytorchJob) && ok { + if masterHostPort, ok := job_controller.GetHostNetworkPortFromContext(ctx, "master", "0"); job_controller.EnableHostNetwork(pytorchJob.Spec.NetworkMode) && ok { if masterRole || features.KubeDLFeatureGates.Enabled(features.HostNetWithHeadlessSvc) { masterPort = masterHostPort } diff --git a/controllers/tensorflow/tensorflow.go b/controllers/tensorflow/tensorflow.go index 5682b27d..2e497b0c 100644 --- a/controllers/tensorflow/tensorflow.go +++ b/controllers/tensorflow/tensorflow.go @@ -135,7 +135,7 @@ func genClusterSpec(ctx context.Context, tfJob *training.TFJob, selfType, selfIn selfPort := port // Set endpoint port as selected hostnetwork port so that tensorflow worker process could listen // to correct port by TF_CONFIG[cluster]. 
- if job_controller.EnableHostNetwork(tfJob) && rt == selfType && strconv.Itoa(int(i)) == selfIndex { + if job_controller.EnableHostNetwork(tfJob.Spec.NetworkMode) && rt == selfType && strconv.Itoa(int(i)) == selfIndex { hostPort, ok := job_controller.GetHostNetworkPortFromContext(ctx, selfType, selfIndex) if ok { selfPort = hostPort diff --git a/controllers/tensorflow/tfjob_controller.go b/controllers/tensorflow/tfjob_controller.go index de891168..05d6397f 100644 --- a/controllers/tensorflow/tfjob_controller.go +++ b/controllers/tensorflow/tfjob_controller.go @@ -171,7 +171,7 @@ func (r *TFJobReconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.R // Set default properties for tensorflow job. r.scheme.Default(tfJob) - result, err := r.ctrl.ReconcileJobs(tfJob, tfJob.Spec.TFReplicaSpecs, tfJob.Status, &tfJob.Spec.RunPolicy, tfJob.Spec.ModelVersion, tfJob.Spec.CacheBackend) + result, err := r.ctrl.ReconcileJobs(tfJob, tfJob.Spec.TFReplicaSpecs, tfJob.Status, &tfJob.Spec.RunPolicy, tfJob.Spec.ModelVersion, tfJob.Spec.CacheBackend, tfJob.Spec.NetworkMode) if err != nil { log.Error(err, "tensorflow job reconcile failed") return result, err diff --git a/controllers/xdl/xdljob_controller.go b/controllers/xdl/xdljob_controller.go index 452e8d74..ca12364c 100644 --- a/controllers/xdl/xdljob_controller.go +++ b/controllers/xdl/xdljob_controller.go @@ -131,7 +131,7 @@ func (r *XDLJobReconciler) Reconcile(_ context.Context, request reconcile.Reques // Set default properties for xdl job. 
r.scheme.Default(xdlJob) - result, err := r.ctrl.ReconcileJobs(xdlJob, xdlJob.Spec.XDLReplicaSpecs, xdlJob.Status, &xdlJob.Spec.RunPolicy, nil, nil) + result, err := r.ctrl.ReconcileJobs(xdlJob, xdlJob.Spec.XDLReplicaSpecs, xdlJob.Status, &xdlJob.Spec.RunPolicy, nil, nil, xdlJob.Spec.NetworkMode) if err != nil { log.Error(err, "xdl job reconcile failed.") return result, err diff --git a/controllers/xgboost/xgboostjob_controller.go b/controllers/xgboost/xgboostjob_controller.go index 4922ba7f..b1dfa28a 100644 --- a/controllers/xgboost/xgboostjob_controller.go +++ b/controllers/xgboost/xgboostjob_controller.go @@ -118,7 +118,7 @@ func (r *XgboostJobReconciler) Reconcile(_ context.Context, req reconcile.Reques // Set default properties for xgboost job r.scheme.Default(xgboostjob) - result, err := r.ctrl.ReconcileJobs(xgboostjob, xgboostjob.Spec.XGBReplicaSpecs, xgboostjob.Status.JobStatus, &xgboostjob.Spec.RunPolicy, nil, nil) + result, err := r.ctrl.ReconcileJobs(xgboostjob, xgboostjob.Spec.XGBReplicaSpecs, xgboostjob.Status.JobStatus, &xgboostjob.Spec.RunPolicy, nil, nil, xgboostjob.Spec.NetworkMode) if err != nil { log.Error(err, "xgboost job reconcile failed") return result, err diff --git a/pkg/job_controller/api/v1/constants.go b/pkg/job_controller/api/v1/constants.go index d60a77dd..d88f78cc 100644 --- a/pkg/job_controller/api/v1/constants.go +++ b/pkg/job_controller/api/v1/constants.go @@ -30,8 +30,6 @@ const ( AnnotationGitSyncConfig = KubeDLPrefix + "/git-sync-config" // AnnotationTenancyInfo annotate tenancy information. AnnotationTenancyInfo = KubeDLPrefix + "/tenancy" - // AnnotationNetworkMode annotate job network mode. - AnnotationNetworkMode = KubeDLPrefix + "/network-mode" // AnnotationEnableElasticTraining indicates job enables elastic training. 
AnnotationEnableElasticTraining = KubeDLPrefix + "/enable-elastic-training" // AnnotationElasticScaleState indicates current progress of elastic scaling (inflight | done) @@ -75,14 +73,6 @@ const ( JobReplicaTypeAIMaster ReplicaType = "AIMaster" ) -// NetworkMode defines network mode for intra job communicating. -type NetworkMode string - -const ( - // HostNetworkMode indicates that replicas use host-network to communicate with each other. - HostNetworkMode NetworkMode = "host" -) - const ( ElasticScaleInflight = "inflight" ElasticScaleDone = "done" diff --git a/pkg/job_controller/api/v1/interface.go b/pkg/job_controller/api/v1/interface.go index 93b969f3..23da1482 100644 --- a/pkg/job_controller/api/v1/interface.go +++ b/pkg/job_controller/api/v1/interface.go @@ -9,6 +9,7 @@ import ( ) // ControllerInterface defines the Interface to be implemented by custom operators. e.g. tf-operator needs to implement this interface +// +k8s:deepcopy-gen=false type ControllerInterface interface { //ControllerName Returns the Controller name ControllerName() string @@ -72,6 +73,7 @@ type ControllerInterface interface { } // ElasticScaling defines the interface to be implemented by custom workload elastic behaviors. +// +k8s:deepcopy-gen=false type ElasticScaling interface { // EnableElasticScaling indicates workload enables elastic scaling or not. EnableElasticScaling(job v1.Object, runPolicy *RunPolicy) bool diff --git a/pkg/job_controller/api/v1/types.go b/pkg/job_controller/api/v1/types.go index 2ac1acb7..2323280c 100644 --- a/pkg/job_controller/api/v1/types.go +++ b/pkg/job_controller/api/v1/types.go @@ -312,3 +312,11 @@ type DAGCondition struct { // OnPhase defines at which phase the upstream replica will trigger this condition. OnPhase v1.PodPhase `json:"onPhase"` } + +// NetworkMode defines network mode for intra job communicating. +type NetworkMode string + +const ( + // HostNetworkMode indicates that replicas use host-network to communicate with each other. 
+ HostNetworkMode NetworkMode = "host" +) diff --git a/pkg/job_controller/hostnetwork.go b/pkg/job_controller/hostnetwork.go index bbf7736a..056cbdd2 100644 --- a/pkg/job_controller/hostnetwork.go +++ b/pkg/job_controller/hostnetwork.go @@ -20,15 +20,13 @@ import ( "context" "fmt" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" "github.com/alibaba/kubedl/pkg/util" + corev1 "k8s.io/api/core/v1" ) -func EnableHostNetwork(job metav1.Object) bool { - return job.GetAnnotations()[v1.AnnotationNetworkMode] == string(v1.HostNetworkMode) +func EnableHostNetwork(networkmode *v1.NetworkMode) bool { + return networkmode != nil && *networkmode == v1.HostNetworkMode } func GetHostNetworkPortFromContext(ctx context.Context, rtype, index string) (int32, bool) { diff --git a/pkg/job_controller/job.go b/pkg/job_controller/job.go index 4df4811c..b862d548 100644 --- a/pkg/job_controller/job.go +++ b/pkg/job_controller/job.go @@ -70,7 +70,7 @@ func (jc *JobController) deletePodsAndServices(runPolicy *apiv1.RunPolicy, job i // ReconcileJobs checks and updates replicas for each given ReplicaSpec. // It will requeue the job in case of an error while creating/deleting pods/services. 
func (jc *JobController) ReconcileJobs(job client.Object, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, jobStatus apiv1.JobStatus, - runPolicy *apiv1.RunPolicy, modelVersion *v1alpha1.ModelVersionSpec, cacheBackendSpec *cachev1alpha1.CacheBackendSpec) (result reconcile.Result, err error) { + runPolicy *apiv1.RunPolicy, modelVersion *v1alpha1.ModelVersionSpec, cacheBackendSpec *cachev1alpha1.CacheBackendSpec, networkmode *apiv1.NetworkMode) (result reconcile.Result, err error) { jobName := job.GetName() jobKey, err := KeyFunc(job) @@ -307,7 +307,7 @@ func (jc *JobController) ReconcileJobs(job client.Object, replicas map[apiv1.Rep continue } - err = jc.ReconcilePods(ctx, job, &jobStatus, pods, rtype, spec, replicas, runPolicy, &restart) + err = jc.ReconcilePods(ctx, job, &jobStatus, pods, rtype, spec, replicas, runPolicy, networkmode, &restart) if err != nil { log.Warnf("ReconcilePods error %v", err) return result, err @@ -317,7 +317,7 @@ func (jc *JobController) ReconcileJobs(job client.Object, replicas map[apiv1.Rep continue } - err = jc.ReconcileServices(ctx, job, services, rtype, spec) + err = jc.ReconcileServices(ctx, job, services, rtype, spec, networkmode) if err != nil { log.Warnf("ReconcileServices error %v", err) return result, err diff --git a/pkg/job_controller/pod.go b/pkg/job_controller/pod.go index 9e9d5f54..f57a68d5 100644 --- a/pkg/job_controller/pod.go +++ b/pkg/job_controller/pod.go @@ -243,7 +243,7 @@ func (jc *JobController) ReconcilePods( pods []*v1.Pod, rtype apiv1.ReplicaType, spec *apiv1.ReplicaSpec, - replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, runPolicy *apiv1.RunPolicy, restart *bool) error { + replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, runPolicy *apiv1.RunPolicy, mode *apiv1.NetworkMode, restart *bool) error { // Convert ReplicaType to lower string. 
rt := strings.ToLower(string(rtype)) @@ -278,7 +278,7 @@ func (jc *JobController) ReconcilePods( logger.Infof("Need to create new pod: %s-%d", rt, index) // check if this replica is the master role - err = jc.createNewPod(ctx, job, rt, strconv.Itoa(index), spec, jc.Controller.IsMasterRole(replicas, rtype, index), runPolicy) + err = jc.createNewPod(ctx, job, rt, strconv.Itoa(index), spec, jc.Controller.IsMasterRole(replicas, rtype, index), runPolicy, mode) if err != nil { // When controller tries to create a new pod but api-server returns a AlreadyExists error, // there may comes with two case: @@ -311,7 +311,7 @@ func (jc *JobController) ReconcilePods( // Check the status of the current pod. pod := podSlice[0] - failOver, exitCode, err := jc.reconcileOnePod(ctx, job, jobStatus, spec, pod, index, numReplicas, rtype, logger) + failOver, exitCode, err := jc.reconcileOnePod(ctx, job, jobStatus, spec, pod, index, numReplicas, rtype, logger, mode) if failOver { podsToFailover = append(podsToFailover, pod) } else if pod.Status.Phase == v1.PodFailed { @@ -353,7 +353,7 @@ func (jc *JobController) ReconcilePods( } func (jc *JobController) reconcileOnePod(ctx context.Context, job client.Object, jobStatus *apiv1.JobStatus, - spec *apiv1.ReplicaSpec, pod *v1.Pod, index, numReplicas int, rtype apiv1.ReplicaType, logger *log.Entry) (failOver bool, exitCode int32, err error) { + spec *apiv1.ReplicaSpec, pod *v1.Pod, index, numReplicas int, rtype apiv1.ReplicaType, logger *log.Entry, mode *apiv1.NetworkMode) (failOver bool, exitCode int32, err error) { const ( // magic number initialExitCode int32 = 0xbeef @@ -381,7 +381,7 @@ func (jc *JobController) reconcileOnePod(ctx context.Context, job client.Object, } // Get and pass its container port by context if pod enables hostnetwork mode. 
- if EnableHostNetwork(job) { + if EnableHostNetwork(mode) { storeHostNetworkPortToContext(ctx, strings.ToLower(string(rtype)), strconv.Itoa(index), getContainerHostNetworkPort(pod, jc.Controller.GetDefaultContainerName(), jc.Controller.GetDefaultContainerPortName())) } @@ -401,7 +401,7 @@ func (jc *JobController) reconcileOnePod(ctx context.Context, job client.Object, // createNewPod creates a new pod for the given index and type. func (jc *JobController) createNewPod(ctx context.Context, job interface{}, rt, index string, spec *apiv1.ReplicaSpec, masterRole bool, - runPolicy *apiv1.RunPolicy) error { + runPolicy *apiv1.RunPolicy, mode *apiv1.NetworkMode) error { metaObject, ok := job.(metav1.Object) if !ok { @@ -429,7 +429,7 @@ func (jc *JobController) createNewPod(ctx context.Context, job interface{}, rt, labels[apiv1.LabelGeneration] = strconv.Itoa(int(metaObject.GetGeneration())) } - if EnableHostNetwork(metaObject) { + if EnableHostNetwork(mode) { commonutil.LoggerForReplica(metaObject, rt).Infof("pod enable host network, name: %s, masterRole: %v", metaObject.GetName(), masterRole) if err := jc.setupHostNetwork(ctx, podTemplate, rt, index); err != nil { diff --git a/pkg/job_controller/pod_test.go b/pkg/job_controller/pod_test.go index a967d9c9..06f3e960 100644 --- a/pkg/job_controller/pod_test.go +++ b/pkg/job_controller/pod_test.go @@ -146,6 +146,7 @@ func TestReconcilePods(t *testing.T) { job *testjobv1.TestJob pods []*v1.Pod rtype apiv1.ReplicaType + mode *apiv1.NetworkMode expectedPodsNum int }{ { @@ -248,7 +249,7 @@ func TestReconcilePods(t *testing.T) { } err := jc.ReconcilePods(context.Background(), c.job, &c.job.Status, c.pods, c.rtype, c.job.Spec.TestReplicaSpecs[c.rtype], - c.job.Spec.TestReplicaSpecs, &apiv1.RunPolicy{}, pointer.BoolPtr(false)) + c.job.Spec.TestReplicaSpecs, &apiv1.RunPolicy{}, c.mode, pointer.BoolPtr(false)) if err != nil { t.Errorf("failed to ReconcilePods, err: %v", err) } diff --git a/pkg/job_controller/service.go 
b/pkg/job_controller/service.go index f97b3d9c..d7a2af5b 100644 --- a/pkg/job_controller/service.go +++ b/pkg/job_controller/service.go @@ -199,7 +199,8 @@ func (jc *JobController) ReconcileServices( job metav1.Object, services []*v1.Service, rtype apiv1.ReplicaType, - spec *apiv1.ReplicaSpec) error { + spec *apiv1.ReplicaSpec, + networkmode *apiv1.NetworkMode) error { // Convert ReplicaType to lower string. rt := strings.ToLower(string(rtype)) @@ -218,7 +219,7 @@ func (jc *JobController) ReconcileServices( commonutil.LoggerForReplica(job, rt).Warningf("we have too many services for %s %d", rt, index) } else if len(serviceSlice) == 0 { commonutil.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index) - err = jc.CreateNewService(ctx, job, rtype, spec, strconv.Itoa(index)) + err = jc.CreateNewService(ctx, job, rtype, spec, strconv.Itoa(index), networkmode) if err != nil { return err } @@ -235,7 +236,7 @@ func (jc *JobController) ReconcileServices( } } } - if EnableHostNetwork(job) { + if EnableHostNetwork(networkmode) { hostPort, ok := GetHostNetworkPortFromContext(ctx, rt, strconv.Itoa(index)) if ok && len(service.Spec.Ports) > 0 && service.Spec.Ports[0].TargetPort.IntVal != hostPort { commonutil.LoggerForReplica(job, rt).Infof("update target service: %s-%d, new port: %d", @@ -274,7 +275,7 @@ func (jc *JobController) GetPortFromJob(spec *apiv1.ReplicaSpec) (int32, error) // createNewService creates a new service for the given index and type. func (jc *JobController) CreateNewService(ctx context.Context, job metav1.Object, rtype apiv1.ReplicaType, - spec *apiv1.ReplicaSpec, index string) error { + spec *apiv1.ReplicaSpec, index string, networkmode *apiv1.NetworkMode) error { // Convert ReplicaType to lower string. 
rt := strings.ToLower(string(rtype)) @@ -291,7 +292,7 @@ func (jc *JobController) CreateNewService(ctx context.Context, job metav1.Object targetPort := svcPort clusterIP := "None" - if !features.KubeDLFeatureGates.Enabled(features.HostNetWithHeadlessSvc) && EnableHostNetwork(job) { + if !features.KubeDLFeatureGates.Enabled(features.HostNetWithHeadlessSvc) && EnableHostNetwork(networkmode) { // Communications between replicas use headless services by default, as for hostnetwork mode, // headless service can not forward traffic from one port to another, so we use normal service // when hostnetwork enabled. diff --git a/pkg/job_controller/test_job_controller.go b/pkg/job_controller/test_job_controller.go index d0305569..ab0d3887 100644 --- a/pkg/job_controller/test_job_controller.go +++ b/pkg/job_controller/test_job_controller.go @@ -32,7 +32,6 @@ func (t TestJobController) GetReconcileOrders() []apiv1.ReplicaType { testv1.TestReplicaTypeWorker, } } - func (t TestJobController) GetPodsForJob(job interface{}) ([]*corev1.Pod, error) { return []*corev1.Pod{}, nil } diff --git a/pkg/test_job/v1/types.go b/pkg/test_job/v1/types.go index 9f06cdc6..e3716445 100644 --- a/pkg/test_job/v1/types.go +++ b/pkg/test_job/v1/types.go @@ -53,6 +53,7 @@ type TestJobSpec struct { const ( TestReplicaTypeWorker apiv1.ReplicaType = "Worker" TestReplicaTypeMaster apiv1.ReplicaType = "Master" + Testmode apiv1.NetworkMode = "host" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object