Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Session Window and Reduce Streaming #1384

Merged
merged 58 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0f6581b
async reduce
yhl25 Oct 25, 2023
7a55791
revert
yhl25 Oct 25, 2023
29b72f6
reduce streaming
yhl25 Oct 27, 2023
a361522
remove unwanted lock
yhl25 Oct 30, 2023
27018eb
doc
yhl25 Oct 30, 2023
8237bf7
Merge branch 'main' of github.com:numaproj/numaflow into unaligned-wi…
yhl25 Nov 2, 2023
60908cf
session window
yhl25 Nov 8, 2023
0f63625
update sdk
yhl25 Nov 8, 2023
bb24e50
sdk clients
yhl25 Nov 13, 2023
51011c9
session window
yhl25 Nov 13, 2023
e0edeb9
more tests
yhl25 Nov 14, 2023
6d6f999
fix tests
yhl25 Nov 15, 2023
b5af4d3
cleanup
yhl25 Nov 16, 2023
815b798
update sdk
yhl25 Nov 17, 2023
4c55129
working version
yhl25 Nov 21, 2023
e0d46eb
minor cleanup
yhl25 Nov 21, 2023
b07ab89
controller changes
yhl25 Nov 21, 2023
a4354bf
Merge branch 'main' of github.com:numaproj/numaflow into session-win
yhl25 Nov 21, 2023
1b9af00
fix wm
yhl25 Nov 21, 2023
985bb3a
comments
yhl25 Nov 23, 2023
35602f6
support partial responses
yhl25 Nov 23, 2023
ad08e57
working version with support for partial responses
yhl25 Nov 23, 2023
2bd2a2e
resolve conflicts
yhl25 Nov 23, 2023
202323b
chore: code review
vigith Nov 23, 2023
1b705ab
lint and codegen
yhl25 Nov 23, 2023
189daca
chore: code review
vigith Nov 24, 2023
22a20f6
remove sync reduce
yhl25 Nov 25, 2023
07ace25
fix naming and cleanup
yhl25 Nov 27, 2023
3a0f7f6
codegen
yhl25 Nov 27, 2023
35a6548
chore: code review
vigith Nov 28, 2023
96095b7
publish wm when window is closed
yhl25 Nov 28, 2023
7934a1c
Merge branch 'main' into session-win
vigith Nov 28, 2023
010be6a
chore: code review
vigith Nov 28, 2023
fce12d3
refactor pnf
yhl25 Nov 28, 2023
73ec91a
chore: code review
vigith Nov 29, 2023
1f84b81
session e2e
yhl25 Nov 29, 2023
bece6b2
chore: code review
vigith Nov 30, 2023
bd6acd8
Merge branch 'session-win' of github.com:numaproj/numaflow into sessi…
yhl25 Nov 30, 2023
69bbbbf
refactor
yhl25 Nov 30, 2023
7bc09d7
lint
yhl25 Nov 30, 2023
56633bf
chore: code review
vigith Dec 2, 2023
300f86f
refactor applier interface
yhl25 Dec 4, 2023
d896d19
Merge branch 'main' of github.com:numaproj/numaflow into session-win
yhl25 Dec 4, 2023
5daa614
update e2e images
yhl25 Dec 4, 2023
74acf43
avoiding breaking change, by introducing reducestreamer
yhl25 Dec 6, 2023
1375953
Merge branch 'main' of github.com:numaproj/numaflow into session-win
yhl25 Dec 7, 2023
c0f0055
remove global window, cleanup
yhl25 Dec 8, 2023
091f3e6
Merge branch 'main' of github.com:numaproj/numaflow into session-win
yhl25 Dec 8, 2023
da3d38c
lint and e2e
yhl25 Dec 8, 2023
26b8eee
fix e2e
yhl25 Dec 8, 2023
19408f9
tweak idle source config
yhl25 Dec 8, 2023
46cf75c
update sdk image
yhl25 Dec 8, 2023
44f76a5
Merge branch 'main' of github.com:numaproj/numaflow into session-win
yhl25 Dec 12, 2023
6e78ed6
review comments
yhl25 Dec 13, 2023
5619cea
update OldestWindowEndTime()
yhl25 Dec 13, 2023
0bc54c2
update sdk version
yhl25 Dec 13, 2023
adce7ed
codegen
yhl25 Dec 13, 2023
8686c99
Merge branch 'main' of github.com:numaproj/numaflow into session-win
yhl25 Dec 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17731,7 +17731,12 @@
"description": "FixedWindow describes a fixed window",
"properties": {
"length": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Length is the duration of the fixed window."
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
},
"type": "object"
Expand Down Expand Up @@ -19094,6 +19099,16 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SessionWindow": {
"description": "SessionWindow describes a session window",
"properties": {
"timeout": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Timeout is the duration of inactivity after which a session window closes."
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SideInput": {
"description": "SideInput defines information of a Side Input",
"properties": {
Expand Down Expand Up @@ -19235,10 +19250,16 @@
"description": "SlidingWindow describes a sliding window",
"properties": {
"length": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Length is the duration of the sliding window."
},
"slide": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Slide is the slide parameter that controls the frequency at which the sliding window is created."
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
},
"type": "object"
Expand Down Expand Up @@ -19803,6 +19824,9 @@
"fixed": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.FixedWindow"
},
"session": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SessionWindow"
},
"sliding": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SlidingWindow"
}
Expand Down
24 changes: 24 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -17736,7 +17736,12 @@
"type": "object",
"properties": {
"length": {
"description": "Length is the duration of the fixed window.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -19080,6 +19085,16 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.SessionWindow": {
"description": "SessionWindow describes a session window",
"type": "object",
"properties": {
"timeout": {
"description": "Timeout is the duration of inactivity after which a session window closes.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
}
}
},
"io.numaproj.numaflow.v1alpha1.SideInput": {
"description": "SideInput defines information of a Side Input",
"type": "object",
Expand Down Expand Up @@ -19222,10 +19237,16 @@
"type": "object",
"properties": {
"length": {
"description": "Length is the duration of the sliding window.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"slide": {
"description": "Slide is the slide parameter that controls the frequency at which the sliding window is created.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -19781,6 +19802,9 @@
"fixed": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.FixedWindow"
},
"session": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SessionWindow"
},
"sliding": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SlidingWindow"
}
Expand Down
9 changes: 9 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7938,13 +7938,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
9 changes: 9 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3853,13 +3853,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
18 changes: 18 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10467,13 +10467,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down Expand Up @@ -15134,13 +15143,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
18 changes: 18 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10467,13 +10467,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down Expand Up @@ -15134,13 +15143,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
81 changes: 81 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,20 @@ Description
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Length is the duration of the fixed window.
</p>
</td>
</tr>
<tr>
<td>
<code>streaming</code></br> <em> bool </em>
</td>
<td>
<em>(Optional)</em>
<p>
Streaming should be set to true if the reduce udf is streaming.
</p>
</td>
</tr>
</tbody>
Expand Down Expand Up @@ -4146,6 +4160,45 @@ CooldownSeconds if not set.
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.SessionWindow">
SessionWindow
</h3>
<p>
(<em>Appears on:</em>
<a href="#numaflow.numaproj.io/v1alpha1.Window">Window</a>)
</p>
<p>
<p>
SessionWindow describes a session window
</p>
</p>
<table>
<thead>
<tr>
<th>
Field
</th>
<th>
Description
</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>timeout</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Timeout is the duration of inactivity after which a session window
closes.
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.SideInput">
SideInput
</h3>
Expand Down Expand Up @@ -4398,6 +4451,9 @@ Description
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Length is the duration of the sliding window.
</p>
</td>
</tr>
<tr>
Expand All @@ -4407,6 +4463,21 @@ Kubernetes meta/v1.Duration </a> </em>
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Slide is the slide parameter that controls the frequency at which the
sliding window is created.
</p>
</td>
</tr>
<tr>
<td>
<code>streaming</code></br> <em> bool </em>
</td>
<td>
<em>(Optional)</em>
<p>
Streaming should be set to true if the reduce udf is streaming.
</p>
</td>
</tr>
</tbody>
Expand Down Expand Up @@ -5523,6 +5594,16 @@ Description
<em>(Optional)</em>
</td>
</tr>
<tr>
<td>
<code>session</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.SessionWindow"> SessionWindow
</a> </em>
</td>
<td>
<em>(Optional)</em>
</td>
</tr>
</tbody>
</table>
<hr/>
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9
github.com/numaproj/numaflow-go v0.5.3-0.20231213060340-dbd9016bbcb6
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand All @@ -40,7 +40,7 @@ require (
github.com/spf13/cobra v1.6.0
github.com/spf13/viper v1.9.0
github.com/stretchr/testify v1.8.4
go.uber.org/atomic v1.9.0
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.24.0
Expand Down
Loading
Loading