Since I've been talking about streams in Elixir, I'm going to cover the Stream module. The Stream module has many functions that are very similar to the Enum module, so I don't feel a need to re-explain the concepts that I previously discussed in my posts on the Enum module. I'm going to assume that you've already read about the Enum module functions, so the coverage of similar functions we've seen before in the Enum module will be brief.

All the functions in the Stream module use lazy processing unless otherwise noted. Most of these functions return a stream that can be used to build a stream transformation pipeline. Remember that since streams are also enumerables, you can also perform eager processing on streams as well using functions in Enum. Lazy processing is common with streams, but eager processing can also useful at times. In the following examples, I use functions like Enum.to_list/1 or Enum.take/2 to start stream processing and extract the contents of a stream for display.

I just came to the realization while looking over the Stream functions that almost all of these functions can be used to create a stream, not just the more obvious stream creation functions. Many of these function create streams by transforming other streams, but they can also create streams from any enumerable. The documentation makes it clear that these functions will lazily process any enumerable, not just streams. That realization erased my previous mental division of the Stream functions between stream-creation functions and stream-transformation functions.

Stream.chunk_by/2

The Stream.chunk_by/2 function works just like Enum.chunk_by/2 except that it creates a stream instead of a list. It buffers the input elements until the function parameter returns true, and then it emits the chunk (which is a list of elements) as a single item in the stream.

iex> is_even = &(rem(&1, 2) == 0)
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.chunk_by([2, 4, 6, 8, 9, 1, 4, -2, 10], is_even) |> Enum.to_list()
[[2, 4, 6, 8], [9, 1], [4, -2, 10]]

Stream.chunk_every/2

The Stream.chunk_every/2 function works just like Enum.chunk_every/2 except that it creates a stream instead of a list. It creates a stream that emits chunks of elements.

iex> Stream.chunk_every(1..10, 3) |> Enum.to_list()
[[1, 2, 3], [4, 5, 6], '\a\b\t', '\n']
iex> Stream.chunk_every(1..10, 4) |> Enum.to_list()
[[1, 2, 3, 4], [5, 6, 7, 8], '\t\n']

Stream.chunk_every/4

The Steam.chunk_every/4 function works just like Enum.chunk_every/4 except that it creates a stream instead of a list. It creates a stream that emits a chunk of elements with some extra options that control how chunks are created.

iex> Stream.chunk_every(1..10, 3, 2) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5], [5, 6, 7], '\a\b\t', '\t\n']
iex> Stream.chunk_every(1..10, 3, 1) |> Enum.to_list()
[
  [1, 2, 3],
  [2, 3, 4],
  [3, 4, 5],
  [4, 5, 6],
  [5, 6, 7],
  [6, 7, 8],
  '\a\b\t',
  '\b\t\n',
  '\t\n'
]
iex> Stream.chunk_every(1..10, 3, 2, ["a", "b", "c"]) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5], [5, 6, 7], '\a\b\t', [9, 10, "a"]]
iex> Stream.chunk_every(1..10, 4, 2, ["a", "b", "c"]) |> Enum.to_list()
[[1, 2, 3, 4], [3, 4, 5, 6], [5, 6, 7, 8], '\a\b\t\n', [9, 10, "a", "b"]]
iex> Stream.chunk_every(1..10, 4, 2, :discard) |> Enum.to_list()
[[1, 2, 3, 4], [3, 4, 5, 6], [5, 6, 7, 8], '\a\b\t\n']

Stream.chunk_while/4

The Stream.chunk_while/4 function works just like Enum.chunk_while/4 except that it creates a stream instead of a list. It creates a stream that emits chunks of elements, where the process of chunk creation is controlled by functions passed in as parameters.

iex> even_number_chunker = fn
...> (element, []) -> {:cont, [element]}
...> (element, chunk = [prev | _]) when rem(prev, 2) == rem(element, 2) -> {:cont, [element | chunk]}
...> (element, chunk = [prev | _]) when rem(prev, 2) != rem(element, 2) -> {:cont, chunk, [element]}
...> end
#Function<12.99386804/2 in :erl_eval.expr/5>
iex> after_func = fn chunk -> {:cont, chunk, []} end
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.chunk_while([1, 2, 4, 10, 3, 31, 43, 20], [], even_number_chunker, after_func) |> Enum.to_list()
[[1], [10, 4, 2], [43, 31, 3], [20]]

Stream.concat/1

