Skip to content

Commit

Permalink
fix: parse time should consider loc (#2169)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored Aug 11, 2023
1 parent f4ca8ab commit e582ab2
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 30 deletions.
1 change: 1 addition & 0 deletions etc/kuiper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ basic:
# maxConnections indicates the max connections for the certain database instance group by driver and dsn sharing between the sources/sinks
# 0 indicates unlimited
maxConnections: 0
rulePatrolInterval: 10s

# The default options for all rules. Each rule can override this setting by defining its own option
rule:
Expand Down
4 changes: 2 additions & 2 deletions internal/topo/rule/ruleState.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (rs *RuleState) GetState() (string, error) {
case nil:
result = "Running"
case context.Canceled:
if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
if (rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule) || rs.Rule.IsLongRunningScheduleRule() {
if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
result = "Stopped: schedule terminated."
} else {
Expand All @@ -448,7 +448,7 @@ func (rs *RuleState) GetState() (string, error) {
result = fmt.Sprintf("Stopped: %v.", err)
}
} else {
if rs.cronState.isInSchedule {
if rs.cronState.isInSchedule || rs.Rule.IsLongRunningScheduleRule() {
if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
result = "Stopped: schedule terminated."
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/rule/ruleState_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func TestStartLongRunningScheduleRule(t *testing.T) {
End: after.Format(layout),
},
}
const ruleStopped = "Stopped: canceled manually."
const ruleStopped = "Stopped: schedule terminated."
const ruleStarted = "Running"
func() {
rs, err := NewRuleState(r)
Expand Down
9 changes: 7 additions & 2 deletions pkg/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/robfig/cron/v3"

"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
)

const layout = "2006-01-02 15:04:05"
Expand All @@ -38,11 +39,15 @@ func IsInScheduleRanges(now time.Time, timeRanges []api.DatetimeRange) (bool, er
}

func IsInScheduleRange(now time.Time, start string, end string) (bool, error) {
s, err := time.Parse(layout, start)
return isInTimeRange(now, start, end)
}

func isInTimeRange(now time.Time, start string, end string) (bool, error) {
s, err := cast.InterfaceToTime(start, layout)
if err != nil {
return false, err
}
e, err := time.Parse(layout, end)
e, err := cast.InterfaceToTime(end, layout)
if err != nil {
return false, err
}
Expand Down
31 changes: 6 additions & 25 deletions pkg/schedule/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/pkg/api"
)

func TestIsInScheduleRanges(t *testing.T) {
func TestIsInTimeRange(t *testing.T) {
now, err := time.Parse(layout, "2006-01-02 15:04:01")
require.NoError(t, err)
rs := []api.DatetimeRange{
{
Begin: "2006-01-02 15:04:00",
End: "2006-01-02 15:04:03",
},
}
isIn, err := IsInScheduleRanges(now, rs)
isIn, err := isInTimeRange(now, "2006-01-02 15:04:00", "2006-01-02 15:04:03")
require.NoError(t, err)
require.True(t, isIn)
_, err = isInTimeRange(now, "123", "2006-01-02 15:04:03")
require.Error(t, err)
_, err = isInTimeRange(now, "2006-01-02 15:04:00", "13")
require.Error(t, err)
}

func TestIsRuleInRunningSchedule(t *testing.T) {
Expand All @@ -48,21 +44,6 @@ func TestIsRuleInRunningSchedule(t *testing.T) {
require.Equal(t, remainedDuration, time.Second)
}

func TestIsInScheduleRange(t *testing.T) {
now, err := time.Parse(layout, "2006-01-02 15:04:01")
require.NoError(t, err)
_, err = IsInScheduleRange(now, "", "")
require.Error(t, err)
_, err = IsInScheduleRange(now, "2006-01-02 15:04:01", "")
require.Error(t, err)
isIn, err := IsInScheduleRange(now, "2006-01-02 15:04:00", "2006-01-02 15:04:03")
require.NoError(t, err)
require.True(t, isIn)
isIn, err = IsInScheduleRange(now, "2006-01-02 15:05:00", "2006-01-02 15:05:03")
require.NoError(t, err)
require.False(t, isIn)
}

func TestIsAfterTimeRange(t *testing.T) {
now, err := time.Parse(layout, "2006-01-02 15:04:01")
require.NoError(t, err)
Expand Down

0 comments on commit e582ab2

Please sign in to comment.