For now, the library is context-agnostic by design. For HTTP timeouts, you'd use Go's standard approaches: either set the HTTP client timeout or pass a context with timeout to each request. Please let me know more about your use case - I'll let you know if Rill isn't a good fit.
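Both standard approaches can be sketched with the standard library alone. This is a minimal illustration, not part of Rill: `fetchWithClientTimeout`, `fetchWithRequestContext`, and `newSlowServer` are hypothetical names, and the slow local test server exists only to trigger the timeouts.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetchWithClientTimeout relies on http.Client.Timeout, which bounds the
// whole exchange, including reading the response body.
func fetchWithClientTimeout(url string, timeout time.Duration) (string, error) {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// fetchWithRequestContext attaches a deadline to a single request instead.
// Real code would usually derive ctx from the caller's context rather than
// from context.Background().
func fetchWithRequestContext(url string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// newSlowServer starts a local test server that waits before replying "ok".
func newSlowServer(delay time.Duration) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-time.After(delay):
			fmt.Fprint(w, "ok")
		case <-r.Context().Done(): // client gave up; stop waiting
		}
	}))
}

func main() {
	srv := newSlowServer(200 * time.Millisecond)
	defer srv.Close()

	_, err := fetchWithClientTimeout(srv.URL, 50*time.Millisecond)
	fmt.Println("client timeout hit:", err != nil)

	_, err = fetchWithRequestContext(srv.URL, 50*time.Millisecond)
	fmt.Println("context timeout hit:", err != nil)

	body, err := fetchWithRequestContext(srv.URL, 2*time.Second)
	fmt.Println(body, err)
}
```

Either function could be called from inside a pipeline stage; the per-request context variant is the one that composes with cancellation from the caller.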
I'm still thinking about it. To be honest, the current design works fine for my use cases: simply put, the function that defines a pipeline should call context.WithCancel() and defer cancel().
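A rough sketch of that pattern, using plain channels in place of Rill stages so it stays self-contained. `generate` and `firstEven` are hypothetical names; the point is only that the function defining the pipeline owns the context, so an early return stops the upstream goroutine.

```go
package main

import (
	"context"
	"fmt"
)

// generate emits items until the slice is exhausted or ctx is cancelled.
func generate(ctx context.Context, items []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, it := range items {
			select {
			case out <- it:
			case <-ctx.Done():
				return // the consumer is gone; stop instead of blocking forever
			}
		}
	}()
	return out
}

// firstEven defines the whole pipeline and owns its context.
func firstEven(items []int) (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs on every return path, including the early one below

	for v := range generate(ctx, items) {
		if v%2 == 0 {
			return v, true // early return: the deferred cancel() stops generate
		}
	}
	return 0, false
}

func main() {
	fmt.Println(firstEven([]int{1, 3, 4, 5, 6})) // 4 true
	fmt.Println(firstEven([]int{1, 3, 5}))       // 0 false
}
```

Without the deferred cancel(), the producer goroutine would stay blocked on its send after the early return.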
I need feedback on this. What kind of built-in context support would work for you? Do you need something like errgroup's ability to automatically cancel the context on the first error?
The lib is based on channels and inherits their backpressure behavior. Simply put, if no one reads from one end of the pipeline, nothing can be written at the other end. Still, it's possible to add buffering at arbitrary points in the pipeline using the rill.Buffer function.
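Since this behavior comes straight from Go channels, it can be shown without the library. A minimal sketch: `countNonBlockingSends` is a hypothetical helper, and the buffered channel only stands in for the effect of adding a buffer; it is not a claim about how rill.Buffer is implemented.

```go
package main

import "fmt"

// countNonBlockingSends reports how many values can be written to ch
// before a send would block, given that nobody is reading from it.
func countNonBlockingSends(ch chan int, attempts int) int {
	sent := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			sent++
		default:
			// A plain blocking send would stall here until a reader shows up.
			return sent
		}
	}
	return sent
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 3)

	fmt.Println(countNonBlockingSends(unbuffered, 10)) // 0
	fmt.Println(countNonBlockingSends(buffered, 10))   // 3
}
```

With no reader, the unbuffered channel accepts nothing, while the buffered one absorbs exactly its capacity before the writer is held back.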
Hi everyone. Posting on HN for the first time.
I'd like to share Rill, a toolkit for composable channel-based concurrency that makes it easy to build concurrent programs from simple, reusable parts.
Example of what it looks like:
// Convert a slice into a channel
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Read users from API with concurrency=3
users := rill.Map(ids, 3, func(id int) (*User, error) {
    return api.GetUser(ctx, id)
})

// Process users with concurrency=2
err := rill.ForEach(users, 2, func(u *User) error {
    if !u.IsActive {
        u.IsActive = true
        return api.SaveUser(ctx, u)
    }
    return nil
})

// Handle errors
fmt.Println("Error:", err)
Key features:
- Makes concurrent code composable and clean
- Works for both simple cases and complex pipelines
- Built-in batching support
- Order preservation when needed
- Centralized error handling
- Zero dependencies
The library grew from solving real concurrency challenges in production. Happy to discuss the design decisions or answer any questions.
I'm curious - what other technologies/libraries/APIs, including in other languages, did you draw on for inspiration, or would you say are similar to Rill?
The short answer would be: I kept writing code that spawns goroutines that read from a channel, do some processing, and write results to another channel. Add some wait groups and error groups to this and you get a lot of boilerplate repeated all over the place. I viewed this as "channel transformations" and wanted to abstract it away. When generics came out, it became technically possible.
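For readers who haven't written it by hand, the boilerplate in question looks roughly like the sketch below. `mapChan` and `fromSlice` are hypothetical helpers written for this illustration, not Rill's implementation, and error handling is left out to keep it short.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mapChan starts `workers` goroutines that read from in, apply f, and write
// to the returned channel, which is closed once all workers have finished.
func mapChan[A, B any](in <-chan A, workers int, f func(A) B) <-chan B {
	out := make(chan B)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for a := range in {
				out <- f(a)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice feeds the items of a slice into a channel and then closes it.
func fromSlice[A any](items []A) <-chan A {
	out := make(chan A)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

func main() {
	squares := mapChan(fromSlice([]int{1, 2, 3, 4, 5}), 3, func(x int) int {
		return x * x
	})

	var got []int
	for s := range squares {
		got = append(got, s)
	}
	sort.Ints(got) // workers finish in arbitrary order, so sort before printing
	fmt.Println(got) // [1 4 9 16 25]
}
```

Generics are what let a helper like this be written once for any pair of element types instead of once per pipeline stage.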
Surprisingly, part of my inspiration came from Scala (which I haven't touched since 2014). Back then Scala had transformable streams and the "Try" type.
The channel-focused approach to stream processing reminds me of Heka [0]. It was a contemporary of Samza and Heron, and it was fairly prominent in the early Go ecosystem (maybe 10 years ago). As I recall it, quite foggily and quite a long while later, one of the final nails in Heka's coffin was that channel throughput didn't scale well. Do you have benchmarks for Rill, or is it not intended for high-throughput use cases?
I have some benchmarks in the project's wiki on GitHub. I can confirm your point: Rill's main bottleneck is channel operations, and the library itself adds negligible overhead on top. Of course, for multi-stage pipelines the number of channel operations grows.
To summarize, I believe Rill's performance would be fine for any problem solvable with Go channels. I've used it for a wide variety of use cases - while I'm not sure I can call them high-throughput, I've had pipelines transferring hundreds of GBs of data with no performance issues.
I found your library a few weeks ago when I was annoyed by nothing like this being built into the standard library. It’s been a breeze to use so far.
A neat trick I found to gauge bottlenecks in pipelines is using Buffers between steps and running a goroutine that periodically prints `len(buffered)/cap(buffered)`.
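A minimal sketch of that trick using a plain buffered channel, with the standard library only. `fillRatio` is a hypothetical helper, and the producer and consumer speeds are made up for the demonstration.

```go
package main

import (
	"fmt"
	"time"
)

// fillRatio reports how full a buffered channel is, from 0 (empty) to 1 (full).
// Unbuffered channels always report 0.
func fillRatio[T any](ch chan T) float64 {
	if cap(ch) == 0 {
		return 0
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	buffered := make(chan int, 8)

	// Fast producer: fills the buffer as quickly as it can.
	go func() {
		defer close(buffered)
		for i := 0; i < 40; i++ {
			buffered <- i
		}
	}()

	// Monitor: periodically prints how full the buffer is.
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(20 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Printf("buffer: %d/%d (%.0f%%)\n",
					len(buffered), cap(buffered), 100*fillRatio(buffered))
			case <-done:
				return
			}
		}
	}()

	// Slow consumer: the simulated bottleneck.
	for range buffered {
		time.Sleep(5 * time.Millisecond)
	}
	close(done)
}
```

A buffer that stays close to full suggests the stage after it is the bottleneck; one that stays close to empty suggests the stage before it is.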
Thank you very much for the feedback.
I thought about something similar some time ago. Buffer of size one, then measure the average time each item spends in the buffer. But for debugging your approach is simpler and more practical.
No, it's the opposite - the library treats channels as streams, processing items as they arrive without needing to know the total size in advance. This is why it can handle infinite streams and large datasets that don't fit in memory.