diff --git a/streaming-operator/union/union.go b/streaming-operator/union/union.go index e739898..b806603 100644 --- a/streaming-operator/union/union.go +++ b/streaming-operator/union/union.go @@ -39,15 +39,20 @@ func (o *operator[T]) NotifyCheckpointCancel(checkpointId int64) { } func (o *operator[T]) Open(ctx Context, emit element.Emit) error { - if err := o.rich.Open(ctx); err != nil { - return err + if o.rich != nil { + if err := o.rich.Open(ctx); err != nil { + return err + } } o.combineWatermark = NewCombineWatermark(o.inputCount) return nil } func (o *operator[T]) Close() error { - return o.rich.Close() + if o.rich != nil { + return o.rich.Close() + } + return nil } func (o *operator[T]) ProcessElement(e element.Element, index int) { diff --git a/streaming-operator/union/union_test.go b/streaming-operator/union/union_test.go deleted file mode 100644 index 5671fb3..0000000 --- a/streaming-operator/union/union_test.go +++ /dev/null @@ -1 +0,0 @@ -package union