The Stream.concat/1 function works just like Enum.concat/1 except that it creates a stream instead of a list. It creates a stream whose contents are composed of a list of enumerables concatenated to each other.

iex> Stream.concat([1..10, ["a", "b", "c"], -3..-8]) |> Enum.to_list()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, "a", "b", "c", -3, -4, -5, -6, -7, -8]

Stream.concat/2

The Stream.concat/2 function works just like Enum.concat/2 except that it creates a stream instead of a list. It concatenates to enumerables together to create the contents of a stream.

iex> Stream.concat(1..5, ["a", "b", "c", "d"]) |> Enum.to_list()
[1, 2, 3, 4, 5, "a", "b", "c", "d"]

Stream.cycle/1

The Stream.cycle/1 function takes an enumerable and creates a stream that cycles through the elements in the enumerable indefinitely. This stream never ends, so I'm going to use Enum.take/2 to sample the contents rather than foolishly attempting to use Enum.to_list/1, which would never complete because the contents are infinite.

iex> stream = Stream.cycle(1..10)
#Function<9.77875168/2 in Stream.cycle/1>
iex> Enum.take(stream, 27)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6,
 7]
iex> Enum.take(stream, 5)
[1, 2, 3, 4, 5]

Stream.dedup/1

The Stream.dedup/1 function works just like Enum.dedup/1 except that it creates a stream instead of a list. It creates a stream that only emits elements that are different from the last-emitted element.

iex> Stream.dedup(1..5) |> Enum.to_list()
[1, 2, 3, 4, 5]
iex> Stream.dedup([1, 1, 1, 2, 2, 3, 4, 3, 4]) |> Enum.to_list()
[1, 2, 3, 4, 3, 4]

Stream.dedup_by/2

The Stream.dedup_by/2 function works just like Enum.dedup_by/2 except that it creates a stream instead of a list. The stream emits elements that are different from the last-emitted element, like Stream.dedup/1, except that you also pass in a function that determines if two eleemnts are different.

The following example involves a function that considers numbers with the same "evenness" to be the same.

iex> evenness = &(rem(&1, 2))
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.dedup_by(1..5, evenness) |> Enum.to_list()
[1, 2, 3, 4, 5]
iex> Stream.dedup_by([2, -2, 8, 10, 4, 3, 1, 8, -5], evenness) |> Enum.to_list()
[2, 3, 8, -5]

Stream.drop/2

The Stream.drop/2 function works just like Enum.drop/2 except that it creates a stream instead of a list. This function causes N elements to be dropped from its input.

iex> Stream.drop(1..10, 3) |> Enum.to_list()
[4, 5, 6, 7, 8, 9, 10]
iex> Stream.drop(1..10, 8) |> Enum.to_list()
'\t\n'
iex> Stream.drop(1..10, 12) |> Enum.to_list()
[]

Stream.drop_every/2

The Stream.drop_every/2 function works just like Enum.drop_every/2 except that it creates a stream instead of a list. This function causes every Nth element to be dropped from its input, starting with the first element.

iex> Stream.drop_every(1..10, 3) |> Enum.to_list()
[2, 3, 5, 6, 8, 9]
iex> Stream.drop_every(1..10, 2) |> Enum.to_list()
[2, 4, 6, 8, 10]
iex> Stream.drop_every(1..10, 1) |> Enum.to_list()
[]

Stream.drop_while/2

The Stream.drop_while/2 function works just like Enum.drop_while/2 except that it creates a stream instead of a list. This function will drop elements from its input as long as the parameter function returns a truthy value.

iex> is_even = &(rem(&1, 2) == 0)
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.drop_while([1, 2, 3], is_even) |> Enum.to_list()
[1, 2, 3]
iex> Stream.drop_while([2, 3, 4], is_even) |> Enum.to_list()
[3, 4]
iex> Stream.drop_while([2, 8, 10, 12, 3, 4, 8], is_even) |> Enum.to_list()
[3, 4, 8]

Stream.each/2

The Stream.each/2 function is similar to Enum.each/2 in that it provides a way to produce a side effect for each element in the stream. Unlike Enum.each/2, which just returns :ok, Stream.each/2 returns a stream containing the unaltered input. That means I can pass a stream through Stream.each/2 to create side effects without altering the stream in any way. I used this in the previous post on streams to print out what each element looked like in the middle of a stream transformation pipeline.

