Skip to content

Commit

Permalink
Merge branch 'main' into mvtx-lookback
Browse files Browse the repository at this point in the history
  • Loading branch information
kohlisid authored Nov 20, 2024
2 parents 65d7717 + c5afc90 commit 30b1cc9
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 154 deletions.
10 changes: 9 additions & 1 deletion pkg/apis/numaflow/v1alpha1/container_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package v1alpha1

import corev1 "k8s.io/api/core/v1"
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
)

type containerBuilder corev1.Container

Expand Down Expand Up @@ -85,6 +88,11 @@ func (b containerBuilder) resources(x corev1.ResourceRequirements) containerBuil
return b
}

func (b containerBuilder) asSidecar() containerBuilder {
b.RestartPolicy = ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways)
return b
}

func (b containerBuilder) build() corev1.Container {
return corev1.Container(b)
}
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/container_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
resource "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/ptr"
)

var (
Expand Down Expand Up @@ -49,6 +50,7 @@ func Test_containerBuilder(t *testing.T) {
appendEnv(corev1.EnvVar{
Name: "env", Value: "value"}).
appendPorts(corev1.ContainerPort{Name: "port", ContainerPort: 8080}).
asSidecar().
build()
assert.Equal(t, "numa", c.Name)
assert.Len(t, c.VolumeMounts, 1)
Expand All @@ -58,4 +60,5 @@ func Test_containerBuilder(t *testing.T) {
assert.Equal(t, corev1.PullIfNotPresent, c.ImagePullPolicy)
assert.Equal(t, []corev1.EnvVar{{Name: "env", Value: "value"}}, c.Env)
assert.Equal(t, []corev1.ContainerPort{{Name: "port", ContainerPort: 8080}}, c.Ports)
assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy)
}
3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/container_supplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ type getContainerReq struct {
}

type containerSupplier interface {
getContainers(req getContainerReq) ([]corev1.Container, error)
// getContainers returns the sidecar containers and main containers for the vertex.
getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error)
}
34 changes: 19 additions & 15 deletions pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, e
}
volumeMounts := []corev1.VolumeMount{{Name: varVolumeName, MountPath: PathVarRun}}

