diff --git a/pkg/apis/numaflow/v1alpha1/container_builder.go b/pkg/apis/numaflow/v1alpha1/container_builder.go index df09d250c..0751239d6 100644 --- a/pkg/apis/numaflow/v1alpha1/container_builder.go +++ b/pkg/apis/numaflow/v1alpha1/container_builder.go @@ -16,7 +16,10 @@ limitations under the License. package v1alpha1 -import corev1 "k8s.io/api/core/v1" +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/ptr" +) type containerBuilder corev1.Container @@ -85,6 +88,11 @@ func (b containerBuilder) resources(x corev1.ResourceRequirements) containerBuil return b } +func (b containerBuilder) asSidecar() containerBuilder { + b.RestartPolicy = ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways) + return b +} + func (b containerBuilder) build() corev1.Container { return corev1.Container(b) } diff --git a/pkg/apis/numaflow/v1alpha1/container_builder_test.go b/pkg/apis/numaflow/v1alpha1/container_builder_test.go index b641918b6..22a04b4ec 100644 --- a/pkg/apis/numaflow/v1alpha1/container_builder_test.go +++ b/pkg/apis/numaflow/v1alpha1/container_builder_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/ptr" ) var ( @@ -49,6 +50,7 @@ func Test_containerBuilder(t *testing.T) { appendEnv(corev1.EnvVar{ Name: "env", Value: "value"}). appendPorts(corev1.ContainerPort{Name: "port", ContainerPort: 8080}). + asSidecar(). build() assert.Equal(t, "numa", c.Name) assert.Len(t, c.VolumeMounts, 1) @@ -58,4 +60,5 @@ func Test_containerBuilder(t *testing.T) { assert.Equal(t, corev1.PullIfNotPresent, c.ImagePullPolicy) assert.Equal(t, []corev1.EnvVar{{Name: "env", Value: "value"}}, c.Env) assert.Equal(t, []corev1.ContainerPort{{Name: "port", ContainerPort: 8080}}, c.Ports) + assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy) } diff --git a/pkg/apis/numaflow/v1alpha1/container_supplier.go b/pkg/apis/numaflow/v1alpha1/container_supplier.go index 3d090c62f..a15aee8ec 100644 --- a/pkg/apis/numaflow/v1alpha1/container_supplier.go +++ b/pkg/apis/numaflow/v1alpha1/container_supplier.go @@ -29,5 +29,6 @@ type getContainerReq struct { } type containerSupplier interface { - getContainers(req getContainerReq) ([]corev1.Container, error) + // getContainers returns the sidecar containers and main containers for the vertex. + getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) } diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go index 0ed073050..a167b56cc 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go @@ -341,7 +341,7 @@ func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, e } volumeMounts := []corev1.VolumeMount{{Name: varVolumeName, MountPath: PathVarRun}} - containers := mv.Spec.buildContainers(getContainerReq{ + sidecarContainers, containers := mv.Spec.buildContainers(getContainerReq{ env: envVars, image: req.Image, imagePullPolicy: req.PullPolicy, @@ -391,17 +391,19 @@ func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, e {Name: MonoVertexMetricsPortName, ContainerPort: MonoVertexMetricsPort}, } - if len(containers) > 1 { // udf, udsink, udsource, or source vertex specifies a udtransformer - for i := 1; i < len(containers); i++ { - containers[i].Env = append(containers[i].Env, mv.commonEnvs()...) - containers[i].Env = append(containers[i].Env, mv.sidecarEnvs()...) - } + for i := 0; i < len(sidecarContainers); i++ { // udsink, udsource, udtransformer ... + sidecarContainers[i].Env = append(sidecarContainers[i].Env, mv.commonEnvs()...) + sidecarContainers[i].Env = append(sidecarContainers[i].Env, mv.sidecarEnvs()...) } + initContainers := []corev1.Container{} + initContainers = append(initContainers, mv.Spec.InitContainers...) + initContainers = append(initContainers, sidecarContainers...) + spec := &corev1.PodSpec{ Subdomain: mv.GetHeadlessServiceName(), Volumes: append(volumes, mv.Spec.Volumes...), - InitContainers: mv.Spec.InitContainers, + InitContainers: initContainers, Containers: append(containers, mv.Spec.Sidecars...), } mv.Spec.AbstractPodTemplate.ApplyToPodSpec(spec) @@ -458,26 +460,28 @@ func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec { return x } -func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) []corev1.Container { +// buildContainers builds the sidecar containers and main containers for the mono vertex. +func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) ([]corev1.Container, []corev1.Container) { mainContainer := containerBuilder{}. init(req).command(NumaflowRustBinary).args("--rust").build() - containers := []corev1.Container{mainContainer} + + sidecarContainers := []corev1.Container{} if mvspec.Source.UDSource != nil { // Only support UDSource for now. - containers = append(containers, mvspec.Source.getUDSourceContainer(req)) + sidecarContainers = append(sidecarContainers, mvspec.Source.getUDSourceContainer(req)) } if mvspec.Source.UDTransformer != nil { - containers = append(containers, mvspec.Source.getUDTransformerContainer(req)) + sidecarContainers = append(sidecarContainers, mvspec.Source.getUDTransformerContainer(req)) } if mvspec.Sink.UDSink != nil { // Only support UDSink for now. - containers = append(containers, mvspec.Sink.getUDSinkContainer(req)) + sidecarContainers = append(sidecarContainers, mvspec.Sink.getUDSinkContainer(req)) } if mvspec.Sink.Fallback != nil { - containers = append(containers, mvspec.Sink.getFallbackUDSinkContainer(req)) + sidecarContainers = append(sidecarContainers, mvspec.Sink.getFallbackUDSinkContainer(req)) } // Fallback sink is not supported. - containers = append(containers, mvspec.Sidecars...) - return containers + sidecarContainers = append(sidecarContainers, mvspec.Sidecars...) + return sidecarContainers, containers } type MonoVertexLimits struct { diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go index 98bb80100..db7bed1fb 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go @@ -205,7 +205,8 @@ func TestMonoVertexGetPodSpec(t *testing.T) { } podSpec, err := testMvtx.GetPodSpec(req) assert.NoError(t, err) - assert.Equal(t, 4, len(podSpec.Containers)) + assert.Equal(t, 1, len(podSpec.Containers)) + assert.Equal(t, 3, len(podSpec.InitContainers)) assert.Equal(t, 1, len(podSpec.Volumes)) assert.Equal(t, "my-image", podSpec.Containers[0].Image) assert.Equal(t, corev1.PullIfNotPresent, podSpec.Containers[0].ImagePullPolicy) @@ -213,12 +214,15 @@ func TestMonoVertexGetPodSpec(t *testing.T) { assert.Equal(t, "200m", podSpec.Containers[0].Resources.Limits.Cpu().String()) assert.Equal(t, "100Mi", podSpec.Containers[0].Resources.Requests.Memory().String()) assert.Equal(t, "200Mi", podSpec.Containers[0].Resources.Limits.Memory().String()) - assert.Equal(t, "test-image1", podSpec.Containers[1].Image) - assert.Equal(t, "test-image2", podSpec.Containers[2].Image) - assert.Equal(t, "test-image3", podSpec.Containers[3].Image) + assert.Equal(t, "test-image1", podSpec.InitContainers[0].Image) + assert.Equal(t, "test-image2", podSpec.InitContainers[1].Image) + assert.Equal(t, "test-image3", podSpec.InitContainers[2].Image) for _, c := range podSpec.Containers { assert.Equal(t, 1, len(c.VolumeMounts)) } + for _, c := range podSpec.InitContainers { + assert.Equal(t, 1, len(c.VolumeMounts)) + } envNames := []string{} for _, env := range podSpec.Containers[0].Env { envNames = append(envNames, env.Name) diff --git a/pkg/apis/numaflow/v1alpha1/sink.go b/pkg/apis/numaflow/v1alpha1/sink.go index 1b37ef5dc..dc66593e0 100644 --- a/pkg/apis/numaflow/v1alpha1/sink.go +++ b/pkg/apis/numaflow/v1alpha1/sink.go @@ -48,17 +48,18 @@ type AbstractSink struct { UDSink *UDSink `json:"udsink,omitempty" protobuf:"bytes,4,opt,name=udsink"` } -func (s Sink) getContainers(req getContainerReq) ([]corev1.Container, error) { +func (s Sink) getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) { containers := []corev1.Container{ s.getMainContainer(req), } + sidecarContainers := []corev1.Container{} if s.UDSink != nil { - containers = append(containers, s.getUDSinkContainer(req)) + sidecarContainers = append(sidecarContainers, s.getUDSinkContainer(req)) } if s.Fallback != nil && s.Fallback.UDSink != nil { - containers = append(containers, s.getFallbackUDSinkContainer(req)) + sidecarContainers = append(sidecarContainers, s.getFallbackUDSinkContainer(req)) } - return containers, nil + return sidecarContainers, containers, nil } func (s Sink) getMainContainer(req getContainerReq) corev1.Container { @@ -72,7 +73,7 @@ func (s Sink) getUDSinkContainer(mainContainerReq getContainerReq) corev1.Contai c := containerBuilder{}. name(CtrUdsink). imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container - appendVolumeMounts(mainContainerReq.volumeMounts...) + appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar() x := s.UDSink.Container c = c.image(x.Image) if len(x.Command) > 0 { @@ -107,7 +108,7 @@ func (s Sink) getFallbackUDSinkContainer(mainContainerReq getContainerReq) corev c := containerBuilder{}. name(CtrFallbackUdsink). imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container - appendVolumeMounts(mainContainerReq.volumeMounts...) + appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar() x := s.Fallback.UDSink.Container c = c.image(x.Image) if len(x.Command) > 0 { diff --git a/pkg/apis/numaflow/v1alpha1/sink_test.go b/pkg/apis/numaflow/v1alpha1/sink_test.go index f63cb751b..b15f9a166 100644 --- a/pkg/apis/numaflow/v1alpha1/sink_test.go +++ b/pkg/apis/numaflow/v1alpha1/sink_test.go @@ -27,7 +27,7 @@ import ( func Test_Sink_getContainers(t *testing.T) { s := Sink{} - c, err := s.getContainers(getContainerReq{ + sc, c, err := s.getContainers(getContainerReq{ env: []corev1.EnvVar{ {Name: "test-env", Value: "test-val"}, }, @@ -37,6 +37,7 @@ func Test_Sink_getContainers(t *testing.T) { resources: corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, }) assert.NoError(t, err) + assert.Equal(t, 0, len(sc)) assert.Equal(t, 1, len(c)) assert.Equal(t, testFlowImage, c[0].Image) assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[0].Resources) @@ -89,6 +90,7 @@ func Test_Sink_getUDSinkContainer(t *testing.T) { assert.Equal(t, int32(15), c.LivenessProbe.TimeoutSeconds) assert.Equal(t, int32(14), c.LivenessProbe.PeriodSeconds) assert.Equal(t, int32(5), c.LivenessProbe.FailureThreshold) + assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy) } func Test_Sink_getFallbackUDSinkContainer(t *testing.T) { @@ -151,4 +153,5 @@ func Test_Sink_getFallbackUDSinkContainer(t *testing.T) { }) assert.Equal(t, testImagePullPolicy, c.ImagePullPolicy) assert.True(t, c.LivenessProbe != nil) + assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy) } diff --git a/pkg/apis/numaflow/v1alpha1/source.go b/pkg/apis/numaflow/v1alpha1/source.go index c59d4c3c2..e1308cf28 100644 --- a/pkg/apis/numaflow/v1alpha1/source.go +++ b/pkg/apis/numaflow/v1alpha1/source.go @@ -45,17 +45,18 @@ type Source struct { Serving *ServingSource `json:"serving,omitempty" protobuf:"bytes,8,opt,name=serving"` } -func (s Source) getContainers(req getContainerReq) ([]corev1.Container, error) { +func (s Source) getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) { containers := []corev1.Container{ s.getMainContainer(req), } + sidecarContainers := []corev1.Container{} if s.UDTransformer != nil { - containers = append(containers, s.getUDTransformerContainer(req)) + sidecarContainers = append(sidecarContainers, s.getUDTransformerContainer(req)) } if s.UDSource != nil { - containers = append(containers, s.getUDSourceContainer(req)) + sidecarContainers = append(sidecarContainers, s.getUDSourceContainer(req)) } - return containers, nil + return sidecarContainers, containers, nil } func (s Source) getMainContainer(req getContainerReq) corev1.Container { @@ -69,7 +70,7 @@ func (s Source) getUDTransformerContainer(mainContainerReq getContainerReq) core c := containerBuilder{}. name(CtrUdtransformer). imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container - appendVolumeMounts(mainContainerReq.volumeMounts...) + appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar() c = c.appendEnv(corev1.EnvVar{Name: EnvUDContainerType, Value: UDContainerTransformer}) if x := s.UDTransformer.Container; x != nil && x.Image != "" { // customized image c = c.image(x.Image) @@ -133,7 +134,7 @@ func (s Source) getUDSourceContainer(mainContainerReq getContainerReq) corev1.Co c := containerBuilder{}. name(CtrUdsource). imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container - appendVolumeMounts(mainContainerReq.volumeMounts...) + appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar() c = c.appendEnv(corev1.EnvVar{Name: EnvUDContainerType, Value: UDContainerSource}) if x := s.UDSource.Container; x != nil && x.Image != "" { // customized image c = c.image(x.Image) diff --git a/pkg/apis/numaflow/v1alpha1/source_test.go b/pkg/apis/numaflow/v1alpha1/source_test.go index 01f26f2cd..9e816896a 100644 --- a/pkg/apis/numaflow/v1alpha1/source_test.go +++ b/pkg/apis/numaflow/v1alpha1/source_test.go @@ -77,60 +77,64 @@ func TestSource_getContainers(t *testing.T) { }, }, } - c, err := x.getContainers(getContainerReq{ + sc, c, err := x.getContainers(getContainerReq{ image: "main-image", }) assert.NoError(t, err) - assert.Equal(t, 3, len(c)) + assert.Equal(t, 2, len(sc)) + assert.Equal(t, 1, len(c)) assert.Equal(t, "main-image", c[0].Image) - assert.Equal(t, x.UDSource.Container.Image, c[2].Image) - assert.Contains(t, c[2].VolumeMounts, c[2].VolumeMounts[0]) - assert.Equal(t, x.UDSource.Container.Command, c[2].Command) - assert.Equal(t, x.UDSource.Container.Args, c[2].Args) + assert.Equal(t, x.UDSource.Container.Image, sc[1].Image) + assert.Contains(t, sc[1].VolumeMounts, sc[1].VolumeMounts[0]) + assert.Equal(t, x.UDSource.Container.Command, sc[1].Command) + assert.Equal(t, x.UDSource.Container.Args, sc[1].Args) envsUDSource := map[string]string{} - for _, e := range c[2].Env { + for _, e := range sc[1].Env { envsUDSource[e.Name] = e.Value } assert.Equal(t, envsUDSource[EnvUDContainerType], UDContainerSource) - assert.Equal(t, x.UDSource.Container.EnvFrom, c[2].EnvFrom) - assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[2].Resources) - assert.Equal(t, c[0].ImagePullPolicy, c[2].ImagePullPolicy) - assert.NotNil(t, c[1].LivenessProbe) - assert.Equal(t, int32(10), c[2].LivenessProbe.InitialDelaySeconds) - assert.Equal(t, int32(15), c[2].LivenessProbe.TimeoutSeconds) - assert.Equal(t, int32(14), c[2].LivenessProbe.PeriodSeconds) - assert.Equal(t, int32(5), c[2].LivenessProbe.FailureThreshold) + assert.Equal(t, x.UDSource.Container.EnvFrom, sc[1].EnvFrom) + assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, sc[1].Resources) + assert.Equal(t, c[0].ImagePullPolicy, sc[1].ImagePullPolicy) + assert.NotNil(t, sc[0].LivenessProbe) + assert.Equal(t, int32(10), sc[1].LivenessProbe.InitialDelaySeconds) + assert.Equal(t, int32(15), sc[1].LivenessProbe.TimeoutSeconds) + assert.Equal(t, int32(14), sc[1].LivenessProbe.PeriodSeconds) + assert.Equal(t, int32(5), sc[1].LivenessProbe.FailureThreshold) x.UDSource.Container.ImagePullPolicy = &testImagePullPolicy - c, _ = x.getContainers(getContainerReq{ + assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), sc[0].RestartPolicy) + assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), sc[1].RestartPolicy) + sc, c, _ = x.getContainers(getContainerReq{ image: "main-image", imagePullPolicy: corev1.PullAlways, }) - assert.Equal(t, testImagePullPolicy, c[2].ImagePullPolicy) + assert.Equal(t, testImagePullPolicy, sc[1].ImagePullPolicy) - assert.Equal(t, x.UDTransformer.Container.Image, c[1].Image) - assert.Contains(t, c[1].VolumeMounts, c[1].VolumeMounts[0]) - assert.Equal(t, x.UDTransformer.Container.Command, c[1].Command) - assert.Equal(t, x.UDTransformer.Container.Args, c[1].Args) + assert.Equal(t, x.UDTransformer.Container.Image, sc[0].Image) + assert.Contains(t, sc[0].VolumeMounts, sc[0].VolumeMounts[0]) + assert.Equal(t, x.UDTransformer.Container.Command, sc[0].Command) + assert.Equal(t, x.UDTransformer.Container.Args, sc[0].Args) envs := map[string]string{} - for _, e := range c[1].Env { + for _, e := range sc[0].Env { envs[e.Name] = e.Value } assert.Equal(t, envs[EnvUDContainerType], UDContainerTransformer) - assert.Equal(t, x.UDTransformer.Container.EnvFrom, c[1].EnvFrom) - assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[1].Resources) - assert.Equal(t, c[0].ImagePullPolicy, c[1].ImagePullPolicy) - assert.NotNil(t, c[1].LivenessProbe) - assert.Equal(t, int32(20), c[1].LivenessProbe.InitialDelaySeconds) - assert.Equal(t, int32(25), c[1].LivenessProbe.TimeoutSeconds) - assert.Equal(t, int32(24), c[1].LivenessProbe.PeriodSeconds) - assert.Equal(t, int32(5), c[1].LivenessProbe.FailureThreshold) + assert.Equal(t, x.UDTransformer.Container.EnvFrom, sc[0].EnvFrom) + assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, sc[0].Resources) + assert.Equal(t, c[0].ImagePullPolicy, sc[0].ImagePullPolicy) + assert.NotNil(t, sc[0].LivenessProbe) + assert.Equal(t, int32(20), sc[0].LivenessProbe.InitialDelaySeconds) + assert.Equal(t, int32(25), sc[0].LivenessProbe.TimeoutSeconds) + assert.Equal(t, int32(24), sc[0].LivenessProbe.PeriodSeconds) + assert.Equal(t, int32(5), sc[0].LivenessProbe.FailureThreshold) x.UDTransformer.Container.ImagePullPolicy = &testImagePullPolicy - c, _ = x.getContainers(getContainerReq{ + sc, c, _ = x.getContainers(getContainerReq{ image: "main-image", imagePullPolicy: corev1.PullAlways, }) - assert.Equal(t, testImagePullPolicy, c[1].ImagePullPolicy) + assert.Equal(t, corev1.PullAlways, c[0].ImagePullPolicy) + assert.Equal(t, testImagePullPolicy, sc[0].ImagePullPolicy) } func Test_getTransformerContainer(t *testing.T) { diff --git a/pkg/apis/numaflow/v1alpha1/udf.go b/pkg/apis/numaflow/v1alpha1/udf.go index 6f5f18ff4..573ddcbca 100644 --- a/pkg/apis/numaflow/v1alpha1/udf.go +++ b/pkg/apis/numaflow/v1alpha1/udf.go @@ -45,8 +45,8 @@ type UDF struct { GroupBy *GroupBy `json:"groupBy" protobuf:"bytes,3,opt,name=groupBy"` } -func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, error) { - return []corev1.Container{in.getMainContainer(req), in.getUDFContainer(req)}, nil +func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) { + return []corev1.Container{in.getUDFContainer(req)}, []corev1.Container{in.getMainContainer(req)}, nil } func (in UDF) getMainContainer(req getContainerReq) corev1.Container { @@ -63,7 +63,7 @@ func (in UDF) getUDFContainer(mainContainerReq getContainerReq) corev1.Container c := containerBuilder{}. name(CtrUdf). imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as main container - appendVolumeMounts(mainContainerReq.volumeMounts...) + appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar() if x := in.Container; x != nil && x.Image != "" { // customized image c = c.image(x.Image) if len(x.Command) > 0 { diff --git a/pkg/apis/numaflow/v1alpha1/udf_test.go b/pkg/apis/numaflow/v1alpha1/udf_test.go index 0bb8e3862..79eea01fc 100644 --- a/pkg/apis/numaflow/v1alpha1/udf_test.go +++ b/pkg/apis/numaflow/v1alpha1/udf_test.go @@ -49,36 +49,39 @@ func TestUDF_getContainers(t *testing.T) { }, }, } - c, err := x.getContainers(getContainerReq{ + sc, c, err := x.getContainers(getContainerReq{ image: "main-image", imagePullPolicy: corev1.PullAlways, }) assert.NoError(t, err) - assert.Equal(t, 2, len(c)) + assert.Equal(t, 1, len(c)) + assert.Equal(t, 1, len(sc)) assert.Equal(t, "main-image", c[0].Image) - assert.Equal(t, x.Container.Image, c[1].Image) - assert.Contains(t, c[1].VolumeMounts, c[1].VolumeMounts[0]) - assert.Equal(t, x.Container.Command, c[1].Command) - assert.Equal(t, x.Container.Args, c[1].Args) + assert.Equal(t, x.Container.Image, sc[0].Image) + assert.Contains(t, sc[0].VolumeMounts, sc[0].VolumeMounts[0]) + assert.Equal(t, x.Container.Command, sc[0].Command) + assert.Equal(t, x.Container.Args, sc[0].Args) envs := map[string]string{} - for _, e := range c[1].Env { + for _, e := range sc[0].Env { envs[e.Name] = e.Value } assert.Equal(t, envs[EnvUDContainerType], UDContainerFunction) - assert.Equal(t, 1, len(c[1].EnvFrom)) - assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[1].Resources) - assert.Equal(t, corev1.PullAlways, c[1].ImagePullPolicy) + assert.Equal(t, 1, len(sc[0].EnvFrom)) + assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, sc[0].Resources) + assert.Equal(t, corev1.PullAlways, sc[0].ImagePullPolicy) x.Container.ImagePullPolicy = &testImagePullPolicy - c, _ = x.getContainers(getContainerReq{ + sc, c, _ = x.getContainers(getContainerReq{ image: "main-image", imagePullPolicy: corev1.PullAlways, }) - assert.Equal(t, testImagePullPolicy, c[1].ImagePullPolicy) - assert.True(t, c[1].LivenessProbe != nil) - assert.Equal(t, int32(10), c[1].LivenessProbe.InitialDelaySeconds) - assert.Equal(t, int32(15), c[1].LivenessProbe.TimeoutSeconds) - assert.Equal(t, int32(14), c[1].LivenessProbe.PeriodSeconds) - assert.Equal(t, int32(5), c[1].LivenessProbe.FailureThreshold) + assert.Equal(t, 1, len(c)) + assert.Equal(t, 1, len(sc)) + assert.Equal(t, testImagePullPolicy, sc[0].ImagePullPolicy) + assert.True(t, sc[0].LivenessProbe != nil) + assert.Equal(t, int32(10), sc[0].LivenessProbe.InitialDelaySeconds) + assert.Equal(t, int32(15), sc[0].LivenessProbe.TimeoutSeconds) + assert.Equal(t, int32(14), sc[0].LivenessProbe.PeriodSeconds) + assert.Equal(t, int32(5), sc[0].LivenessProbe.FailureThreshold) } func Test_getUDFContainer(t *testing.T) { @@ -110,6 +113,7 @@ func Test_getUDFContainer(t *testing.T) { envs[e.Name] = e.Value } assert.Equal(t, envs[EnvUDContainerType], UDContainerFunction) + assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy) }) t.Run("with built-in functions", func(t *testing.T) { diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index e45c168bd..ccd7ecd5d 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -242,7 +242,7 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) { } volumeMounts := []corev1.VolumeMount{{Name: varVolumeName, MountPath: PathVarRun}} executeRustBinary, _ := env.GetBool(EnvExecuteRustBinary, false) - containers, err := v.Spec.getType().getContainers(getContainerReq{ + sidecarContainers, containers, err := v.Spec.getType().getContainers(getContainerReq{ isbSvcType: req.ISBSvcType, env: envVars, image: req.Image, @@ -251,7 +251,6 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) { volumeMounts: volumeMounts, executeRustBinary: executeRustBinary, }) - if err != nil { return nil, err } @@ -299,11 +298,9 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) { {Name: VertexMetricsPortName, ContainerPort: VertexMetricsPort}, } - if len(containers) > 1 { // udf, udsink, udsource, or source vertex specifies a udtransformer - for i := 1; i < len(containers); i++ { - containers[i].Env = append(containers[i].Env, v.commonEnvs()...) - containers[i].Env = append(containers[i].Env, v.sidecarEnvs()...) - } + for i := 0; i < len(sidecarContainers); i++ { // udf, udsink, udsource, or source vertex specifies a udtransformer + sidecarContainers[i].Env = append(sidecarContainers[i].Env, v.commonEnvs()...) + sidecarContainers[i].Env = append(sidecarContainers[i].Env, v.sidecarEnvs()...) } initContainers := v.getInitContainers(req) @@ -327,19 +324,19 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) { if x := v.Spec.SideInputsContainerTemplate; x != nil { x.ApplyToContainer(&sideInputsWatcher) } + sideInputsWatcher.VolumeMounts = append(sideInputsWatcher.VolumeMounts, corev1.VolumeMount{Name: sideInputsVolName, MountPath: PathSideInputsMount}) containers = append(containers, sideInputsWatcher) - for i := 1; i < len(containers); i++ { - if containers[i].Name == CtrSideInputsWatcher { - containers[i].VolumeMounts = append(containers[i].VolumeMounts, corev1.VolumeMount{Name: sideInputsVolName, MountPath: PathSideInputsMount}) - } else { - // Readonly mount for user-defined containers - containers[i].VolumeMounts = append(containers[i].VolumeMounts, corev1.VolumeMount{Name: sideInputsVolName, MountPath: PathSideInputsMount, ReadOnly: true}) - } + for i := 0; i < len(sidecarContainers); i++ { + // Readonly mount for user-defined containers + sidecarContainers[i].VolumeMounts = append(sidecarContainers[i].VolumeMounts, corev1.VolumeMount{Name: sideInputsVolName, MountPath: PathSideInputsMount, ReadOnly: true}) } // Side Inputs init container initContainers[1].VolumeMounts = append(initContainers[1].VolumeMounts, corev1.VolumeMount{Name: sideInputsVolName, MountPath: PathSideInputsMount}) } + // Add the sidecar containers + initContainers = append(initContainers, sidecarContainers...) + if v.IsASource() && v.Spec.Source.Serving != nil { servingContainer, err := v.getServingContainer(req) if err != nil { diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go index 83384cf62..fde691425 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go @@ -358,14 +358,15 @@ func TestGetPodSpec(t *testing.T) { } s, err := testObj.GetPodSpec(req) assert.NoError(t, err) - assert.Equal(t, 2, len(s.Containers)) - assert.Equal(t, "image", s.Containers[1].Image) - assert.Equal(t, 1, len(s.Containers[1].Command)) - assert.Equal(t, "cmd", s.Containers[1].Command[0]) - assert.Equal(t, 1, len(s.Containers[1].Args)) - assert.Equal(t, "arg0", s.Containers[1].Args[0]) + assert.Equal(t, 1, len(s.Containers)) + assert.Equal(t, 2, len(s.InitContainers)) + assert.Equal(t, "image", s.InitContainers[1].Image) + assert.Equal(t, 1, len(s.InitContainers[1].Command)) + assert.Equal(t, "cmd", s.InitContainers[1].Command[0]) + assert.Equal(t, 1, len(s.InitContainers[1].Args)) + assert.Equal(t, "arg0", s.InitContainers[1].Args[0]) var sidecarEnvNames []string - for _, env := range s.Containers[1].Env { + for _, env := range s.InitContainers[1].Env { sidecarEnvNames = append(sidecarEnvNames, env.Name) } assert.Contains(t, sidecarEnvNames, EnvCPULimit) @@ -394,16 +395,17 @@ func TestGetPodSpec(t *testing.T) { } s, err := testObj.GetPodSpec(req) assert.NoError(t, err) - assert.Equal(t, 3, len(s.Containers)) - - for i := 1; i < len(s.Containers); i++ { - assert.Equal(t, "image", s.Containers[i].Image) - assert.Equal(t, 1, len(s.Containers[i].Command)) - assert.Equal(t, "cmd", s.Containers[i].Command[0]) - assert.Equal(t, 1, len(s.Containers[i].Args)) - assert.Equal(t, "arg0", s.Containers[i].Args[0]) + assert.Equal(t, 1, len(s.Containers)) + assert.Equal(t, 3, len(s.InitContainers)) + + for i := 1; i < len(s.InitContainers); i++ { + assert.Equal(t, "image", s.InitContainers[i].Image) + assert.Equal(t, 1, len(s.InitContainers[i].Command)) + assert.Equal(t, "cmd", s.InitContainers[i].Command[0]) + assert.Equal(t, 1, len(s.InitContainers[i].Args)) + assert.Equal(t, "arg0", s.InitContainers[i].Args[0]) var sidecarEnvNames []string - for _, env := range s.Containers[i].Env { + for _, env := range s.InitContainers[i].Env { sidecarEnvNames = append(sidecarEnvNames, env.Name) } assert.Contains(t, sidecarEnvNames, EnvCPULimit) @@ -422,9 +424,10 @@ func TestGetPodSpec(t *testing.T) { } s, err := testObj.GetPodSpec(req) assert.NoError(t, err) - assert.Equal(t, 2, len(s.Containers)) + assert.Equal(t, 1, len(s.Containers)) + assert.Equal(t, 2, len(s.InitContainers)) assert.Equal(t, CtrMain, s.Containers[0].Name) - assert.Equal(t, CtrUdf, s.Containers[1].Name) + assert.Equal(t, CtrUdf, s.InitContainers[1].Name) assert.Equal(t, testFlowImage, s.Containers[0].Image) assert.Equal(t, corev1.PullIfNotPresent, s.Containers[0].ImagePullPolicy) var envNames []string @@ -440,10 +443,11 @@ func TestGetPodSpec(t *testing.T) { assert.Contains(t, envNames, EnvReplica) assert.Contains(t, s.Containers[0].Args, "processor") assert.Contains(t, s.Containers[0].Args, "--type="+string(VertexTypeMapUDF)) - assert.Equal(t, 1, len(s.InitContainers)) + assert.Equal(t, 2, len(s.InitContainers)) assert.Equal(t, CtrInit, s.InitContainers[0].Name) + assert.Equal(t, CtrUdf, s.InitContainers[1].Name) var sidecarEnvNames []string - for _, env := range s.Containers[1].Env { + for _, env := range s.InitContainers[1].Env { sidecarEnvNames = append(sidecarEnvNames, env.Name) } assert.Contains(t, sidecarEnvNames, EnvCPULimit) @@ -462,13 +466,22 @@ func TestGetPodSpec(t *testing.T) { } s, err := testObj.GetPodSpec(req) assert.NoError(t, err) - assert.Equal(t, 3, len(s.Containers)) + assert.Equal(t, 2, len(s.Containers)) assert.Equal(t, CtrMain, s.Containers[0].Name) - assert.Equal(t, CtrUdf, s.Containers[1].Name) - assert.Equal(t, CtrSideInputsWatcher, s.Containers[2].Name) - assert.Equal(t, 2, len(s.InitContainers)) + assert.Equal(t, CtrSideInputsWatcher, s.Containers[1].Name) + assert.Equal(t, 3, len(s.InitContainers)) assert.Equal(t, CtrInit, s.InitContainers[0].Name) assert.Equal(t, CtrInitSideInputs, s.InitContainers[1].Name) + assert.Equal(t, 1, len(s.InitContainers[1].VolumeMounts)) + assert.Equal(t, "var-run-side-inputs", s.InitContainers[1].VolumeMounts[0].Name) + assert.False(t, s.InitContainers[1].VolumeMounts[0].ReadOnly) + assert.Equal(t, CtrUdf, s.InitContainers[2].Name) + assert.Equal(t, 2, len(s.InitContainers[2].VolumeMounts)) + assert.Equal(t, "var-run-side-inputs", s.InitContainers[2].VolumeMounts[1].Name) + assert.True(t, s.InitContainers[2].VolumeMounts[1].ReadOnly) + assert.Equal(t, 1, len(s.Containers[1].VolumeMounts)) + assert.Equal(t, "var-run-side-inputs", s.Containers[1].VolumeMounts[0].Name) + assert.False(t, s.Containers[1].VolumeMounts[0].ReadOnly) }) t.Run("test serving source", func(t *testing.T) { diff --git a/pkg/reconciler/monovertex/controller_test.go b/pkg/reconciler/monovertex/controller_test.go index c9c5c003c..6cbc986c1 100644 --- a/pkg/reconciler/monovertex/controller_test.go +++ b/pkg/reconciler/monovertex/controller_test.go @@ -164,13 +164,13 @@ func Test_BuildPodSpec(t *testing.T) { testObj := testMonoVtx.DeepCopy() spec, err := r.buildPodSpec(testObj) assert.NoError(t, err) - assert.Equal(t, 5, len(spec.Containers)) + assert.Equal(t, 1, len(spec.Containers)) assert.Equal(t, dfv1.CtrMain, spec.Containers[0].Name) - assert.Equal(t, dfv1.CtrUdsource, spec.Containers[1].Name) - assert.Equal(t, dfv1.CtrUdtransformer, spec.Containers[2].Name) - assert.Equal(t, dfv1.CtrUdsink, spec.Containers[3].Name) - assert.Equal(t, dfv1.CtrFallbackUdsink, spec.Containers[4].Name) - assert.Equal(t, 0, len(spec.InitContainers)) + assert.Equal(t, 4, len(spec.InitContainers)) + assert.Equal(t, dfv1.CtrUdsource, spec.InitContainers[0].Name) + assert.Equal(t, dfv1.CtrUdtransformer, spec.InitContainers[1].Name) + assert.Equal(t, dfv1.CtrUdsink, spec.InitContainers[2].Name) + assert.Equal(t, dfv1.CtrFallbackUdsink, spec.InitContainers[3].Name) }) t.Run("test no transformer, no fallback sink", func(t *testing.T) { @@ -179,11 +179,11 @@ func Test_BuildPodSpec(t *testing.T) { testObj.Spec.Sink.Fallback = nil spec, err := r.buildPodSpec(testObj) assert.NoError(t, err) - assert.Equal(t, 3, len(spec.Containers)) + assert.Equal(t, 1, len(spec.Containers)) assert.Equal(t, dfv1.CtrMain, spec.Containers[0].Name) - assert.Equal(t, dfv1.CtrUdsource, spec.Containers[1].Name) - assert.Equal(t, dfv1.CtrUdsink, spec.Containers[2].Name) - assert.Equal(t, 0, len(spec.InitContainers)) + assert.Equal(t, 2, len(spec.InitContainers)) + assert.Equal(t, dfv1.CtrUdsource, spec.InitContainers[0].Name) + assert.Equal(t, dfv1.CtrUdsink, spec.InitContainers[1].Name) }) } diff --git a/pkg/reconciler/vertex/controller_test.go b/pkg/reconciler/vertex/controller_test.go index 4a2faa56b..51a2540c3 100644 --- a/pkg/reconciler/vertex/controller_test.go +++ b/pkg/reconciler/vertex/controller_test.go @@ -273,8 +273,8 @@ func Test_BuildPodSpec(t *testing.T) { } spec, err := r.buildPodSpec(testObj, testPipeline, fakeIsbSvcConfig, 2) assert.NoError(t, err) - assert.Equal(t, 1, len(spec.InitContainers)) - assert.Equal(t, 2, len(spec.Containers)) + assert.Equal(t, 2, len(spec.InitContainers)) + assert.Equal(t, 1, len(spec.Containers)) }) t.Run("test user-defined source with transformer", func(t *testing.T) { @@ -295,8 +295,8 @@ func Test_BuildPodSpec(t *testing.T) { } spec, err := r.buildPodSpec(testObj, testPipeline, fakeIsbSvcConfig, 2) assert.NoError(t, err) - assert.Equal(t, 1, len(spec.InitContainers)) - assert.Equal(t, 3, len(spec.Containers)) + assert.Equal(t, 3, len(spec.InitContainers)) + assert.Equal(t, 1, len(spec.Containers)) }) t.Run("test sink", func(t *testing.T) { @@ -351,13 +351,13 @@ func Test_BuildPodSpec(t *testing.T) { testObj.Spec.ToEdges = []dfv1.CombinedEdge{} spec, err := r.buildPodSpec(testObj, testPipeline, fakeIsbSvcConfig, 0) assert.NoError(t, err) - assert.Equal(t, 1, len(spec.InitContainers)) - assert.Equal(t, 2, len(spec.Containers)) - assert.Equal(t, "image", spec.Containers[1].Image) - assert.Equal(t, 1, len(spec.Containers[1].Command)) - assert.Equal(t, "cmd", spec.Containers[1].Command[0]) - assert.Equal(t, 1, len(spec.Containers[1].Args)) - assert.Equal(t, "arg0", spec.Containers[1].Args[0]) + assert.Equal(t, 2, len(spec.InitContainers)) + assert.Equal(t, 1, len(spec.Containers)) + assert.Equal(t, "image", spec.InitContainers[1].Image) + assert.Equal(t, 1, len(spec.InitContainers[1].Command)) + assert.Equal(t, "cmd", spec.InitContainers[1].Command[0]) + assert.Equal(t, 1, len(spec.InitContainers[1].Args)) + assert.Equal(t, "arg0", spec.InitContainers[1].Args[0]) }) t.Run("test map udf", func(t *testing.T) { @@ -371,8 +371,8 @@ func Test_BuildPodSpec(t *testing.T) { } spec, err := r.buildPodSpec(testObj, testPipeline, fakeIsbSvcConfig, 0) assert.NoError(t, err) - assert.Equal(t, 1, len(spec.InitContainers)) - assert.Equal(t, 2, len(spec.Containers)) + assert.Equal(t, 2, len(spec.InitContainers)) + assert.Equal(t, 1, len(spec.Containers)) var envNames []string for _, e := range spec.Containers[0].Env { envNames = append(envNames, e.Name) @@ -410,8 +410,8 @@ func Test_BuildPodSpec(t *testing.T) { } spec, err := r.buildPodSpec(testObj, testPipeline, fakeIsbSvcConfig, 2) assert.NoError(t, err) - assert.Equal(t, 1, len(spec.InitContainers)) - assert.Equal(t, 2, len(spec.Containers)) + assert.Equal(t, 2, len(spec.InitContainers)) + assert.Equal(t, 1, len(spec.Containers)) containsPVC := false containsPVCMount := false for _, v := range spec.Volumes { @@ -509,7 +509,8 @@ func Test_reconcile(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(pods.Items)) assert.True(t, strings.HasPrefix(pods.Items[0].Name, testVertexName+"-0-")) - assert.Equal(t, 2, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 1, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 2, len(pods.Items[0].Spec.InitContainers)) svcs := &corev1.ServiceList{} err = r.client.List(ctx, svcs, &client.ListOptions{Namespace: testNamespace, LabelSelector: selector}) assert.NoError(t, err) @@ -573,7 +574,8 @@ func Test_reconcile(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(pods.Items)) assert.True(t, strings.HasPrefix(pods.Items[0].Name, testVertexName+"-0-")) - assert.Equal(t, 2, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 1, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 2, len(pods.Items[0].Spec.InitContainers)) }) t.Run("test reconcile reduce udf", func(t *testing.T) { @@ -616,7 +618,8 @@ func Test_reconcile(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(pods.Items)) assert.True(t, strings.HasPrefix(pods.Items[0].Name, testVertexName+"-0-")) - assert.Equal(t, 2, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 1, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 2, len(pods.Items[0].Spec.InitContainers)) pvc := &corev1.PersistentVolumeClaim{} err = r.client.Get(ctx, types.NamespacedName{Name: dfv1.GeneratePBQStoragePVCName(testPl.Name, testObj.Spec.Name, 0), Namespace: testNamespace}, pvc) assert.NoError(t, err) @@ -715,8 +718,8 @@ func Test_reconcile(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(pods.Items)) assert.True(t, strings.HasPrefix(pods.Items[0].Name, testVertexName+"-0-")) - assert.Equal(t, 3, len(pods.Items[0].Spec.Containers)) - assert.Equal(t, 2, len(pods.Items[0].Spec.InitContainers)) + assert.Equal(t, 2, len(pods.Items[0].Spec.Containers)) + assert.Equal(t, 3, len(pods.Items[0].Spec.InitContainers)) }) t.Run("test reconcile rolling update", func(t *testing.T) {