iex> 1..5 |> Stream.map(&(&1 * 2)) |> Stream.each(fn item -> IO.puts(item) end) |> Enum.to_list()
2
4
6
8
10
[2, 4, 6, 8, 10]

The above example creates a stream that doubles the numbers 1..5, prints them out, and then stores the contents of the final stream in a list.

Stream.filter/2

The Stream.filter/2 function works just like Enum.filter/2 except that it creates a stream instead of a list.

iex> Stream.filter(1..10, &(&1 > 5)) |> Enum.to_list()
[6, 7, 8, 9, 10]
iex> is_even = &(rem(&1, 2) == 0)
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.filter(1..10, is_even) |> Enum.to_list()
[2, 4, 6, 8, 10]

Stream.flat_map/2

The Stream.flat_map/2 function works just like Enum.flat_map/2 except that it creates a stream instead of a list.

iex> Stream.flat_map(1..5, fn number -> 1..number end) |> Enum.to_list()
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]

Stream.intersperse/2

The Stream.intersperse/2 function works just like Enum.intersperse/2 except that it creates a stream instead of a list.

iex> Stream.intersperse(1..10, "yep") |> Enum.to_list()
[1, "yep", 2, "yep", 3, "yep", 4, "yep", 5, "yep", 6, "yep", 7, "yep", 8, "yep",
 9, "yep", 10]
iex> Stream.intersperse(1..10, -1) |> Enum.to_list()
[1, -1, 2, -1, 3, -1, 4, -1, 5, -1, 6, -1, 7, -1, 8, -1, 9, -1, 10]

Stream.interval/1

The Stream.interval/1 function starts generating a series of numbers starting with 0 and adding 1 every time. The numbers are separated by N milliseconds, where N is specified by the function parameter.

The documentation warns that this operation blocks the caller during the time between number generation, and recommends using Stream.iterate/2 instead if you don't need to block the caller process. Why anyone would need to block the caller process is beyond me, but that's what it does.

This example creates a stream that generates a number once per second, and prints out each number as it is generated. This is an infinite stream, so I had to stop IEx to get it stop.

iex> Stream.interval(1000) |> Stream.each(fn number -> IO.puts(number) end) |> Stream.run()
0
1
2
3
4
5
6
7
8
9
10

I imagine that this function is useful if you need some kind of timer or the ability to emit an element after a certain period of time.

Stream.into/3

The Stream.into/3 function works just like Enum.into/3 except that it returns a stream containing the unaltered input. This function is similar in nature to Stream.each/2 in that it does its thing and then returns an unaltered stream. Stream.into/3, like other "into" functions, inserts data into a collectable, which is a protocol that some data structures implement.

Since this function returns a stream and not the modified collectable, it produces a side-effect (like Enum.each/2). This means that if you pass in a list, you will have no way of retrieving the modified list because all you will have access to is the original list. Here's a demonstration showing what I mean.

iex> list = []
[]
iex> Stream.into(1..5, list) |> Enum.to_list()
[1, 2, 3, 4, 5]
iex> list
[]

The variable list still points to the original (immutable of course) list.

So in practice, this means that we'll mostly be streaming data into another stream, such as a file stream. It's a way of piping the contents of an input stream into an output stream. So for the following example, I figured out how to create a stream to standard input and output so that I can write the contents of the stream to standard out.

#Create a stream that reads from stdin and writes to stdout
iex> output_stream = IO.binstream(:stdio, :line)
%IO.Stream{device: :standard_io, line_or_bytes: :line, raw: true}
#Stream the contents of the list to stdout
iex> ["Bob", "Sam", "Jonas"] |> Stream.into(output_stream) |> Enum.to_list()
BobSamJonas["Bob", "Sam", "Jonas"]
#Stream the contents of the list to stdout, using the transform function parameter to add a newline
#character to the end of each name
iex> ["Bob", "Sam", "Jonas"] |> Stream.into(output_stream, fn name -> name <> "\n" end) |> Enum.to_list()
Bob
Sam
Jonas
["Bob", "Sam", "Jonas"]

The stream is piped to standard out and displayed on the screen. Then the final result of the stream pipeline (a list) is displayed.

Stream.iterate/2

The Stream.iterate/2 function receives a start value and a function as parameters emits a sequence of values. The first value emitted is the start value and the next values are generated by passing the previous value to the parameter function.

This stream that this function produces is an infinite stream and will keep producing the next value in the series as long as it is asked to.

