Skip to content

Commit

Permalink
fix: fix channel blocking; also status and value should not be change…
Browse files Browse the repository at this point in the history
…d after timeout
  • Loading branch information
RexSkz committed Jan 31, 2024
1 parent 93067e2 commit 6aaae91
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 24 deletions.
18 changes: 10 additions & 8 deletions allsettled.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@ type AllSettledValue struct {
}

type AllSettledResult struct {
finished chan bool
timeout chan bool
values []*AllSettledValue
finishedCh chan bool
timeoutCh chan bool
timeout bool
values []*AllSettledValue
}

func newAllSettledResult() *AllSettledResult {
result := &AllSettledResult{
finished: make(chan bool),
timeout: make(chan bool),
values: []*AllSettledValue{},
finishedCh: make(chan bool, 1),
timeoutCh: make(chan bool, 1),
timeout: false,
values: []*AllSettledValue{},
}
return result
}

func (r *AllSettledResult) Await() ([]*AllSettledValue, error) {
select {
case <-r.finished:
case <-r.finishedCh:
return r.values, nil
case <-r.timeout:
case <-r.timeoutCh:
return r.values, ErrorTimeout
}
}
25 changes: 18 additions & 7 deletions gromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (g *Gromise) AllSettled(fns []Executor) *AllSettledResult {

go func() {
if len(fns) == 0 {
result.finished <- true
result.finishedCh <- true
return
}

Expand Down Expand Up @@ -64,22 +64,33 @@ func (g *Gromise) AllSettled(fns []Executor) *AllSettledResult {
}()

if r, err := fn(); err != nil {
result.values[index].Status = StatusRejected
result.values[index].Reason = err
if !result.timeout {
result.values[index].Status = StatusRejected
result.values[index].Reason = err
}
} else {
result.values[index].Status = StatusFulfilled
result.values[index].Value = r
if !result.timeout {
result.values[index].Status = StatusFulfilled
result.values[index].Value = r
}
}
}(fn, index)
}

for {
if goroutinesToWait <= 0 {
result.finished <- true
result.finishedCh <- true
break
}
if time.Since(now).Milliseconds() > int64(g.timeoutMs) {
result.timeout <- true
result.timeoutCh <- true
result.timeout = true
for index := range result.values {
if result.values[index].Status == StatusPending {
result.values[index].Status = StatusRejected
result.values[index].Reason = ErrorTimeout
}
}
break
}
}
Expand Down
46 changes: 37 additions & 9 deletions gromise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,26 @@ func TestTimeout(t *testing.T) {
time.Sleep(1000 * time.Millisecond)
return 1, nil
},
func() (interface{}, error) {
return 2, nil
},
}

prev := time.Now()
_, err := New(100).AllSettled(fns).Await()
after := time.Now()

result, err := New(100).AllSettled(fns).Await()
if err != ErrorTimeout {
t.Errorf("err should be ErrorTimeout, got %v", err)
}

elapsedTime := after.Sub(prev)
// 150 ms is 100ms + 50ms epsilon
if elapsedTime > 150*time.Millisecond {
t.Errorf("fns should be executed concurrently, but used %d ms", elapsedTime.Milliseconds())
}
assert.Equal(t, result, []*AllSettledValue{
{
Status: StatusRejected,
Reason: ErrorTimeout,
},
{
Status: StatusFulfilled,
Value: 2,
},
})
}

func TestEmptyFns(t *testing.T) {
Expand All @@ -163,3 +168,26 @@ func TestEmptyFns(t *testing.T) {
t.Errorf("the length of results should be 0, got %d", len(results))
}
}

func TestTimeoutNotAffectResult(t *testing.T) {
fns := []Executor{
func() (interface{}, error) {
time.Sleep(1000 * time.Millisecond)
return 1, nil
},
}

result, _ := New(100).AllSettled(fns).Await()

// If fn is still running, the result status will be changed to
// StatusFulfilled after 1 second, which is not expected.
// The expected behaviour should be StatusReject.
time.Sleep(1000 * time.Millisecond)

assert.Equal(t, result, []*AllSettledValue{
{
Status: StatusRejected,
Reason: ErrorTimeout,
},
})
}

0 comments on commit 6aaae91

Please sign in to comment.