containers := mv.Spec.buildContainers(getContainerReq{
sidecarContainers, containers := mv.Spec.buildContainers(getContainerReq{
env: envVars,
image: req.Image,
imagePullPolicy: req.PullPolicy,
Expand Down Expand Up @@ -391,17 +391,19 @@ func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, e
{Name: MonoVertexMetricsPortName, ContainerPort: MonoVertexMetricsPort},
}

if len(containers) > 1 { // udf, udsink, udsource, or source vertex specifies a udtransformer
for i := 1; i < len(containers); i++ {
containers[i].Env = append(containers[i].Env, mv.commonEnvs()...)
containers[i].Env = append(containers[i].Env, mv.sidecarEnvs()...)
}
for i := 0; i < len(sidecarContainers); i++ { // udsink, udsource, udtransformer ...
sidecarContainers[i].Env = append(sidecarContainers[i].Env, mv.commonEnvs()...)
sidecarContainers[i].Env = append(sidecarContainers[i].Env, mv.sidecarEnvs()...)
}

initContainers := []corev1.Container{}
initContainers = append(initContainers, mv.Spec.InitContainers...)
initContainers = append(initContainers, sidecarContainers...)

spec := &corev1.PodSpec{
Subdomain: mv.GetHeadlessServiceName(),
Volumes: append(volumes, mv.Spec.Volumes...),
InitContainers: mv.Spec.InitContainers,
InitContainers: initContainers,
Containers: append(containers, mv.Spec.Sidecars...),
}
mv.Spec.AbstractPodTemplate.ApplyToPodSpec(spec)
Expand Down Expand Up @@ -458,26 +460,28 @@ func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec {
return x
}

func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) []corev1.Container {
// buildContainers builds the sidecar containers and main containers for the mono vertex.
func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) ([]corev1.Container, []corev1.Container) {
mainContainer := containerBuilder{}.
init(req).command(NumaflowRustBinary).args("--rust").build()

containers := []corev1.Container{mainContainer}

sidecarContainers := []corev1.Container{}
if mvspec.Source.UDSource != nil { // Only support UDSource for now.
containers = append(containers, mvspec.Source.getUDSourceContainer(req))
sidecarContainers = append(sidecarContainers, mvspec.Source.getUDSourceContainer(req))
}
if mvspec.Source.UDTransformer != nil {
containers = append(containers, mvspec.Source.getUDTransformerContainer(req))
sidecarContainers = append(sidecarContainers, mvspec.Source.getUDTransformerContainer(req))
}
if mvspec.Sink.UDSink != nil { // Only support UDSink for now.
containers = append(containers, mvspec.Sink.getUDSinkContainer(req))
sidecarContainers = append(sidecarContainers, mvspec.Sink.getUDSinkContainer(req))
}
if mvspec.Sink.Fallback != nil {
containers = append(containers, mvspec.Sink.getFallbackUDSinkContainer(req))
sidecarContainers = append(sidecarContainers, mvspec.Sink.getFallbackUDSinkContainer(req))
}
// Fallback sink is not supported.
containers = append(containers, mvspec.Sidecars...)
return containers
sidecarContainers = append(sidecarContainers, mvspec.Sidecars...)
return sidecarContainers, containers
}

type MonoVertexLimits struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/apis/numaflow/v1alpha1/mono_vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,24 @@ func TestMonoVertexGetPodSpec(t *testing.T) {
}
podSpec, err := testMvtx.GetPodSpec(req)
assert.NoError(t, err)
assert.Equal(t, 4, len(podSpec.Containers))
assert.Equal(t, 1, len(podSpec.Containers))
assert.Equal(t, 3, len(podSpec.InitContainers))
assert.Equal(t, 1, len(podSpec.Volumes))
assert.Equal(t, "my-image", podSpec.Containers[0].Image)
assert.Equal(t, corev1.PullIfNotPresent, podSpec.Containers[0].ImagePullPolicy)
assert.Equal(t, "100m", podSpec.Containers[0].Resources.Requests.Cpu().String())
assert.Equal(t, "200m", podSpec.Containers[0].Resources.Limits.Cpu().String())
assert.Equal(t, "100Mi", podSpec.Containers[0].Resources.Requests.Memory().String())
assert.Equal(t, "200Mi", podSpec.Containers[0].Resources.Limits.Memory().String())
assert.Equal(t, "test-image1", podSpec.Containers[1].Image)
assert.Equal(t, "test-image2", podSpec.Containers[2].Image)
assert.Equal(t, "test-image3", podSpec.Containers[3].Image)
assert.Equal(t, "test-image1", podSpec.InitContainers[0].Image)
assert.Equal(t, "test-image2", podSpec.InitContainers[1].Image)
assert.Equal(t, "test-image3", podSpec.InitContainers[2].Image)
for _, c := range podSpec.Containers {
assert.Equal(t, 1, len(c.VolumeMounts))
}
for _, c := range podSpec.InitContainers {
assert.Equal(t, 1, len(c.VolumeMounts))
}
envNames := []string{}
for _, env := range podSpec.Containers[0].Env {
envNames = append(envNames, env.Name)
Expand Down
13 changes: 7 additions & 6 deletions pkg/apis/numaflow/v1alpha1/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,18 @@ type AbstractSink struct {
UDSink *UDSink `json:"udsink,omitempty" protobuf:"bytes,4,opt,name=udsink"`
}

func (s Sink) getContainers(req getContainerReq) ([]corev1.Container, error) {
func (s Sink) getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) {
containers := []corev1.Container{
s.getMainContainer(req),
}
sidecarContainers := []corev1.Container{}
if s.UDSink != nil {
containers = append(containers, s.getUDSinkContainer(req))
sidecarContainers = append(sidecarContainers, s.getUDSinkContainer(req))
}
if s.Fallback != nil && s.Fallback.UDSink != nil {
containers = append(containers, s.getFallbackUDSinkContainer(req))
sidecarContainers = append(sidecarContainers, s.getFallbackUDSinkContainer(req))
}
return containers, nil
return sidecarContainers, containers, nil
}

func (s Sink) getMainContainer(req getContainerReq) corev1.Container {
Expand All @@ -72,7 +73,7 @@ func (s Sink) getUDSinkContainer(mainContainerReq getContainerReq) corev1.Contai
c := containerBuilder{}.
name(CtrUdsink).
imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container
appendVolumeMounts(mainContainerReq.volumeMounts...)
appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar()
x := s.UDSink.Container
c = c.image(x.Image)
if len(x.Command) > 0 {
Expand Down Expand Up @@ -107,7 +108,7 @@ func (s Sink) getFallbackUDSinkContainer(mainContainerReq getContainerReq) corev
c := containerBuilder{}.
name(CtrFallbackUdsink).
imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container
appendVolumeMounts(mainContainerReq.volumeMounts...)
appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar()
x := s.Fallback.UDSink.Container
c = c.image(x.Image)
if len(x.Command) > 0 {
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/numaflow/v1alpha1/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func Test_Sink_getContainers(t *testing.T) {
s := Sink{}
c, err := s.getContainers(getContainerReq{
sc, c, err := s.getContainers(getContainerReq{
env: []corev1.EnvVar{
{Name: "test-env", Value: "test-val"},
},
Expand All @@ -37,6 +37,7 @@ func Test_Sink_getContainers(t *testing.T) {
resources: corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}},
})
assert.NoError(t, err)
assert.Equal(t, 0, len(sc))
assert.Equal(t, 1, len(c))
assert.Equal(t, testFlowImage, c[0].Image)
assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[0].Resources)
Expand Down Expand Up @@ -89,6 +90,7 @@ func Test_Sink_getUDSinkContainer(t *testing.T) {
assert.Equal(t, int32(15), c.LivenessProbe.TimeoutSeconds)
assert.Equal(t, int32(14), c.LivenessProbe.PeriodSeconds)
assert.Equal(t, int32(5), c.LivenessProbe.FailureThreshold)
assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy)
}

func Test_Sink_getFallbackUDSinkContainer(t *testing.T) {
Expand Down Expand Up @@ -151,4 +153,5 @@ func Test_Sink_getFallbackUDSinkContainer(t *testing.T) {
})
assert.Equal(t, testImagePullPolicy, c.ImagePullPolicy)
assert.True(t, c.LivenessProbe != nil)
assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy)
}
13 changes: 7 additions & 6 deletions pkg/apis/numaflow/v1alpha1/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ type Source struct {
Serving *ServingSource `json:"serving,omitempty" protobuf:"bytes,8,opt,name=serving"`
}

func (s Source) getContainers(req getContainerReq) ([]corev1.Container, error) {
func (s Source) getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) {
containers := []corev1.Container{
s.getMainContainer(req),
}
sidecarContainers := []corev1.Container{}
if s.UDTransformer != nil {
containers = append(containers, s.getUDTransformerContainer(req))
sidecarContainers = append(sidecarContainers, s.getUDTransformerContainer(req))
}
if s.UDSource != nil {
containers = append(containers, s.getUDSourceContainer(req))
sidecarContainers = append(sidecarContainers, s.getUDSourceContainer(req))
}
return containers, nil
return sidecarContainers, containers, nil
}

func (s Source) getMainContainer(req getContainerReq) corev1.Container {
Expand All @@ -69,7 +70,7 @@ func (s Source) getUDTransformerContainer(mainContainerReq getContainerReq) core
c := containerBuilder{}.
name(CtrUdtransformer).
imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container
appendVolumeMounts(mainContainerReq.volumeMounts...)
appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar()
c = c.appendEnv(corev1.EnvVar{Name: EnvUDContainerType, Value: UDContainerTransformer})
if x := s.UDTransformer.Container; x != nil && x.Image != "" { // customized image
c = c.image(x.Image)
Expand Down Expand Up @@ -133,7 +134,7 @@ func (s Source) getUDSourceContainer(mainContainerReq getContainerReq) corev1.Co
c := containerBuilder{}.
name(CtrUdsource).
imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as the main container
appendVolumeMounts(mainContainerReq.volumeMounts...)
appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar()
c = c.appendEnv(corev1.EnvVar{Name: EnvUDContainerType, Value: UDContainerSource})
if x := s.UDSource.Container; x != nil && x.Image != "" { // customized image
c = c.image(x.Image)
Expand Down
68 changes: 36 additions & 32 deletions pkg/apis/numaflow/v1alpha1/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,60 +77,64 @@ func TestSource_getContainers(t *testing.T) {
},
},
}
c, err := x.getContainers(getContainerReq{
sc, c, err := x.getContainers(getContainerReq{
image: "main-image",
})
assert.NoError(t, err)
assert.Equal(t, 3, len(c))
assert.Equal(t, 2, len(sc))
assert.Equal(t, 1, len(c))
assert.Equal(t, "main-image", c[0].Image)

assert.Equal(t, x.UDSource.Container.Image, c[2].Image)
assert.Contains(t, c[2].VolumeMounts, c[2].VolumeMounts[0])
assert.Equal(t, x.UDSource.Container.Command, c[2].Command)
assert.Equal(t, x.UDSource.Container.Args, c[2].Args)
assert.Equal(t, x.UDSource.Container.Image, sc[1].Image)
assert.Contains(t, sc[1].VolumeMounts, sc[1].VolumeMounts[0])
assert.Equal(t, x.UDSource.Container.Command, sc[1].Command)
assert.Equal(t, x.UDSource.Container.Args, sc[1].Args)
envsUDSource := map[string]string{}
for _, e := range c[2].Env {
for _, e := range sc[1].Env {
envsUDSource[e.Name] = e.Value
}
assert.Equal(t, envsUDSource[EnvUDContainerType], UDContainerSource)
assert.Equal(t, x.UDSource.Container.EnvFrom, c[2].EnvFrom)
assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[2].Resources)
assert.Equal(t, c[0].ImagePullPolicy, c[2].ImagePullPolicy)
assert.NotNil(t, c[1].LivenessProbe)
assert.Equal(t, int32(10), c[2].LivenessProbe.InitialDelaySeconds)
assert.Equal(t, int32(15), c[2].LivenessProbe.TimeoutSeconds)
assert.Equal(t, int32(14), c[2].LivenessProbe.PeriodSeconds)
assert.Equal(t, int32(5), c[2].LivenessProbe.FailureThreshold)
assert.Equal(t, x.UDSource.Container.EnvFrom, sc[1].EnvFrom)
assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, sc[1].Resources)
assert.Equal(t, c[0].ImagePullPolicy, sc[1].ImagePullPolicy)
assert.NotNil(t, sc[0].LivenessProbe)
assert.Equal(t, int32(10), sc[1].LivenessProbe.InitialDelaySeconds)
assert.Equal(t, int32(15), sc[1].LivenessProbe.TimeoutSeconds)
assert.Equal(t, int32(14), sc[1].LivenessProbe.PeriodSeconds)
assert.Equal(t, int32(5), sc[1].LivenessProbe.FailureThreshold)
x.UDSource.Container.ImagePullPolicy = &testImagePullPolicy
c, _ = x.getContainers(getContainerReq{
assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), sc[0].RestartPolicy)
assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), sc[1].RestartPolicy)
sc, c, _ = x.getContainers(getContainerReq{
image: "main-image",
imagePullPolicy: corev1.PullAlways,
})
assert.Equal(t, testImagePullPolicy, c[2].ImagePullPolicy)
assert.Equal(t, testImagePullPolicy, sc[1].ImagePullPolicy)

assert.Equal(t, x.UDTransformer.Container.Image, c[1].Image)
assert.Contains(t, c[1].VolumeMounts, c[1].VolumeMounts[0])
assert.Equal(t, x.UDTransformer.Container.Command, c[1].Command)
assert.Equal(t, x.UDTransformer.Container.Args, c[1].Args)
assert.Equal(t, x.UDTransformer.Container.Image, sc[0].Image)
assert.Contains(t, sc[0].VolumeMounts, sc[0].VolumeMounts[0])
assert.Equal(t, x.UDTransformer.Container.Command, sc[0].Command)
assert.Equal(t, x.UDTransformer.Container.Args, sc[0].Args)
envs := map[string]string{}
for _, e := range c[1].Env {
for _, e := range sc[0].Env {
envs[e.Name] = e.Value
}
assert.Equal(t, envs[EnvUDContainerType], UDContainerTransformer)
assert.Equal(t, x.UDTransformer.Container.EnvFrom, c[1].EnvFrom)
assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, c[1].Resources)
assert.Equal(t, c[0].ImagePullPolicy, c[1].ImagePullPolicy)
assert.NotNil(t, c[1].LivenessProbe)
assert.Equal(t, int32(20), c[1].LivenessProbe.InitialDelaySeconds)
assert.Equal(t, int32(25), c[1].LivenessProbe.TimeoutSeconds)
assert.Equal(t, int32(24), c[1].LivenessProbe.PeriodSeconds)
assert.Equal(t, int32(5), c[1].LivenessProbe.FailureThreshold)
assert.Equal(t, x.UDTransformer.Container.EnvFrom, sc[0].EnvFrom)
assert.Equal(t, corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{"cpu": resource.MustParse("2")}}, sc[0].Resources)
assert.Equal(t, c[0].ImagePullPolicy, sc[0].ImagePullPolicy)
assert.NotNil(t, sc[0].LivenessProbe)
assert.Equal(t, int32(20), sc[0].LivenessProbe.InitialDelaySeconds)
assert.Equal(t, int32(25), sc[0].LivenessProbe.TimeoutSeconds)
assert.Equal(t, int32(24), sc[0].LivenessProbe.PeriodSeconds)
assert.Equal(t, int32(5), sc[0].LivenessProbe.FailureThreshold)
x.UDTransformer.Container.ImagePullPolicy = &testImagePullPolicy
c, _ = x.getContainers(getContainerReq{
sc, c, _ = x.getContainers(getContainerReq{
image: "main-image",
imagePullPolicy: corev1.PullAlways,
})
assert.Equal(t, testImagePullPolicy, c[1].ImagePullPolicy)
assert.Equal(t, corev1.PullAlways, c[0].ImagePullPolicy)
assert.Equal(t, testImagePullPolicy, sc[0].ImagePullPolicy)
}

func Test_getTransformerContainer(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/numaflow/v1alpha1/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type UDF struct {
GroupBy *GroupBy `json:"groupBy" protobuf:"bytes,3,opt,name=groupBy"`
}

func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, error) {
return []corev1.Container{in.getMainContainer(req), in.getUDFContainer(req)}, nil
func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, []corev1.Container, error) {
return []corev1.Container{in.getUDFContainer(req)}, []corev1.Container{in.getMainContainer(req)}, nil
}

func (in UDF) getMainContainer(req getContainerReq) corev1.Container {
Expand All @@ -63,7 +63,7 @@ func (in UDF) getUDFContainer(mainContainerReq getContainerReq) corev1.Container
c := containerBuilder{}.
name(CtrUdf).
imagePullPolicy(mainContainerReq.imagePullPolicy). // Use the same image pull policy as main container
appendVolumeMounts(mainContainerReq.volumeMounts...)
appendVolumeMounts(mainContainerReq.volumeMounts...).asSidecar()
if x := in.Container; x != nil && x.Image != "" { // customized image
c = c.image(x.Image)
if len(x.Command) > 0 {
Expand Down
Loading

0 comments on commit 30b1cc9

Please sign in to comment.