The following example creates a stream that starts with 1, and emits values that are double the previous value. It's an infinite number series.

iex> Stream.iterate(1, &(&1 * 2)) |> Enum.take(5)
[1, 2, 4, 8, 16]
iex> Stream.iterate(1, &(&1 * 2)) |> Enum.take(10)
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]

Stream.map/2

The Stream.map/2 function works just like Enum.map/2 except that it creates a stream instead of a list.

iex> Stream.map(1..10, &(&1 * &1)) |> Enum.to_list()
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Stream.map_every/3

The Stream.map_every/3 function works just like Enum.map_every/3 except that it creates a stream instead of a list. It maps every Nth element (including the first element) instead of every element.

iex> Stream.map_every(1..10, 3, &(&1 * &1)) |> Enum.to_list()
[1, 2, 3, 16, 5, 6, 49, 8, 9, 100]

Stream.reject/2

The Stream.reject/2 function works just like Enum.reject/2 except that it creates a stream instead of a list.

iex> Stream.reject(1..10, &(&1 > 5)) |> Enum.to_list()
[1, 2, 3, 4, 5]
iex> is_even = &(rem(&1, 2) == 0)
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.reject(1..10, is_even) |> Enum.to_list()
[1, 3, 5, 7, 9]

Stream.repeatedly/1

The Stream.repeatedly/1 function is passed a generator function and then calls that generator function every time another value is read from the stream. This happens infinitely, so this function creates an infinite stream. The generator function receives no parameters, so it will either have to return a constant or do something that involves side effects.

Here's an example that generates a random number stream.

iex> Stream.repeatedly(fn -> Enum.random(1..100) end) |> Enum.take(10)
[82, 51, 47, 50, 16, 25, 9, 44, 30, 56]
iex> Stream.repeatedly(fn -> Enum.random(1..100) end) |> Enum.take(10)
[84, 82, 92, 27, 21, 48, 99, 56, 100, 35]
iex> Stream.repeatedly(fn -> Enum.random(1..100) end) |> Enum.take(10)
[47, 98, 68, 37, 14, 76, 54, 86, 38, 76]

Stream.resource/3

The Stream.resource/3 function provides a great deal of control over the content of a stream and is also the most complex. It functions by passing in three functions as parameters: an initialization function, a function that generates the next item in the stream, and a function that cleans up after the stream has finished.

As its name suggests, this function is ideal for creating streams that make use of external resources, such as files, network streams, keyboard events, etc.

I gave an example of using this function in my post on streams, so I'm going to repeat it here.

defmodule StopNumberStream do
	def create(range, stop_number) do
		Stream.resource(fn -> initialize(range, stop_number) end, &generate_next_value/1, &done/1)
	end
	
	defp initialize(range, stop_number) do
		{range, stop_number, true}
	end
	
	defp generate_next_value({range, stop_number, true}) do
		case generated_number = Enum.random(range) do
			^stop_number -> {[stop_number], {range, stop_number, false}}
			_ -> {[generated_number], {range, stop_number, true}}
		end				
	end
	
	defp generate_next_value({_, stop_number, false}) do
		{:halt, stop_number}			
	end			
	
	defp done(_) do
		nil
	end
end

You can find this code in the the "lwm 43 - Streams" folder in the code examples in the Learn With Me: Elixir repository on Github.

The initialization function defines a range (1 to 10) and a stop number (3). The stream will keep producing random numbers within the range until the stop number has been generated.

Here's the IEx output.

iex> number_stream = StopNumberStream.create(1..10, 3)
#Function<54.77875168/2 in Stream.resource/3>
iex> Enum.take(number_stream, 100)
[8, 5, 6, 9, 5, 6, 2, 6, 6, 7, 4, 5, 7, 4, 8, 9, 5, 9, 1, 9, 5, 5, 7, 8, 10, 10,
 7, 5, 1, 2, 1, 6, 5, 2, 6, 4, 7, 5, 7, 1, 10, 2, 7, 10, 3]
iex> Enum.take(number_stream, 100)
[8, 6, 2, 4, 9, 7, 4, 7, 1, 1, 9, 5, 6, 10, 10, 8, 2, 9, 5, 7, 10, 10, 9, 5, 8,
 10, 7, 4, 3]
