Skip to content

Commit

Permalink
Add Concat and FlatMap
Browse files Browse the repository at this point in the history
Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Feb 3, 2024
1 parent 63c7f67 commit 8b02742
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 0 deletions.
68 changes: 68 additions & 0 deletions operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,74 @@ func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Res
})
}

// Concat takes one or more observable of the same type and emits the items from each of
// them in order.
func Concat[T any](srcs ...Observable[T]) Observable[T] {
return FuncObservable[T](
func(ctx context.Context, next func(T), complete func(error)) {
go func() {
for _, src := range srcs {
errs := make(chan error, 1)
src.Observe(
ctx,
next,
func(err error) {
if err != nil {
errs <- err
}
close(errs)
},
)
if err, ok := <-errs; ok {
complete(err)
return
}
}
complete(nil)
}()
})
}

// FlatMap applies a function that returns an observable of Bs to the source observable of As.
// The observable from the 'apply' function is flattened to produce a flat stream of Bs.
func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B] {
return FuncObservable[B](
func(ctx context.Context, next func(B), complete func(error)) {
ctx, cancel := context.WithCancel(ctx)
innerErrs := make(chan error, 1)
src.Observe(
ctx,
func(a A) {
done := make(chan struct{})
apply(a).Observe(
ctx,
next,
func(err error) {
if err != nil {
select {
case innerErrs <- err:
default:
}
cancel()
}
close(done)
},
)
<-done
},
func(err error) {
defer close(innerErrs)
select {
case innerErr := <-innerErrs:
complete(innerErr)
default:
complete(err)
}
},
)
})
}

// Distinct skips adjacent equal values.
//
// Distinct(FromSlice([]int{1,1,2,2,3})
Expand Down
51 changes: 51 additions & 0 deletions operators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,57 @@ func TestThrottle(t *testing.T) {
}
}

func TestFlatMap(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

negated := func(x int) Observable[int] {
return FromSlice([]int{x, -x})
}

// 1. mapping a non-empty source
{
src := Range(0, 3)
src = FlatMap(src, negated)
result, err := ToSlice(ctx, src)
assertNil(t, "case 1", err)
assertSlice(t, "case 1", []int{0, 0, 1, -1, 2, -2}, result)
}

// 2. mapping an empty source
{
src := FlatMap(Empty[int](), negated)
result, err := ToSlice(ctx, src)
assertNil(t, "case 2", err)
assertSlice(t, "case 2", []int{}, result)
}

// 3. cancelled context
checkCancelled(t, "case 3", FlatMap(Range(0, 100), negated))
}

func TestConcat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 1. successful case
res1, err := ToSlice(ctx, Concat(Just(1), Just(2), Just(3)))
if err != nil {
t.Fatalf("case 1 errored: %s", err)
}
assertSlice(t, "case 1", res1, []int{1, 2, 3})

// 2. test cancelled concat
checkCancelled(t, "case 2", Concat(Just(1), Stuck[int]()))

// 3. test empty concat
res3, err := ToSlice(ctx, Concat[int]())
if err != nil {
t.Fatalf("case 3 errored: %s", err)
}
assertSlice(t, "case 3", []int{}, res3)
}

func TestRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 8b02742

Please sign in to comment.