Stream
is a stream processing library based on Go 1.18+ Generics. It supports parallel processing of data in the stream.
Parallel
: Parallel processing of data in the stream, keeping the original order of the elements in the streamPipeline
: combine multiple operations to reduce element loops, short-circuiting earlierLazy Invocation
: intermediate operations are lazy
Requires Go 1.18+ version installed
import "github.com/xyctruth/stream"
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
Filter(func(s string) bool { return s != "b" }).
Map(func(s string) string { return "class_" + s }).
Sort().
Distinct().
ToSlice()
any
accepts elements of any type, so you cannot use ==
!=
>
<
to compare elements, which will prevent you from using Sort(), Find()... functions, but you can use SortFunc(fn), FindFunc(fn)... instead
stream.NewSlice([]int{1, 2, 3, 7, 1})
comparable
accepts type can use ==
!=
to compare elements, but still can't use >
<
to compare elements, so you can't use Sort(), Min()... functions, but you can use SortFunc(fn), MinFunc()... instead
stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})
constraints.Ordered
accepts types that can use ==
!=
>
<
to compare elements, so can use all functions
stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})
Sometimes we need to use Map
, Reduce
to convert the type of slice elements, but unfortunately Golang currently does not support structure methods with additional type parameters, all type parameters must be declared in the structure. We work around this with a temporary workaround until Golang supports it.
// SliceMappingStream Need to convert the type of slice elements.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
SliceStream[E]
}
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
Filter(func(v int) bool { return v >3 }).
Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
Reduce(func(r string, v string) string { return r + v })
The Parallel
function accept a goroutines int
parameter. If goroutines>1, open Parallel , otherwise close Parallel, the stream Parallel is off by default.
Parallel will divide the elements in the stream into multiple partitions equally, and create the same number of goroutine to execute, and it will ensure that the elements in the stream remain in the original order after processing is complete.
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
Parallel(10).
Filter(func(s string) bool {
// some time-consuming operations
return s != "b"
}).
Map(func(s string) string {
// some time-consuming operations
return "class_" + s
}).
ForEach(
func(index int, s string) {
// some time-consuming operations
},
).ToSlice()
First
: parallel processing ends as soon as the first return value is obtained.For: AllMatch, AnyMatch, FindFunc
ALL
: All elements need to be processed in parallel, all return values are obtained, and then the parallel is ended.For: Map, Filter
Action
: All elements need to be processed in parallel, no return value required.For: ForEach, Action
The number of parallel goroutines has different choices for CPU operations and IO operations. Generally, the number of goroutines does not need to be set larger than the number of CPU cores for CPU operations, while the number of goroutines for IO operations can be set to be much larger than the number of CPU cores.
NewSlice(s).Parallel(tt.goroutines).ForEach(func(i int, v int) {
sort.Ints(newArray(1000)) // Simulate time-consuming CPU operations
})
Benchmark with 6 cpu cores
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByCPU
goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_Parallel(0)-6 710 8265106 ns/op
BenchmarkParallelByCPU/goroutines(2)-6 1387 4333929 ns/op
BenchmarkParallelByCPU/goroutines(4)-6 2540 2361783 ns/op
BenchmarkParallelByCPU/goroutines(6)-6 3024 2100158 ns/op
BenchmarkParallelByCPU/goroutines(8)-6 2347 2531435 ns/op
BenchmarkParallelByCPU/goroutines(10)-6 2622 2306752 ns/op
NewSlice(s).Parallel(tt.goroutines).ForEach(func(i int, v int) {
time.Sleep(time.Millisecond) // Simulate time-consuming IO operations
})
Benchmark with 6 cpu cores
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByIO
goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6 52 102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6 100 55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6 214 27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6 315 18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6 411 14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6 537 11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6 2629 2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6 5094 1221887 ns/op