iex> Enum.take(number_stream, 100)
[2, 4, 8, 2, 2, 4, 3]
iex> number_stream = StopNumberStream.create(1..7, 6)
#Function<54.77875168/2 in Stream.resource/3>
iex> Enum.take(number_stream, 100)
[7, 3, 5, 6]
iex> Enum.take(number_stream, 100)
[2, 3, 6]
iex> Enum.take(number_stream, 100)
[6]

I'd love to make an example that gathered keyboard input or streamed something from the network, but that is beyond my current abilities.

Stream.run/1

The Stream.run/1 function initiates stream processing. When you are setting up a lazy-evaluated stream pipeline, it is indeed truly a lazy pipeline. It will do no work until you give it a swift kick in the pants to get it going; that can either be by calling an eager function like the functions in the Enum module or by calling Stream.run/1 and passing it the stream whose processing is to be started.

Here's an example that illustrates this.

iex> stream = 1..10 |> Stream.map(&(&1 * &1)) |> Stream.filter(&(&1 < 80)) |> Stream.each(&(IO.puts(&1)))
#Stream<[
  enum: 1..10,
  funs: [#Function<48.77875168/1 in Stream.map/2>,
   #Function<40.77875168/1 in Stream.filter/2>,
   #Function<39.77875168/1 in Stream.each/2>]
]>
iex> Stream.run(stream)
1
4
9
16
25
36
49
64
:ok

On the first line, I construct the stream pipeline that creates a stream that performs some mapping, filtering, and side effects. The output of the stream data structure in IEx will even show me the data source for the stream and the functions to be applied to it. However, nothing has happened yet. I need to get it going. In this case, I passed the stream to Stream.run/1 to start processing the stream, which causes the stream elements to be written to the screen.

I could have also just put the call to Stream.run/1 at the end of the pipeline if I wanted to run it immediately.

iex> stream = 1..10 |> Stream.map(&(&1 * &1)) |> Stream.filter(&(&1 < 80)) |> Stream.each(&(IO.puts(&1))) |> Stream.run()
1
4
9
16
25
36
49
64
:ok

You may not always want to run a stream immediately. You may instead prefer to just pass back the stream so that other code can add onto it further, with stream processing being started somewhere else in the application.

Stream.scan/2

Despite its name, the Stream.scan/2 is very similar to Enum.reduce/2. It performs a reduce operation in a lazy manner. The difference is that there is no "final result" like the reduce operation in the Enum module. It passes along an accumulator from the previous call to the function like Enum.reduce/2, but the final value of that accumulator is discarded when the stream ends (assuming that the stream does end).

Here's are some examples.

