Skip to content

Commit

Permalink
Merge pull request #11 from RuiFG/master
Browse files Browse the repository at this point in the history
fix: union operator rich can't be nil
  • Loading branch information
RuiFG authored Jan 9, 2023
2 parents 21e26f0 + 5996c4f commit e70c5b2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
11 changes: 8 additions & 3 deletions streaming-operator/union/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@ func (o *operator[T]) NotifyCheckpointCancel(checkpointId int64) {
}

func (o *operator[T]) Open(ctx Context, emit element.Emit) error {
if err := o.rich.Open(ctx); err != nil {
return err
if o.rich != nil {
if err := o.rich.Open(ctx); err != nil {
return err
}
}
o.combineWatermark = NewCombineWatermark(o.inputCount)
return nil
}

func (o *operator[T]) Close() error {
return o.rich.Close()
if o.rich != nil {
return o.rich.Close()
}
return nil
}

func (o *operator[T]) ProcessElement(e element.Element, index int) {
Expand Down
1 change: 0 additions & 1 deletion streaming-operator/union/union_test.go

This file was deleted.

0 comments on commit e70c5b2

Please sign in to comment.