Skip to content

Commit

Permalink
Add methods (#2)
Browse files Browse the repository at this point in the history
* stream add methods:
OfSlice, OfMap

* feat(stream): add methods

Stream interface: ReduceBy
factory methods: OfInts, OfInt64s,
OfFloat32s, OfFloat64s, OfStrings

* docs(readme): update readme

add changelog and todo;
  • Loading branch information
youthlin authored Dec 8, 2020
1 parent 1751187 commit 569337c
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 27 deletions.
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Go Stream

[![PkgGoDev](https://pkg.go.dev/badge/github.com/youthlin/stream)](https://pkg.go.dev/github.com/youthlin/stream)
[![Go Report Card](https://goreportcard.com/badge/github.com/youthlin/stream)](https://goreportcard.com/report/github.com/youthlin/stream)
[![Build Status](https://travis-ci.org/youthlin/stream.svg?branch=main)](https://travis-ci.org/youthlin/stream)
Expand All @@ -9,13 +10,15 @@ Go Stream, like Java 8 Stream.
Blog Post: https://youthlin.com/?p=1755

## How to get

```shell script
go get github.com/youthlin/stream
```

国内镜像: https://gitee.com/youthlin/stream
`go.mod` 中引入模块路径 `github.com/youthlin/stream` 及版本后,
再添加 replace 即可:

```go
// go.mod

Expand All @@ -26,7 +29,9 @@ replace github.com/youthlin/stream latest => gitee.com/youthlin/stream latest
```

## Play online
https://play.golang.org/p/vO8NEkdNXzY

https://play.golang.org/p/nPQJYqA3-Jr

```go
package main

Expand All @@ -45,16 +50,19 @@ func main() {
Map(func(e types.T) types.R {
return e.(int) * 2
}).
ReduceWith(map[int]string{}, func(m types.R, e types.T) types.R {
m.(map[int]string)[e.(int)] = fmt.Sprintf("<%d>", e)
ReduceWith(map[int]string{}, func(acc types.R, e types.T) types.R {
m := acc.(map[int]string)
m[e.(int)] = fmt.Sprintf("<%d>", e)
return m
})
fmt.Println(m)
// Output:
// map[0:<0> 4:<4> 8:<8> 12:<12> 16:<16>]
}

```

## Examples

```go

type Stream interface {
Expand Down Expand Up @@ -218,3 +226,15 @@ func TestToMap(t *testing.T) {
}

```

## Change Log

- v0.0.3 2020-12-08 add factory method: OfInts, OfInt64s, OfFloat32s, OfFloat64s, OfStrings;
add Stream method: ReduceBy
- v0.0.2 2020-12-07 add factory method: OfSlice, OfMap
- v0.0.1 2020-11-12 first version

## Todo

- [ ] add Benchmark test
- [ ] support parallel stream
74 changes: 74 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,47 @@ func ExampleOf() {
// 1,2,3,4,
}

func ExampleOfInts() {
var ints = []int{1, 2, 3, 4}
stream.OfInts(ints...).ForEach(func(e types.T) {
fmt.Printf("%d,", e)
})
// Output:
// 1,2,3,4,
}
func ExampleOfInt64s() {
var ints = []int64{1, 2, 3, 4}
stream.OfInt64s(ints...).ForEach(func(e types.T) {
fmt.Printf("%d(%T),", e, e)
})
// Output:
// 1(int64),2(int64),3(int64),4(int64),
}
func ExampleOfFloat32s() {
var ints = []float32{1, 2, 3, 4}
stream.OfFloat32s(ints...).ForEach(func(e types.T) {
fmt.Printf("%v(%T),", e, e)
})
// Output:
// 1(float32),2(float32),3(float32),4(float32),
}
func ExampleOfFloat64s() {
var ints = []float64{1, 2, 3, 4}
stream.OfFloat64s(ints...).ForEach(func(e types.T) {
fmt.Printf("%v(%T),", e, e)
})
// Output:
// 1(float64),2(float64),3(float64),4(float64),
}
func ExampleOfStrings() {
var ints = []string{"a", "b", "c"}
stream.OfStrings(ints...).ForEach(func(e types.T) {
fmt.Printf("%v(%T),", e, e)
})
// Output:
// a(string),b(string),c(string),
}

func ExampleOfSlice() {
var intArr = []int{1, 2, 3, 4}
stream.OfSlice(intArr).ForEach(func(e types.T) {
Expand Down Expand Up @@ -386,6 +427,7 @@ func ExampleStream_AnyMatch() {
// true true
// false
}

func ExampleStream_Reduce() {
fmt.Println(stream.Of().Reduce(func(acc types.T, t types.T) types.T {
return acc
Expand Down Expand Up @@ -414,6 +456,38 @@ func ExampleStream_ReduceWith() {
// Output:
// []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
}
func ExampleStream_ReduceBy() {
ints := stream.IntRange(0, 10).ReduceBy(func(sizeMayNegative int64) types.R {
if sizeMayNegative >= 0 {
return make([]int, 0, sizeMayNegative)
}
fmt.Printf("IntRange: unknown size\n")
return make([]int, 0)
}, func(acc types.R, e types.T) types.R {
result := acc.([]int)
result = append(result, e.(int))
return result
}).([]int)
fmt.Printf("%v\n", ints)
int64s := stream.OfInts(ints...).ReduceBy(func(sizeMayNegative int64) types.R {
if sizeMayNegative >= 0 {
fmt.Printf("size=%d\n", sizeMayNegative)
return make([]int64, 0, sizeMayNegative)
}
return make([]int64, 0)
}, func(acc types.R, e types.T) types.R {
result := acc.([]int64)
result = append(result, int64(e.(int)))
return result
}).([]int64)
fmt.Printf("%v\n", int64s)
// Output:
// IntRange: unknown size
// [0 1 2 3 4 5 6 7 8 9]
// size=10
// [0 1 2 3 4 5 6 7 8 9]
}

func ExampleStream_Count() {
fmt.Println(stream.Of().Count())
fmt.Println(stream.Of(1).Count())
Expand Down
46 changes: 46 additions & 0 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,52 @@ func Of(elements ...types.T) Stream {
return newHead(it(elements...))
}

func OfInts(element ...int) Stream {
return newHead(&intsIt{
base: &base{
current: 0,
size: len(element),
},
elements: element,
})
}
func OfInt64s(element ...int64) Stream {
return newHead(&int64sIt{
base: &base{
current: 0,
size: len(element),
},
elements: element,
})
}
func OfFloat32s(element ...float32) Stream {
return newHead(&float32sIt{
base: &base{
current: 0,
size: len(element),
},
elements: element,
})
}
func OfFloat64s(element ...float64) Stream {
return newHead(&float64sIt{
base: &base{
current: 0,
size: len(element),
},
elements: element,
})
}
func OfStrings(element ...string) Stream {
return newHead(&stringIt{
base: &base{
current: 0,
size: len(element),
},
elements: element,
})
}

// OfSlice return a Stream. the input parameter `slice` must be a slice.
// if input is nil, return a empty Stream( same as Of() )
func OfSlice(slice types.T) Stream {
Expand Down
56 changes: 34 additions & 22 deletions impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,16 @@ func (s *stream) ForEach(consumer types.Consumer) {

// ToSlice 转为切片
func (s *stream) ToSlice() []types.T {
var slice []types.T
s.terminal(newTerminalStage(func(t types.T) {
slice = append(slice, t)
}, begin(func(size int64) {
if size > 0 {
slice = make([]types.T, 0, size)
} else {
slice = make([]types.T, 0)
return s.ReduceBy(func(count int64) types.R {
if count >= 0 {
return make([]types.T, 0, count)
}
})))
return slice
return make([]types.T, 0)
}, func(acc types.R, t types.T) types.R {
slice := acc.([]types.T)
slice = append(slice, t)
return slice
}).([]types.T)
}

// ToElementSlice needs a argument cause the stream may be empty
Expand All @@ -255,17 +254,17 @@ func (s *stream) ToElementSlice(some types.T) types.R {
// ToRealSlice
func (s *stream) ToSliceOf(typ reflect.Type) types.R {
sliceType := reflect.SliceOf(typ)
var sliceValue reflect.Value
s.terminal(newTerminalStage(func(t types.T) {
sliceValue = reflect.Append(sliceValue, reflect.ValueOf(t))
}, begin(func(size int64) {
if size > 0 {
sliceValue = reflect.MakeSlice(sliceType, 0, int(size))
} else {
sliceValue = reflect.MakeSlice(sliceType, 0, 0)
return s.ReduceBy(func(size int64) types.R {
if size >= 0 {
return reflect.MakeSlice(sliceType, 0, int(size))
}
})))
return sliceValue.Interface()
return reflect.MakeSlice(sliceType, 0, 16)
}, func(acc types.R, t types.T) types.R {
sliceValue := acc.(reflect.Value)
sliceValue = reflect.Append(sliceValue, reflect.ValueOf(t))
return sliceValue
}).(reflect.Value).
Interface()
}

// AllMatch 测试是否所有元素符合断言
Expand Down Expand Up @@ -324,7 +323,7 @@ func (s *stream) Reduce(accumulator types.BinaryOperator) optional.Optional {
return optional.Empty()
}

// ReduceFrom 从给定的初始值 identity(类型和元素类型相同) 开始迭代 使用 accumulator(2个入参类型和返回类型相同) 累计结果
// ReduceFrom 从给定的初始值 initValue(类型和元素类型相同) 开始迭代 使用 accumulator(2个入参类型和返回类型相同) 累计结果
func (s *stream) ReduceFrom(initValue types.T, accumulator types.BinaryOperator) types.T {
var result = initValue
s.terminal(newTerminalStage(func(t types.T) {
Expand All @@ -333,7 +332,7 @@ func (s *stream) ReduceFrom(initValue types.T, accumulator types.BinaryOperator)
return result
}

// ReduceWith 使用给定的初始值 identity(类型和元素类型不同) 开始迭代 使用 accumulator( R + T -> R) 累计结果
// ReduceWith 使用给定的初始值 initValue(类型和元素类型不同) 开始迭代 使用 accumulator( R + T -> R) 累计结果
func (s *stream) ReduceWith(initValue types.R, accumulator func(types.R, types.T) types.R) types.R {
var result = initValue
s.terminal(newTerminalStage(func(t types.T) {
Expand All @@ -342,6 +341,19 @@ func (s *stream) ReduceWith(initValue types.R, accumulator func(types.R, types.T
return result
}

// ReduceBy 使用给定的初始化方法(参数是元素个数,或-1)生成 initValue, 然后使用 accumulator 累计结果
// ReduceBy use `buildInitValue` to build the initValue, which parameter is a int64 means element size, or -1 if unknown size.
// Then use `accumulator` to add each element to previous result
func (s *stream) ReduceBy(buildInitValue func(int64) types.R, accumulator func(types.R, types.T) types.R) types.R {
var result types.R
s.terminal(newTerminalStage(func(e types.T) {
result = accumulator(result, e)
}, begin(func(count int64) {
result = buildInitValue(count)
})))
return result
}

func (s *stream) FindFirst() optional.Optional {
var result types.T
var find = false
Expand Down
55 changes: 55 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,61 @@ func (s *sliceIterator) Next() types.T {

// endregion sliceIterator

type intsIt struct {
*base
elements []int
}

func (i *intsIt) Next() types.T {
e := i.elements[i.current]
i.current++
return e
}

type int64sIt struct {
*base
elements []int64
}

func (i *int64sIt) Next() types.T {
e := i.elements[i.current]
i.current++
return e
}

type float32sIt struct {
*base
elements []float32
}

func (i *float32sIt) Next() types.T {
e := i.elements[i.current]
i.current++
return e
}

type float64sIt struct {
*base
elements []float64
}

func (i *float64sIt) Next() types.T {
e := i.elements[i.current]
i.current++
return e
}

type stringIt struct {
*base
elements []string
}

func (i *stringIt) Next() types.T {
e := i.elements[i.current]
i.current++
return e
}

// region sliceIt

// sliceIt 切片迭代器 反射实现
Expand Down
5 changes: 4 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ type Stream interface {
// type of initValue is same as element. (T, T) -> T
ReduceFrom(initValue types.T, accumulator types.BinaryOperator) types.T
// type of initValue is different from element. (R, T) -> R
ReduceWith(initValue types.R, accumulator func(types.R, types.T) types.R) types.R
ReduceWith(initValue types.R, accumulator func(acc types.R, e types.T) types.R) types.R
// ReduceBy use `buildInitValue` to build the initValue, which parameter is a int64 means element size, or -1 if unknown size.
// Then use `accumulator` to add each element to previous result
ReduceBy(buildInitValue func(sizeMayNegative int64) types.R, accumulator func(acc types.R, e types.T) types.R) types.R
FindFirst() optional.Optional
// 返回元素个数
Count() int64
Expand Down

0 comments on commit 569337c

Please sign in to comment.