iex> 1..10 |> Stream.scan(fn number, sum -> number + sum end) |> Enum.to_list()
[1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
iex> 1..10 |> Stream.scan(fn number, product -> number * product end) |> Enum.to_list()
[1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800]

Stream.scan/3

The Stream.scan/3 function works just like Stream.scan/2 except that you can specify the initial accumulator value. This is the equivalent to Enum.reduce/3.

The following examples set the accumulator to 10 initially.

iex> 1..10 |> Stream.scan(10, fn number, sum -> number + sum end) |> Enum.to_list()
[11, 13, 16, 20, 25, 31, 38, 46, 55, 65]
iex> 1..10 |> Stream.scan(10, fn number, product -> number * product end) |> Enum.to_list()
[10, 20, 60, 240, 1200, 7200, 50400, 403200, 3628800, 36288000]

Stream.take/2

The Stream.take/2 function works just like Enum.take/2 except that it creates a stream instead of a list.

iex> 1..10 |> Stream.take(3) |> Enum.to_list()
[1, 2, 3]
iex> 1..10 |> Stream.take(8) |> Enum.to_list()
[1, 2, 3, 4, 5, 6, 7, 8]

Stream.take_every/2

The Stream.take_every/2 function works just like Enum.take_every/2 except that it creates a stream instead of a list. It will take every Nth element starting with the first element.

iex> 1..20 |> Stream.take_every(3) |> Enum.to_list()
[1, 4, 7, 10, 13, 16, 19]
iex> 1..20 |> Stream.take_every(6) |> Enum.to_list()
[1, 7, 13, 19]
iex> 1..20 |> Stream.take_every(1) |> Enum.to_list()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

Stream.take_while/2

The Stream.take_while/2 function works just like Enum.take_while/2 except that it creates a stream instead of a list. It will take elements until the function parameter returns a truthy value.

iex> [2, 0, -2, -4, 7, 8, 10] |> Stream.take_while(&((&1 * 2) < 20)) |> Enum.to_list()
[2, 0, -2, -4, 7, 8]
iex> is_even = &(rem(&1, 2) == 0)
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> [2, 0, -2, -4, 7, 8, 10] |> Stream.take_while(is_even) |> Enum.to_list()
[2, 0, -2, -4]

Stream.timer/1

The Stream.timer/1 function emits a single value after N milliseconds, where N is a parameter that is passed to the function. The value emitted is always 0. The purpose of this function is to act as a timer so that your stream can emit a value after a certain period of time has elapsed.

The documentation notes that this function blocks the process it's in while it is waiting to emit a value.

iex> Stream.timer(2000) |> Stream.each(fn _ -> IO.puts("Timer finished") end) |> Stream.run()
Timer finished
:ok

In the above example, nothing happens for 2 seconds, and then the text "Timer finished" appears on the screen.

Stream.transform/3

The Stream.transform/3 function allows you to have fine-grained control over stream transformations. Much in the way that Enum.reduce/3 is a more generic function can be used to implement most of the functions in the Enum module, Stream.transform/3 is a more generic function that can be used to implement most of the transformation functions in the Stream module.

Stream.transform/3 receives data to transform and the transformation function gets called for each element in the input data. Every element can be transformed or you can stop partway through and end the stream. The transformation function receives an input element and an accumulator and can emit a value from the stream. The function can continue the stream by returning a tuple where the first element is the data to be emitted from the transformed stream and the second element is the accumulator to be passed to the next call to the transformation function. The transformation function can also end the stream early by returning a tuple with :halt as the first element and the accumulator as the second element.

The data being emitted from the stream has to be enclosed in an enumerable such as a list. I'm guessing that this is so that the stream can emit multiple elements for every input element.

I'm going to use this function to create some examples. In the first example, I'm going to implement the Stream.map/2 function.

iex> map = fn (enum, func) ->
...> Stream.transform(enum, nil, fn (item, acc) -> {[func.(item)], acc} end)
...> end
#Function<12.99386804/2 in :erl_eval.expr/5>
iex> map.(1..10, &(&1 * 2)) |> Enum.to_list()
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
iex> map.(1..10, &(&1 * &1)) |> Enum.to_list()
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Success! I now have a stream mapping function.

For the next example, I'm going to create a function that creates a stream that doubles all the elements in an enumerable. By "doubling", I mean emitting two of the same element, not multiplying the element by 2.

iex> double = fn (enum) ->
...> Stream.transform(enum, nil, fn (item, acc) -> {[item, item], acc} end)
...> end
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> double.(1..10) |> Enum.to_list()
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10]

That went easier than the map function.

For the final example, I'm going to demonstrate the ability to halt the processing of the input stream by returning a tuple containing :halt, which is especially useful if the stream is infinite. I'll implement Stream.take/2 and then apply it to an infinite random number stream.

iex> random_number_stream = Stream.repeatedly(fn -> Enum.random(1..100) end)
iex> take = fn (enum, count) ->
...> Stream.transform(enum, 0, fn
...> (item, current_count) when current_count < count -> {[item], current_count + 1}
...> (_, current_count) -> {:halt, current_count}
...> end)
...> end
#Function<12.99386804/2 in :erl_eval.expr/5>
iex> take.(1..10, 5)
#Function<62.77875168/2 in Stream.transform/3>
iex> take.(1..10, 5) |> Enum.to_list()
[1, 2, 3, 4, 5]
iex> take.(random_number_stream, 10) |> Enum.to_list()
[21, 5, 29, 21, 94, 93, 17, 97, 63, 12]
iex> take.(random_number_stream, 10) |> Enum.to_list()
[62, 7, 39, 69, 64, 25, 98, 36, 100, 56]

The accumulator represents the current element count. Once the current element count reaches the target number, the transformation function returns a tuple with :halt as the first element, ending the stream. Very nice.

Stream.transform/4

The Stream.transform/4 function works like Stream.transform/3, except that it has an initialization function and a function that runs after the stream has completed. As the documentation states, it's a combination of Stream.transform/3 and Stream.resource/3. This allows us to create a transformation function that uses some sort of external resource.

Since I don't know enough about Elixir yet to use external resources, I'm just going to re-implement the map function.

iex> map = fn (enum, func) ->
...> Stream.transform(enum, fn -> nil end, fn(item, acc) -> {[func.(item)], acc} end, fn _ -> nil end)
...> end
#Function<12.99386804/2 in :erl_eval.expr/5>
iex> map.(1..10, &(&1 / 2)) |> Enum.to_list()
[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]

