Groovy-stream is a library for Groovy that lets you create lazy Iterators (or streams) based on Iterators, Collections, Maps, Iterables or other Streams.

Brief Example

As a simple example, let's create a Stream representing all positive integers:

@Grab( 'com.bloidonia:groovy-stream:0.6.2' )
import groovy.stream.Stream

def integers = Stream.from { x++ } using x:1

We can then create a Stream that is fed from integers and returns a Stream of squares:

def squares = Stream.from integers map { it * it }

And we can then take the first 5 elements from this stream with:

def first5 = squares.take( 5 ).collect()
assert first5 == [ 1, 4, 9, 16, 25 ]

It is only at the point where we call collect() that any integers are generated by the original integers Stream.
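
The same laziness can be seen without the library. The following is a minimal plain-Groovy sketch of the idea, not groovy-stream's implementation; the generated counter and the hand-built iterators are assumptions introduced for illustration:

```groovy
// Counts how many integers the source has actually produced
int generated = 0

// An endless source of positive integers, built by coercing a Map to an Iterator
def integers = [ hasNext: { true }, next: { ++generated } ] as Iterator

// A lazy wrapper that squares each value only when it is asked for
def squares = [ hasNext: { integers.hasNext() },
                next:    { def n = integers.next(); n * n } ] as Iterator

// Defining the pipeline has not pulled anything from the source yet
assert generated == 0

// Pull exactly five values through the pipeline
def first5 = []
5.times { first5 << squares.next() }

assert first5 == [ 1, 4, 9, 16, 25 ]
assert generated == 5
```

Only the five values that were requested are ever generated, even though the source itself never ends.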

Slightly more complex example showing faux list comprehension

We can use the Stream to mimic the behaviour of a list comprehension. Let's say we want to do the following:

For all values of x from 1 to 5 and all values of y from 1 to 3, return x + y if ( x + y ) % ( x + 2 ) == 0

def s = Stream.from( x:1..5, y:1..3 )
              .filter { ( x + y ) % ( x + 2 ) == 0 }
              .map { x + y }

// Returns results for:
//  3 - when - x:1, y:2 as (1+2)%(1+2) == 0
//  4 - when - x:2, y:2 as (2+2)%(2+2) == 0
//  5 - when - x:3, y:2 as (3+2)%(3+2) == 0
//  6 - when - x:4, y:2 as (4+2)%(4+2) == 0
//  7 - when - x:5, y:2 as (5+2)%(5+2) == 0
assert s.collect() == [ 3, 4, 5, 6, 7 ]
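
For comparison, the same comprehension can be written eagerly with nested loops in plain Groovy. This sketch does not use the library, and the results list is a name introduced for illustration only:

```groovy
def results = []
for ( x in 1..5 ) {
    for ( y in 1..3 ) {
        // Same predicate as the filter step above
        if ( ( x + y ) % ( x + 2 ) == 0 ) {
            // Same transformation as the map step above
            results << x + y
        }
    }
}
assert results == [ 3, 4, 5, 6, 7 ]
```

The loop version builds the whole list immediately, whereas the Stream only produces each value when it is requested.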

Anatomy of a Stream

Streams are composed of 4 parts:

  1. The input, which can be an Iterable, an Iterator, a Map, a Closure or another Stream
  2. A filter predicate Closure which removes items from the input stream as they are read
  3. A map which transforms the next item in the Stream into another object or value
  4. A using block of variables that the stream may access (and modify in the map closure)

There can be many filter and map steps (since v0.6) in a Stream, and they
are executed in the order they are defined.
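
As a small sketch of several steps chained together, assuming the 0.6.2 artifact from the first example and the method-call style used in the list comprehension example (the expected values follow from the ordering rule above rather than from a recorded run):

```groovy
@Grab( 'com.bloidonia:groovy-stream:0.6.2' )
import groovy.stream.Stream

def s = Stream.from( 1..10 )
              .filter { it % 2 == 0 }   // keep 2, 4, 6, 8, 10
              .map    { it + 1 }        // becomes 3, 5, 7, 9, 11
              .filter { it > 5 }        // keep 7, 9, 11

def result = s.collect()
assert result == [ 7, 9, 11 ]
```

Swapping the first filter and the map would give a different result, since the even-number test would then see the incremented values.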

There can only be a single using block (and I am considering dropping the using block
as it doesn't feel functionally sound).

Input Examples

Examples of valid input are (input on its own isn't very exciting, but here are examples anyway):

assert [ 1, 2, 3 ]       == Stream.from( [ 1, 2, 3 ] ).collect()              // An iterable
assert [ 'a', 'b' ]      == Stream.from( 'a'..'z' ).take( 2 ).collect()       // An iterable
assert [ [ x:1, y:3 ] ]  == Stream.from( x:1..3, y:3..4 ).take( 1 ).collect() // A map

Filter Examples

Just the even numbers from the 1..10 Range

Stream s = Stream.from 1..10 filter { it % 2 == 0 }
assert s.collect() == [ 2, 4, 6, 8, 10 ]

Just the elements where the sum of the values is even (note that the from map is set as the
delegate of the filter closure, so you can access its values without the it.a prefix
notation):

Stream s = Stream.from a:1..2, b:3..4 filter { ( a + b ) % 2 == 0 }
assert s.collect() == [ [a:1,b:3], [a:2,b:4] ]
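
The delegate mechanism itself is standard Groovy. The following is a minimal sketch without the library, assuming a delegate-first resolve strategy purely for illustration (how groovy-stream configures its closures internally is not shown here); predicate is a name introduced for this example:

```groovy
// The closure refers to a and b, which are not defined anywhere in this script
def predicate = { ( a + b ) % 2 == 0 }

// Resolve unknown names against the delegate before the owning script
predicate.resolveStrategy = Closure.DELEGATE_FIRST

// With a Map as the delegate, a and b are looked up as keys of that Map
predicate.delegate = [ a:1, b:3 ]
def firstResult = predicate()
assert firstResult

predicate.delegate = [ a:1, b:4 ]
def secondResult = predicate()
assert !secondResult
```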

It is also possible to tell a Stream to stop running by using an until predicate Closure.

def x = 0

def eternal = [ hasNext:{ true }, next:{ x++ } ] as Iterator

Stream s = Stream.from eternal until { it >= 5 }

assert s.collect() == [ 0, 1, 2, 3, 4 ]

Map Examples

Stream s = Stream.from 1..5 map { it + 5 }
assert s.collect() == [ 6, 7, 8, 9, 10 ]
s = Stream.from a:1..2, b:3..4 map { a + b }
assert s.collect() == [ 4, 5, 5, 6 ]

Using Examples

The Stream of integers from above

assert [ 1, 2, 3 ] == Stream.from { j++ }.using( j:1 ).take( 3 ).collect()

Starting with idx:0, return elements with an index element before them

Stream s = Stream.from 'a'..'c' map { [ idx++, it ] } using idx:0
assert s.collect() == [ [0,'a'], [1,'b'], [2,'c'] ]

Other methods

The following other methods are also available in a Stream definition:

flatMap

The flatMap step returns a List of elements, and each element of this list is then passed
individually along the rest of the stream. Only when the list is exhausted will the
stream go back to the source for another value to process:

Stream s = Stream.from 1..5 flatMap { [ 'a', it ] }
assert s.collect() == [ 'a', 1, 'a', 2, 'a', 3, 'a', 4, 'a', 5 ]
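
For readers who know Groovy's collections, the closest eager counterpart is collectMany. It is shown here in plain Groovy for comparison only; this sketch does not use the library, and flattened is a name introduced for illustration:

```groovy
// Each closure call returns a List, and the Lists are concatenated in order
def flattened = ( 1..5 ).collectMany { [ 'a', it ] }
assert flattened == [ 'a', 1, 'a', 2, 'a', 3, 'a', 4, 'a', 5 ]
```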

collate

The collate call will group the Stream into chunks of a given size, and will only
release an item to the rest of the stream when this criterion is met (or the stream terminates).

Stream s = Stream.from 1..5 collate( 3 )
assert s.collect() == [ [ 1, 2, 3 ], [ 4, 5 ] ]

You can also (as in Groovy) give collate a step value to give you a sliding window effect:

// Step along by 1 each time
Stream s = Stream.from 1..5 collate( 3, 1 )
assert s.collect() == [ [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ], [ 4, 5 ], [ 5 ] ]

You can also tell it to drop the remainder results:

// Step along by 1 each time, dropping any incomplete chunks
Stream s = Stream.from 1..5 collate( 3, 1, false )
assert s.collect() == [ [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ] ]
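
Since these calls mirror Groovy's own collate on lists, the three variants can be compared against the standard method. This is a plain-Groovy sketch without the library, and numbers is a name introduced for illustration:

```groovy
def numbers = [ 1, 2, 3, 4, 5 ]

// Chunks of 3, keeping the shorter final chunk
def chunks = numbers.collate( 3 )
assert chunks == [ [ 1, 2, 3 ], [ 4, 5 ] ]

// Chunks of 3, stepping along by 1 to give a sliding window
def windows = numbers.collate( 3, 1 )
assert windows == [ [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ], [ 4, 5 ], [ 5 ] ]

// The same sliding window, dropping chunks shorter than 3
def fullWindows = numbers.collate( 3, 1, false )
assert fullWindows == [ [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ] ]
```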

tap and tapEvery

To look at every 2nd element as it passes the stream, you can do:

// prints '2' and '4' as the Stream progresses
def s = Stream.from 1..5 tapEvery( 2 ) { idx -> println idx } collect()

// prints '2 : b' and '4 : d' as the Stream progresses
def s = Stream.from 'a'..'e' tapEvery( 2 ) { idx, obj -> println "$idx : $obj" } collect()