Skip to content

Commit

Permalink
Merge pull request #18 from life4/channel-cancellation
Browse files Browse the repository at this point in the history
Channels: improved cancellation and channel buffering
  • Loading branch information
orsinium committed Aug 7, 2023
2 parents 4d239b1 + 9a998e3 commit 840ee23
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 33 deletions.
7 changes: 6 additions & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
version: "3"

tasks:
test:
desc: Run go tests with coverage and timeout and without cache
cmds:
- go test -count 1 -cover -timeout 1s ./...

release:
desc: "Tag and upload release"
desc: Tag and upload release
cmds:
- which gh
- test v{{.CLI_ARGS}}
Expand Down
166 changes: 144 additions & 22 deletions channels/channel.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package channels

import (
"github.com/life4/genesis/constraints"
"context"
"sync"

"github.com/life4/genesis/constraints"
)

// Any returns true if f returns true for any element in channel
// Any returns true if f returns true for any element in channel.
func Any[T any](c <-chan T, f func(el T) bool) bool {
for el := range c {
if f(el) {
Expand All @@ -15,7 +17,7 @@ func Any[T any](c <-chan T, f func(el T) bool) bool {
return false
}

// All returns true if f returns true for all elements in channel
// All returns true if f returns true for all elements in channel.
func All[T any](c <-chan T, f func(el T) bool) bool {
for el := range c {
if !f(el) {
Expand All @@ -25,10 +27,20 @@ func All[T any](c <-chan T, f func(el T) bool) bool {
return true
}

// ChunkEvery returns channel with slices containing count elements each
// ChunkEvery returns channel with slices containing count elements each.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func ChunkEvery[T any](c <-chan T, count int) chan []T {
chunks := make(chan []T, 1)
go func() {
defer close(chunks)
chunk := make([]T, 0, count)
i := 0
for el := range c {
Expand All @@ -43,7 +55,6 @@ func ChunkEvery[T any](c <-chan T, count int) chan []T {
if len(chunk) > 0 {
chunks <- chunk
}
close(chunks)
}()
return chunks
}
Expand All @@ -61,56 +72,83 @@ func Count[T comparable](c <-chan T, el T) int {

// Drop drops first n elements from channel c and returns a new channel with the rest.
// It returns channel do be unblocking. If you want array instead, wrap result into TakeAll.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func Drop[T any](c <-chan T, n int) chan T {
result := make(chan T)
go func() {
defer close(result)
i := 0
for el := range c {
if i >= n {
result <- el
}
i++
}
close(result)
}()
return result
}

// Each calls f for every element in the channel
// Each calls f for every element in the channel.
func Each[T any](c <-chan T, f func(el T)) {
for el := range c {
f(el)
}
}

// Filter returns a new channel with elements from input channel
// for which f returns true
// for which f returns true.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func Filter[T any](c <-chan T, f func(el T) bool) chan T {
result := make(chan T)
go func() {
defer close(result)
for el := range c {
if f(el) {
result <- el
}
}
close(result)
}()
return result
}

// Map applies f to all elements from channel and returns channel with results
// Map applies f to all elements from channel and returns channel with results.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func Map[T any, G any](c <-chan T, f func(el T) G) chan G {
result := make(chan G, 1)
go func() {
defer close(result)
for el := range c {
result <- f(el)
}
close(result)
}()
return result
}

// Max returns the maximal element from channel
// Max returns the maximal element from channel.
func Max[T constraints.Ordered](c <-chan T) (T, error) {
max, ok := <-c
if !ok {
Expand All @@ -124,7 +162,7 @@ func Max[T constraints.Ordered](c <-chan T) (T, error) {
return max, nil
}

// Min returns the minimal element from channel
// Min returns the minimal element from channel.
func Min[T constraints.Ordered](c <-chan T) (T, error) {
min, ok := <-c
if !ok {
Expand All @@ -138,28 +176,37 @@ func Min[T constraints.Ordered](c <-chan T) (T, error) {
return min, nil
}

// Reduce applies f to acc and every element from channel and returns acc
// Reduce applies f to acc and every element from channel and returns acc.
func Reduce[T any, G any](c <-chan T, acc G, f func(el T, acc G) G) G {
for el := range c {
acc = f(el, acc)
}
return acc
}

// Scan is like Reduce, but returns slice of f results
// Scan is like Reduce, but returns slice of f results.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func Scan[T any, G any](c <-chan T, acc G, f func(el T, acc G) G) chan G {
result := make(chan G, 1)
go func() {
defer close(result)
for el := range c {
acc = f(el, acc)
result <- acc
}
close(result)
}()
return result
}

// Sum returns sum of all elements from channel
// Sum returns sum of all elements from channel.
func Sum[T constraints.Ordered](c <-chan T) T {
var sum T
for el := range c {
Expand All @@ -169,6 +216,15 @@ func Sum[T constraints.Ordered](c <-chan T) T {
}

// Take takes first count elements from the channel.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func Take[T any](c <-chan T, count int) chan T {
result := make(chan T)
go func() {
Expand All @@ -188,13 +244,28 @@ func Take[T any](c <-chan T, count int) chan T {
return result
}

// Tee returns "count" number of channels with elements from the input channel
// Tee returns "count" number of channels with elements from the input channel.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channels are closed when this goroutine finishes.
//
// ⏸️ The returned channels are unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from all the output channels
// is consumed by another goroutine(s).
func Tee[T any](c <-chan T, count int) []chan T {
channels := make([]chan T, 0, count)
for i := 0; i < count; i++ {
channels = append(channels, make(chan T))
}
go func() {
defer func() {
for _, ch := range channels {
close(ch)
}
}()

for el := range c {
wg := sync.WaitGroup{}
putInto := func(ch chan T) {
Expand All @@ -203,13 +274,10 @@ func Tee[T any](c <-chan T, count int) []chan T {
}
wg.Add(count)
for _, ch := range channels {
putInto(ch)
go putInto(ch)
}
wg.Wait()
}
for _, ch := range channels {
close(ch)
}
}()
return channels
}
Expand All @@ -222,3 +290,57 @@ func ToSlice[T any](c <-chan T) []T {
}
return result
}

// WithBuffer creates an echo channel of the given one with the given buffer size.
//
// This function effectively makes writes into the given channel non-blocking
// until the buffer size of pending messages is reached, assuming that all reads
// will be done only from the channel that the function returns.
//
// ⏹️ Internally, the function starts a goroutine.
// This goroutine finishes when the input channel is closed.
// The returned channel is closed when this goroutine finishes.
func WithBuffer[T any](c <-chan T, bufSize int) chan T {
result := make(chan T, bufSize)
go func() {
defer close(result)
for el := range c {
result <- el
}
}()
return result
}

// WithContext creates an echo channel of the given one that can be cancelled with ctx.
//
// ⏹️ Internally, the function starts a goroutine
// that copies values from the input channel into the output one.
// This goroutine finishes when the input channel is closed
// or the ctx context is cancelled.
// The returned channel is closed when this goroutine finishes.
//
// ⏸️ The returned channel is unbuffered.
// The goroutine will be blocked and won't consume elements
// from the input channel until the value from the output channel
// is consumed by another goroutine.
func WithContext[T any](c <-chan T, ctx context.Context) chan T {
result := make(chan T)
go func() {
defer close(result)
for {
select {
case <-ctx.Done():
return
case val, more := <-c:
if !more {
return
}
select {
case result <- val:
case <-ctx.Done():
}
}
}
}()
return result
}
Loading

0 comments on commit 840ee23

Please sign in to comment.