The second parameter is the initialization function, where you would acquire the resource and create the initial accumulator value, the third parameter is the function that creates the stream elements, and the fourth parameter is the cleanup function where a resource would be released.

Stream.unfold/2

The Stream.unfold/2 function is somewhat similar to Stream.scan/2 with two large differences. It does not transform an enumerable, but accepts only an initial accumulator value and a function to generate further values. All values in the stream it produces are generated off of that initial accumulator value. The Stream.unfold/2 function allows the stream to be terminated by returning a nil. If nil is never returned, the stream is infinite.

The stream generation function takes one parameter, the accumulator, and returns a tuple where the first element contains the value to be emitted and the second element contains the accumulator to be passed to the next call to the stream generation function.

In the following examples, I'm going to create streams that emit a series of numbers that double every time a value is emitted. The first stream will terminate after the values exceed 50 and the second stream will be infinite.

Here's the finite stream.

iex> stream = Stream.unfold(1, fn
...> number when number >= 50 -> nil
...> number -> {number * 2, number * 2}
...> end)
#Function<64.77875168/2 in Stream.unfold/2>
iex> stream |> Enum.take(2)
[2, 4]
iex> stream |> Enum.take(4)
[2, 4, 8, 16]
iex> stream |> Enum.take(10)
[2, 4, 8, 16, 32, 64]
iex> stream |> Enum.take(20)
[2, 4, 8, 16, 32, 64]

Here's the infinite stream.

iex> stream = Stream.unfold(1, fn number -> {number * 2, number * 2} end)
#Function<64.77875168/2 in Stream.unfold/2>
iex> stream |> Enum.take(2)
[2, 4]
iex> stream |> Enum.take(4)
[2, 4, 8, 16]
iex> stream |> Enum.take(10)
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
iex> stream |> Enum.take(20)
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768,
 65536, 131072, 262144, 524288, 1048576]
iex> stream |> Enum.take(50)
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768,
 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, 16777216,
 33554432, 67108864, 134217728, 268435456, 536870912, 1073741824, 2147483648,
 4294967296, 8589934592, 17179869184, 34359738368, 68719476736, 137438953472,
 274877906944, 549755813888, 1099511627776, 2199023255552, 4398046511104,
 8796093022208, 17592186044416, 35184372088832, 70368744177664, 140737488355328,
 281474976710656, 562949953421312, 1125899906842624]

Stream.uniq/1

The Stream.uniq/1 function emits unique elements. So if an element has been emitted before, it will not be emitted again. This means that the function has to keep track of which elements have been emitted so far, and those elements will continue to use memory as long as the stream has not been garbage collected. This isn't a big deal for streams with a small range of unique elements, but the documentation warns that infinite streams can be dangerous since the number of emitted elements being stored can grow indefinitely and never be garbage collected.

iex> Stream.uniq(1..10) |> Enum.to_list()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
iex> Stream.uniq([1, 2, 3, -4, 3, 2, 10, -1, 2, 4, 8]) |> Enum.to_list()
[1, 2, 3, -4, 10, -1, 4, 8]
iex> Stream.repeatedly(fn -> Enum.random(1..20) end) |> Stream.uniq() |> Enum.take(5)
[9, 7, 17, 6, 11]
iex> Stream.repeatedly(fn -> Enum.random(1..20) end) |> Stream.uniq() |> Enum.take(5)
[20, 19, 4, 3, 7]
iex> Stream.repeatedly(fn -> Enum.random(1..20) end) |> Stream.uniq() |> Enum.take(10)
[20, 16, 13, 4, 7, 8, 2, 6, 17, 5]
iex> Stream.repeatedly(fn -> Enum.random(1..20) end) |> Stream.uniq() |> Enum.take(15)
[3, 4, 20, 18, 13, 15, 1, 11, 9, 5, 10, 17, 16, 6, 7]
iex> Stream.repeatedly(fn -> Enum.random(1..20) end) |> Stream.uniq() |> Enum.take(20)
[14, 3, 20, 2, 10, 16, 18, 19, 7, 1, 11, 17, 15, 6, 5, 9, 13, 12, 4, 8]
iex> Stream.repeatedly(fn -> Enum.random(1..20) end) |> Stream.uniq() |> Enum.take(40)

That last example froze IEx. The random number stream is infinite, but the call to Stream.uniq() is only allowing unique numbers to be emitted. I suspect what is happening is that Enum.take/1 is attempting to take 40 elements, but after 20 elements are emitted, all the possible unique elements have been emitted, so no more elements are being emitted. The stream hasn't ended because the random number stream is infinite, but the Stream.uniq/1 transformation simply won't allow any more elements to be emitted. Enum.take/1 doesn't know that there will never be any more, so it's patiently waiting for more elements to be emitted.

So you have to definitely be careful with Stream.uniq/1. Finite streams shouldn't cause much of a problem, but infinite streams most likely will.

Stream.uniq_by/2

The Stream.uniq_by/2 function works like Stream.uniq/1, except the uniqueness is determined by a parameter function. Instead of only emitting unique values from the input enumerable, the stream will emit unique values returned by the function.

Here is an example where even numbers have the same value and odd numbers have the same value.

iex> evenness = &(rem(&1, 2))
#Function<6.99386804/1 in :erl_eval.expr/5>
iex> Stream.uniq_by(1..100, evenness) |> Enum.to_list()
[1, 2]

The stream emits the first odd number and even number it comes across.

Here's another example where the unique value is the remainder after I divide by 10. I create an infinite stream of random numbers between 1 and 100, take the first N numbers, and then feed them to Stream.uniq_by/2. A maximum of ten numbers will be emitted.

iex> random_stream = Stream.repeatedly(fn -> Enum.random(1..100) end)
#Function<53.77875168/2 in Stream.repeatedly/1>
iex> random_stream |> Stream.take(2) |> Stream.uniq_by(&(rem(&1, 10))) |> Enum.to_list()
'KE'
iex> random_stream |> Stream.take(5) |> Stream.uniq_by(&(rem(&1, 10))) |> Enum.to_list()
[65, 78, 23, 49]
iex> random_stream |> Stream.take(5) |> Stream.uniq_by(&(rem(&1, 10))) |> Enum.to_list()
'I;`'
iex> random_stream |> Stream.take(10) |> Stream.uniq_by(&(rem(&1, 10))) |> Enum.to_list()
[76, 94, 32, 18, 55, 99, 63]
iex> random_stream |> Stream.take(10) |> Stream.uniq_by(&(rem(&1, 10))) |> Enum.to_list()
[38, 26, 67, 39, 62, 10, 83, 74]
iex> random_stream |> Stream.take(100) |> Stream.uniq_by(&(rem(&1, 10))) |> Enum.to_list()
[20, 71, 18, 94, 57, 12, 73, 19, 45, 86]

Stream.with_index/2

The Stream.with_index/2 function works just like Enum.with_index/2 except that it creates a stream instead of a list. It will create tuples for each element where the first value is the element from the input and the second value is the index. The second parameter is the offset where the indexes are numbered from, which defaults to 0.

iex> Stream.with_index(["a", "b", "c", "d"]) |> Enum.to_list()
[{"a", 0}, {"b", 1}, {"c", 2}, {"d", 3}]
iex> Stream.with_index(["a", "b", "c", "d"], 4) |> Enum.to_list()
[{"a", 4}, {"b", 5}, {"c", 6}, {"d", 7}]

Stream.zip/1

The Stream.zip/1 function works just like Enum.zip/1 except that it creates a stream instead of a list.

iex> Stream.zip([1..10, ["a", "b", "c", "d"], -8..-20]) |> Enum.to_list()
[{1, "a", -8}, {2, "b", -9}, {3, "c", -10}, {4, "d", -11}]
iex> Stream.zip([1..10, -8..-20]) |> Enum.to_list()
[
  {1, -8},
  {2, -9},
  {3, -10},
  {4, -11},
  {5, -12},
  {6, -13},
  {7, -14},
  {8, -15},
  {9, -16},
  {10, -17}
]

Stream.zip/2

The Stream.zip/2 function works just like Enum.zip/2 except that it creates a stream instead of a list.

iex> Stream.zip(1..10, ["a", "b", "c", "d"]) |> Enum.to_list()
[{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}]
iex> Stream.zip(1..10, -1..-5) |> Enum.to_list()
[{1, -1}, {2, -2}, {3, -3}, {4, -4}, {5, -5}]

That's it for the Stream module functions. Most of them were the stream equivalent of functions in the Enum module, but there were enough unique functions to keep things interesting.