Today I'm going to discuss one of my favorite topics: streams. I've been working with streams since my early days of programming, working with I/O streams in C++, but it wasn't until I learned the Bacon.js Javascript library, which takes a functional reactive approach to processing streams, that I gained a deeper understanding of streams and grew to love them. Thank you MPJ for introducing me to Bacon.js with your wonderful YouTube videos.
You might also be familiar with RxJS, a widely used Javascript library, which provides similar functionality to Bacon.js.
Elixir has a concept of streams similar to Bacon.js's, along with similar functionality for working with them. That makes me like Elixir even more.
What are Streams?
Some of you may be more familiar with the concept of streams than others, so I'm going to give my explanation of what a stream is.
You can think of a stream as a collection of data elements. However, instead of all the data being available at once, like in a normal in-memory collection such as a list, the data in a stream arrives over a period of time (which may be anywhere from a couple of microseconds to several minutes, or even on the order of hours or days), and is processed as it arrives.
The data elements in a stream are generated one by one and reacted to rather than stored in memory and then iterated over. Stream data can be generated by an algorithm (like when you are enumerating a range) or it can be generated by external events, such as keyboard input, mouse input, system events, data being received over the network, and so forth.
Streams have an advantage in that they don't need to store their entire contents in memory. They can generate data items one at a time and then pass each item onward to whatever wants to handle or transform it. A stream doesn't have to keep that data around in memory, other than a holding buffer where data is kept until it can be consumed by something else.
The nature of streams keeps memory from filling up with a huge amount of data, since data items can be consumed as fast as they arrive. A stream can read data from a huge data store such as a file, represent an unknown amount of data coming in over the network, generate numbers from a mathematical series, or even be infinite, such as a stream generating a never-ending series of random numbers. You could even have a time-based stream that emits a value once per second.
Whereas some external data being pushed at the stream, such as network data and keyboard input, arrives unpredictably and has to be stored temporarily until consumed, other data being pulled from the stream, such as the next number in a mathematical series, random numbers, or the contents of a file, can be retrieved or calculated on demand as needed. This can make for an efficient use of memory when there is a huge amount of data to process.
The stream contrasts with a collection such as a list, which stores all its data in memory. Transformations of collections typically involve transforming the entire set of data at once, storing the entire transformed data set in memory as the result.
A range in Elixir is a type of stream that generates a mathematical series. Like a list, a range is also enumerable, but it doesn't contain the entire range in memory: it generates each number in the range one at a time. For example, the range 1..1000
doesn't contain all numbers between 1 and 1000: it just contains the numbers at the low and high end of the range, 1 and 1000. As we iterate over the range, it generates one number at a time, just like any other stream would do. We can choose to store all the generated numbers in a collection or process them one at a time as they are generated, making more efficient use of memory.
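Here's a quick IEx illustration: even though this range represents 1000 numbers, the range value itself is just its two endpoints, and the numbers are generated only as something asks for them.
iex> range = 1..1000
1..1000
iex> Enum.take(range, 5)
[1, 2, 3, 4, 5]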
Processing Streams
Streams are enumerable and enumerating over a stream is often called lazy enumeration because the next element in the stream is created as-needed rather than all the elements being created in advance (eager enumeration).
Whereas a pipeline of operations on a collection such as a list produces a new collection after each operation, a pipeline of operations on a stream is applied to one element at a time, culminating in a single result. Every time an item is emitted from a stream, that item goes through the entire pipeline of operations until the final result is achieved. Then the next item is retrieved from the stream and the same thing repeats until the stream comes to an end or the code no longer wants to read from the stream.
This is in contrast to a list, where each operation is repeated on the entire list to get a new list, which is then passed to the next operation in the pipeline. When the list data reaches the end of the pipeline, all operations are completed.
You can think of a pipeline of operations on a stream as an assembly line where people are all working at different stages, processing each item as it comes to them and passing it onto the next person. A pipeline of operations on an in-memory collection like a list would be like an assembly line where each person works on all the items available before passing all the items to the next person. It takes a lot more space to store all those items while they are waiting to be passed on, and if there are too many items, this is an inefficient way of processing.
Let's take a collection of numbers: 1, 2, 3, 4, 5, which are stored in an input file. Let's say we have a pipeline of operations that squares each number, converts it to a string, and then writes it to an output file. I'm going to create a diagram showing how this would be done using in-memory collection processing (eager processing) vs stream processing (lazy processing).
Here's a representation of how the numbers would be processed using eager processing.
[Input File] -> (Read all the data from the file) -> [1, 2, 3, 4, 5] ->
(Square the numbers) -> [1, 4, 9, 16, 25] -> (Convert to strings) ->
["one", "two", "three", "four", "five"] ->
(Write all the data to the output file) -> [Output File]
This is the sort of transformation we've become used to so far in our Elixir code. It's called "eager" because it transforms all the data at once, doing the most work possible at each step.
Here's a representation of how the numbers would be processed using a stream and lazy processing.
[Input File] -> (Read one item) -> 1 -> (Square the number)
-> 1 -> (Convert to string) -> "1" -> (Write one item)
-> [Output File]
[Input File] -> (Read one item) -> 2 -> (Square the number)
-> 4 -> (Convert to string) -> "4" -> (Write one item)
-> [Output File]
[Input File] -> (Read one item) -> 3 -> (Square the number)
-> 9 -> (Convert to string) -> "9" -> (Write one item)
-> [Output File]
[Input File] -> (Read one item) -> 4 -> (Square the number)
-> 16 -> (Convert to string) -> "16" -> (Write one item)
-> [Output File]
[Input File] -> (Read one item) -> 5 -> (Square the number)
-> 25 -> (Convert to string) -> "25" -> (Write one item)
-> [Output File]
Only a subset of the data in the file is in memory at any one time, making stream processing more memory efficient. Whereas with eager processing the entire set of data goes through each transformation step, with lazy processing each data item goes through the entire transformation pipeline before the next item is processed. It's called "lazy" because each step does the minimum amount of work possible before passing the result on to the next step.
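Just to preview where we're headed, here's a rough sketch of my own (with made-up file names, and assuming the input file contains one number per line) of what the eager and lazy versions of this example might look like in Elixir, using Stream functions that are covered later in this post.
# Eager: each step builds a complete list in memory
squares =
  File.read!("numbers.txt")
  |> String.split("\n", trim: true)
  |> Enum.map(&String.to_integer/1)
  |> Enum.map(&(&1 * &1))
  |> Enum.map(&Integer.to_string/1)

File.write!("squares.txt", Enum.join(squares, "\n") <> "\n")

# Lazy: each line flows through the entire pipeline, one line at a time
File.stream!("numbers.txt")
|> Stream.map(&String.trim/1)
|> Stream.map(&String.to_integer/1)
|> Stream.map(&(&1 * &1))
|> Stream.map(&(Integer.to_string(&1) <> "\n"))
|> Stream.into(File.stream!("squares.txt"))
|> Stream.run()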
Note that although I show the items being read in one by one from the file, it's more efficient to read a larger chunk of data at once from the file and store it in a memory buffer until it is consumed. Streams will sometimes do something like this internally, using a little more memory for the sake of time efficiency. This sort of optimization can really help when you're reading a couple billion data items from a file one at a time, but the stream is actually retrieving them from the file in chunks of 1000 and then handing them out one at a time. I'd be shocked if the file streaming functionality in Elixir didn't do something like this, because it would be horribly slow and inefficient otherwise.
When the amount of data is small like in this example, there isn't a big advantage to stream processing. However, when you're processing multiple gigabytes of data, stream processing becomes essential so that you aren't filling up memory with large amounts of data.
Streams can also be used to provide partial results before the entire processing of the stream contents has finished, which can provide a perception of increased responsiveness even if the entire operation isn't any faster.
Does this mean that stream-style lazy processing is always awesome and the eager processing of enumerables is dumb? No, of course not. In-memory eager processing tends to be more time efficient. It's the typical space vs time complexity tradeoff. Streams will often involve I/O or calculations, so there is often (although not always) a time cost in waiting for the next item in the stream to be produced.
In addition, sometimes your data is in memory anyway and gets transformed in memory, such as the state of an application. In that case, streaming won't be all that helpful. Unless the volume of in-memory data is significant, there's nothing wrong with transforming in-memory data structures using eager enumeration instead of streams. When responding to external events or dealing with a potentially large amount of data, on the other hand, then streams have the advantage.
An eager processing pipeline is certainly more efficient from a CPU viewpoint, since the CPU can perform the same operation over and over again on a consecutive sequence of data without context switching. Lazy processing is more memory efficient since it only holds a subset of the data in memory at a time, but that data is less likely to be in contiguous areas of memory, and hence CPU operations are less efficient. That may be just fine. Like all engineering decisions, you have to weigh the trade-offs and choose what is right for the current situation.
Creating Streams
There are a number of functions in Elixir that will create streams for us (such as streams related to I/O), but there are also several ways for us to create streams ourselves.
Stream.iterate/2
I can create an infinite stream using Stream.iterate/2
, where the first parameter is the starting value and the second parameter is a function that generates the next value from the current one. This is very much like the concept of generators in Javascript, C#, and Python.
iex> doubling_series = Stream.iterate(1, fn number -> number * 2 end)
#Function<64.77875168/2 in Stream.unfold/2>
iex> Enum.take(doubling_series, 10)
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
iex> Enum.take(doubling_series, 10)
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
The series is infinite, but we can use Enum.take/2
to grab a finite number of values from the series. Interestingly, the stream doesn't seem to have a concept of state, so after it generates 10 numbers, it starts all over again when I ask it to generate 10 more. I'm thinking that's because the stream is immutable like everything else in Elixir, and to store the state we'd have to get a new instance of the stream that was updated to save the state. We could probably do it at a lower level by making use of whatever functionality makes the stream enumerable, where an iterator can always get the next number.
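One way to get the "next" 10 numbers is to build a new stream that skips the ones we've already seen, for example with Stream.drop/2:
iex> doubling_series |> Stream.drop(10) |> Enum.take(10)
[1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288]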
Stream.repeatedly/1
The Stream.repeatedly/1
function can be used to create an infinite stream as well. Unlike Stream.iterate/2
, the previously-emitted value is not available. The function you pass to it has to generate a value without any input.
In this example, I create a stream of random numbers from 1 to 100.
iex> random_number = fn -> Enum.random(1..100) end
#Function<20.99386804/0 in :erl_eval.expr/5>
iex> random_number.()
6
iex> random_number.()
54
iex> random_number.()
87
iex> random_number_stream = Stream.repeatedly(random_number)
#Function<53.77875168/2 in Stream.repeatedly/1>
iex> Enum.take(random_number_stream, 10)
[97, 47, 50, 15, 66, 56, 61, 21, 76, 25]
iex> Enum.take(random_number_stream, 10)
[6, 19, 69, 39, 82, 64, 10, 76, 91, 52]
iex> Enum.take(random_number_stream, 10)
[20, 10, 77, 98, 71, 39, 84, 12, 37, 27]
Every time I request 10 numbers from this stream, I get 10 random numbers.
Stream.resource/3
The Stream.resource/3
function is the most flexible way to create a stream. You specify a function that initializes the stream, a function that produces stream values, and a function that cleans up after the stream has completed.
The result of the initialization function is passed to the stream value function. The stream value function gets called until the stream stops. Every time the stream value function is called, it returns a tuple where the first item is the data to be emitted (enclosed in a list) and the second item is the data to be passed to the next iteration of the function. When the stream ends, the stream value function returns {:halt, item}
, where item
is the thing that will be passed to the cleanup function.
Let's try an example. The initialization function defines a range (1 to 10) and a stop number (3). The stream will keep producing random numbers within the range until the stop number has been generated. When the stop number is generated, the stream will end. To create the stream, you need to call StopNumberStream.create/2
, passing it the range and the stop number.
defmodule StopNumberStream do
  def create(range, stop_number) do
    Stream.resource(fn -> initialize(range, stop_number) end,
      &generate_next_value/1, &done/1)
  end

  defp initialize(range, stop_number) do
    {range, stop_number, true}
  end

  defp generate_next_value({range, stop_number, true}) do
    case generated_number = Enum.random(range) do
      ^stop_number -> {[stop_number], {range, stop_number, false}}
      _ -> {[generated_number], {range, stop_number, true}}
    end
  end

  defp generate_next_value({_, stop_number, false}) do
    {:halt, stop_number}
  end

  defp done(_) do
    nil
  end
end
All the initialize/1
function does is return a tuple indicating the range, the stop number, and a true
, which indicates that the stream is to continue to generate numbers.
The generate_next_value/1 function receives the same tuple that the initialize/1 function created. It starts off with the first function clause, which generates a random number. If that random number is the stop number, it returns a tuple with the generated number and a false as the last value, indicating that number generation should stop. If the random number is any other number, the function clause returns the tuple with the generated number and true, indicating that the number generation should continue. Every time this function returns a tuple, the first item in the tuple (a single-element list) contains the item that will be emitted from the stream.
The next generate_next_value/1 clause is called when the stream has decided to stop generating numbers. It returns a tuple that begins with :halt, which signals that nothing more will be emitted. Instead, the stream comes to an end (not all streams are infinite!), and the second element of the tuple is passed to the cleanup function.
The done/1
function is called for cleanup. This is where someone could close a file or network connection, but since I have nothing to do for cleanup, I just implemented a function that evaluates to nil.
You can find this code in the "lwm 43 - Streams" folder in the code examples in the Learn With Me: Elixir repository on Github.
Let's try it out in IEx.
iex> number_stream = StopNumberStream.create(1..10, 3)
#Function<54.77875168/2 in Stream.resource/3>
iex> Enum.take(number_stream, 100)
[8, 5, 6, 9, 5, 6, 2, 6, 6, 7, 4, 5, 7, 4, 8, 9, 5, 9, 1, 9, 5, 5, 7, 8, 10, 10,
7, 5, 1, 2, 1, 6, 5, 2, 6, 4, 7, 5, 7, 1, 10, 2, 7, 10, 3]
iex> Enum.take(number_stream, 100)
[8, 6, 2, 4, 9, 7, 4, 7, 1, 1, 9, 5, 6, 10, 10, 8, 2, 9, 5, 7, 10, 10, 9, 5, 8,
10, 7, 4, 3]
iex> Enum.take(number_stream, 100)
[2, 4, 8, 2, 2, 4, 3]
iex> number_stream = StopNumberStream.create(1..7, 6)
#Function<54.77875168/2 in Stream.resource/3>
iex> Enum.take(number_stream, 100)
[7, 3, 5, 6]
iex> Enum.take(number_stream, 100)
[2, 3, 6]
iex> Enum.take(number_stream, 100)
[6]
I first create a stream that generates a random number between 1 and 10 and stops whenever the number 3 is emitted. The number of generated numbers varies before it finally generates a 3 and stops. Then I create a stream that generates numbers between 1 and 7 and stops when it emits a 6. In the final run, it happens to emit 6 at the very beginning and stops right away.
As the name indicates, Stream.resource/3 is well suited for streams that make use of a resource such as a file or a network connection, since it provides the ability to initialize and clean up the resource.
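For example, here's a minimal sketch of my own (with a made-up file name and error handling omitted) that wraps a file in Stream.resource/3. In practice File.stream! already does this kind of thing for us, but it shows where the initialization and cleanup functions fit in.
line_stream =
  Stream.resource(
    # Initialization: open the file
    fn -> File.open!("data.txt", [:read, :utf8]) end,
    # Value generation: emit one line per call until the end of the file
    fn file ->
      case IO.read(file, :line) do
        :eof -> {:halt, file}
        line -> {[line], file}
      end
    end,
    # Cleanup: close the file
    fn file -> File.close(file) end)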
More Ways to Create a Stream
There are a couple more ways that I know of to create a stream. I'm not going to go into any more examples here, but I probably will in a future post where I delve into each function in the Stream
module.
Stream.cycle/1
can create a never-ending stream from a list, where it emits the next element in a list and then wraps around to the first element. This is useful for some kind of round-robin stream.
Stream.unfold/2
can also create a stream with an accumulator and a function that uses the accumulator. The accumulator stores the state of the stream. It's a lot like a reduce function except that each step in the reduction is emitted from the stream. The Stream.unfold/2
function would actually have been a better choice for the StopNumberStream example above because there isn't any external resource involved, but I didn't find out about it until after I had written that code.
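Just to sketch the idea, here's roughly what a Stream.unfold/2 version of the stop-number stream might look like (my own sketch, not code from the repository):
stop_number_stream = fn range, stop_number ->
  Stream.unfold({range, stop_number, :continue}, fn
    # Returning nil ends the stream
    {_range, _stop_number, :halt} ->
      nil

    {range, stop_number, :continue} ->
      case Enum.random(range) do
        # Emit the stop number one last time, then halt on the next call
        ^stop_number -> {stop_number, {range, stop_number, :halt}}
        number -> {number, {range, stop_number, :continue}}
      end
  end)
end

# Enum.take(stop_number_stream.(1..10, 3), 100) behaves like the StopNumberStream example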
How Do We Use a Stream?
Since streams are enumerable, we can use any function from the Enum
module. However, all the functions in the Enum
module are eager, meaning that they will suck in the entire contents of the stream and output the result, which is usually a list. While this is occasionally useful and desirable (like when reading the entire contents of a file into memory all at once), it completely removes the advantage of a stream, and you won't be able to take advantage of the lazy processing flow I demonstrated above.
Using the eager processing of the Enum module can lead to trouble when the stream contains large amounts of data or is infinite. Calling an Enum function on those kinds of streams can fill up memory and make the program appear to freeze, as the poor Enum function attempts to iterate over a never-ending stream of data, vainly trying to find an end to it all. Taking 10 items from a never-ending stream with Enum.take/2 would be just fine, since it stops reading from the stream after 10 items, but attempting to map over a never-ending or extremely large stream guarantees failure.
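For example (an illustration of my own):
iex> naturals = Stream.iterate(0, &(&1 + 1))
iex> Enum.take(naturals, 10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
iex> # Don't do this: Enum.map(naturals, &(&1 * 2)) would never return, since it
iex> # tries to build the entire (infinite) result list in memory.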
For small streams that have a definite end to them, you could get away with just using eager processing and just the Enum
functions. For larger streams, infinite streams, or ones that have an indefinite end, you want lazy processing for sure. It's much better to have each element in a stream be processed individually so that memory does not fill up. To do lazy processing and set up a stream pipeline, you need to use the functions in the Stream
module.
The Stream module has equivalents to many of the functions in the Enum
module, such as Stream.map/2
, which will produce a transformed stream. The transformed stream will transform individual data elements one at a time, passing each one onto the next operation instead of operating on all the elements at once, like Enum.map/2
would do.
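Here's a small example of such a lazy pipeline. Nothing is computed until Enum.to_list/1 at the end pulls the values through, and each value passes through both Stream.map/2 steps before the next one is produced.
iex> 1..5 |> Stream.map(&(&1 * &1)) |> Stream.map(&Integer.to_string/1) |> Enum.to_list()
["1", "4", "9", "16", "25"]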
Unlike the Enum
functions, most of the Stream
functions do not execute immediately. Instead, they return another stream which represents the stream after the transformation. These functions just set up the pipeline, but they do not start processing anything. Processing will start when certain eager functions are called on the stream, which would be any of the functions in the Enum
module such as Enum.to_list/1
. The Stream.run/1 function will also accomplish the same thing if you aren't interested in the final result of the stream, but are instead just interested in the side effects within the stream, like writing to another stream or displaying something on the screen.
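For instance, a pipeline that exists only for its side effects might be kicked off like this (a trivial example of my own):
1..3
|> Stream.each(fn number -> IO.puts(number) end)
|> Stream.run()
# Prints 1, 2, and 3, then returns :ok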
Like with Bacon.js, it's possible to set up a pipeline of stream transformations and have data get pushed through the pipeline as it arrives. That's very useful for processing large amounts of data one item at a time or reacting to events like keyboard input or mouse clicks. This is the magic of streams: we can create a complex pipeline of data transformations that can process a huge amount of data while having only a small part of that data in memory at any given time.
The ability to chain operations together to create a single stream means that streams are composable. You can compose a variety of smaller stream operations together to create a more powerful stream that does what you want.
Similar Concepts in Javascript and C#
C# has some similarity to Elixir streams in the form of an IQueryable<T>
. An IQueryable<T>
represents an operation that has not yet been run. You can continue chaining operations to the IQueryable
without actually running anything, much in the same way a stream pipeline is created. It's not until you start enumerating or call a method like ToList() that the IQueryable actually starts running and spits out the result as an IEnumerable or some other object. This is called lazy evaluation in the sense that it just sets up a pipeline and nothing happens until later. However, although IQueryable<T>
is lazy, it isn't a stream. The resulting operations will return a single in-memory result, much like a function in the Elixir Enum
module would.
C# has the concept of streams as well, but you can't form a lazy processing pipeline with them. You instead have to call functions on each stream to read the stream's contents or write to a stream. It takes more work and coding to set up stream processing than it does in Elixir.
Javascript of course has Bacon.js and RxJS, which I've already mentioned. Most of the functions in these libraries just set up the processing pipeline, and processing doesn't actually happen until a trigger function is called to start processing the stream.
A Practical Example
I'm going to get some practice using streams by creating a function that reads in a file using a stream, transforms it using a stream transformation pipeline, and then writes it to an output file. The code will read the file line by line, transform those lines into a stream of two-line chunks, merge each chunk onto one line to form a stream of merged lines, add a newline character to form a stream of lines with newline characters at the end, and then stream that into an output file. So I'll get an output file with half the number of lines as the input file.
It took a little bit of reading of the documentation for creating file streams and the documentation for the Stream
module, but once I found what I needed in the documentation, the code came together surprisingly quickly. There were some bugs of course (I didn't realize at first that the newline characters were still attached after reading the line from the file), but a few iterations solved those.
Here's what I came up with. All of the files can be found in the "lwm 43 - Streams" folder in the code examples in the Learn With Me: Elixir repository on Github.
This is the input file I used, which was imaginatively named "InputFile.txt".
Line 1
Line 2
Line 3
Line 4
Line 5
Line 6
Line 7
Line 8
Line 9
Line 10
Line 11
Line 12
Line 13
Line 14
Line 15
This is the code, contained in "file_transformer.exs".
defmodule FileTransformer do
  # Transforms the text in the input file and writes it to the output file
  def transform(input_file_name, output_file_name) do
    input_stream = File.stream!(input_file_name, [:utf8, :read_ahead], :line)
    output_stream = File.stream!(output_file_name, [:utf8, :delayed_write], :line)

    transform_stream(input_stream, output_stream)
  end

  # Transforms the text in the input stream and writes it into the output stream
  defp transform_stream(input_stream, output_stream) do
    input_stream
    |> Stream.chunk_every(2)
    |> join_lines()
    |> add_newline()
    |> Stream.into(output_stream)
    |> Stream.run()
  end

  # Joins the lines from a stream of multi-line chunks into a single line with a space separator
  defp join_lines(stream) do
    stream |> trim_chunk() |> merge_chunk()
  end

  # Removes whitespace characters at the end of each line in the chunk
  defp trim_chunk(stream) do
    Stream.map(stream, fn chunk -> Enum.map(chunk, &String.trim_trailing/1) end)
  end

  # Merges each line in the chunk into a single line with a space separator
  defp merge_chunk(stream) do
    Stream.map(stream, fn chunk -> Enum.join(chunk, " ") end)
  end

  # Adds a newline character to the end of some text
  defp add_newline(stream) do
    Stream.map(stream, fn text -> text <> "\n" end)
  end
end
Rather than attempting to put all the code into one large function, I split it up into multiple smaller functions that make this code a lot easier to read and understand. I'm going to go over the code and explain it to help you understand what's happening.
The only public function in the module is transform/2
, which accepts the input file name and the output file name. It creates input and output streams based on the file names using the File.stream!/3
function. It passes the :utf8
option to indicate that the file is to be interpreted as UTF-8. It also passes the :read_ahead
option for the input stream and :delayed_write
for the output stream to take advantage of buffered reads and writes, which reads and writes multiple lines at a time rather than individually. This improves performance. The file :line
option indicates that the file is to be read one line at a time rather than in chunks of a given number of bytes.
Once the streams are created, they are passed to the transform_stream/2
function, which sets up the processing pipeline using the pipe |>
operator. As you will hopefully recall from earlier posts, this passes the result of the previous function call as the first parameter to the next function call.
The pipeline starts with the data to be transformed, which is passed to Stream.chunk_every/2
to transform a stream of lines into a stream containing chunks of 2 lines. This produces a transformed stream. That transformed stream will not emit anything until it has received two lines from the input stream or the input stream has come to an end. The transformed stream is passed to join_lines/1
, which joins the lines together.
The join_lines/1
function in turn has its own transformation pipeline. It passes the stream to trim_chunk/1
, which trims all whitespace characters from the lines in the chunk. This is necessary to remove the newline characters. The transformed stream is then passed on to merge_chunk/1
, which joins the two lines into a single line.
At this point, we have a stream that emits merged lines, but there are no longer any newline characters because we stripped them off. We now have to set up another transformation to add a newline character, which gives us our final stream. This stream emits merged lines that have a newline character, which is exactly what we want to write to the output file.
We write the stream into the output file by calling Stream.into
, which writes the contents of the stream into our output file stream. Finally we call the Stream.run/1 function to start processing the stream. The lines are processed one by one and the results are written to the output file one by one.
Note that stream transformations can merge or split the contents of their input streams, so multiple items emitted from an earlier stream can be emitted as a single item in a later stream, or a single item in an earlier stream can be emitted as multiple items from a later stream. If multiple items are being merged, the stream won't emit anything until it has received enough items from its input stream to do so.
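Here's a tiny illustration of my own showing both directions: Stream.chunk_every/2 merges multiple input items into a single output item, and Stream.flat_map/2 splits a single input item into multiple output items.
iex> ["a", "b", "c", "d"] |> Stream.chunk_every(2) |> Enum.to_list()
[["a", "b"], ["c", "d"]]
iex> ["a b", "c d"] |> Stream.flat_map(&String.split/1) |> Enum.to_list()
["a", "b", "c", "d"]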
Here's a text diagram showing what is happening here.
InputFile Stream -> "Line 1\n" ... "Line 2\n" (Two items being emitted over time)
-> Chunked Stream -> ["Line 1\n", "Line 2\n"] (An item is emitted)
-> Trimmed Stream -> ["Line 1", "Line 2"]
-> Merged Stream -> "Line 1 Line 2"
-> Merged with Newline Stream -> "Line 1 Line 2\n"
-> Output File Stream.
This is what the output looks like in IEx.
iex> c "examples/lwm 43/file_transformer.exs"
[FileTransformer]
iex> FileTransformer.transform("examples/lwm 43/InputFile.txt", "examples/lwm 43/OutputFile.txt")
:ok
Not much happens in the IEx output when I run the transform function; the results can be seen in the output file.
The output file looks like this.
Line 1 Line 2
Line 3 Line 4
Line 5 Line 6
Line 7 Line 8
Line 9 Line 10
Line 11 Line 12
Line 13 Line 14
Line 15
Visualizing the File Transformer
Now I'm going to take what I did in the previous practical example and add a bunch of print statements so that we can more clearly see what is going on. Following the output as the stream is processed will help you visualize what happens during stream processing.
This code can be found in the "file_transformer_with_messages.exs" file in the "lwm 43 - Streams" folder in the code examples in the Learn With Me: Elixir repository on Github.
defmodule FileTransformerWithMessages do
  # Transforms the text in the input file and writes it to the output file
  def transform(input_file_name, output_file_name) do
    IO.puts "Creating the input stream"
    input_stream = File.stream!(input_file_name, [:utf8, :read_ahead], :line)

    IO.puts "Creating the output stream"
    output_stream = File.stream!(output_file_name, [:utf8, :delayed_write], :line)

    transform_stream(input_stream, output_stream)
  end

  # Transforms the text in the input stream and writes it into the output stream
  defp transform_stream(input_stream, output_stream) do
    IO.puts "Transforming the stream"
    IO.puts "-------------------------------"

    input_stream
    |> print_emitted_lines("input file stream")
    |> Stream.chunk_every(2)
    |> print_emitted_chunks("chunked line stream")
    |> join_lines()
    |> add_newline()
    |> print_emitted_lines("merged line stream with newlines")
    |> Stream.each(fn line -> IO.puts ~s(Writing "#{replace_newline(line)}" to output stream) end)
    |> Stream.into(output_stream)
    |> Stream.each(fn _ -> IO.puts "-------------------------------" end)
    |> Stream.run()

    IO.puts "Transform Finished"
  end

  # Joins the lines from a stream of multi-line chunks into a single line with a space separator
  defp join_lines(stream) do
    stream
    |> trim_chunk()
    |> print_emitted_chunks("trimmed chunk stream")
    |> merge_chunk()
    |> print_emitted_lines("merged line stream")
  end

  # Removes whitespace characters at the end of each line in the chunk
  defp trim_chunk(stream) do
    Stream.map(stream, fn chunk -> Enum.map(chunk, &String.trim_trailing/1) end)
  end

  # Merges each line in the chunk into a single line with a space separator
  defp merge_chunk(stream) do
    Stream.map(stream, fn chunk -> Enum.join(chunk, " ") end)
  end

  # Adds a newline character to the end of some text
  defp add_newline(stream) do
    Stream.map(stream, fn text -> text <> "\n" end)
  end

  # Replaces the newline character with "\n" so that we can show the newline in the string
  # without actually printing a newline
  defp replace_newline(string) do
    String.replace(string, "\n", "\\n")
  end

  # Adds double quote characters to the front and back of a string
  defp add_quotes(string) do
    ~s(") <> string <> ~s(")
  end

  # Outputs the lines that are emitted from the stream
  defp print_emitted_lines(stream, stream_name) do
    Stream.each(stream, fn line -> IO.puts ~s(Emitting "#{replace_newline(line)}" from #{stream_name}) end)
  end

  # Outputs the chunks that are emitted from the stream
  defp print_emitted_chunks(stream, stream_name) do
    Stream.each(stream, fn chunk ->
      trimmed_chunk = Enum.map(chunk, &replace_newline/1)
      quoted_chunk = Enum.map(trimmed_chunk, &add_quotes/1)

      IO.puts ~s(Emitting a chunk, [#{Enum.join(quoted_chunk, ", ")}], from #{stream_name})
    end)
  end
end
It's the same code as before, but with additional code to print out what is being processed as the data moves through the pipeline of transformations. I use a lot of calls to Stream.each/2 to print out the data as it passes through the stream. Like Enum.each/2, Stream.each/2 is used solely for side effects. Unlike in Bacon.js, the side-effect function returns a stream of untransformed data elements. The only processing done at this step in the pipeline is printing the data to the screen, so we are able to spy on the data in the stream without altering the outcome. Take that, Heisenberg!
For some reason, I felt like omitting the parentheses when calling IO.puts/1
. I'm not sure why, but it felt easier to read that way because it makes the message stand out more. I suspect it's because I've been exposed to Elixir code that called IO.puts/1
in the same manner.
Here's what the output looks like when calling the function in IEx.
iex> c "examples/lwm 43/file_transformer_with_messages.exs"
[FileTransformerWithMessages]
iex> FileTransformerWithMessages.transform("examples/lwm 43/InputFile.txt", "examples/lwm 43/OutputFile.txt")
Creating the input stream
Creating the output stream
Transforming the stream
-------------------------------
Emitting "Line 1\n" from input file stream
Emitting "Line 2\n" from input file stream
Emitting a chunk, ["Line 1\n", "Line 2\n"], from chunked line stream
Emitting a chunk, ["Line 1", "Line 2"], from trimmed chunk stream
Emitting "Line 1 Line 2" from merged line stream
Emitting "Line 1 Line 2\n" from merged line stream with newlines
Writing "Line 1 Line 2\n" to output stream
-------------------------------
Emitting "Line 3\n" from input file stream
Emitting "Line 4\n" from input file stream
Emitting a chunk, ["Line 3\n", "Line 4\n"], from chunked line stream
Emitting a chunk, ["Line 3", "Line 4"], from trimmed chunk stream
Emitting "Line 3 Line 4" from merged line stream
Emitting "Line 3 Line 4\n" from merged line stream with newlines
Writing "Line 3 Line 4\n" to output stream
-------------------------------
Emitting "Line 5\n" from input file stream
Emitting "Line 6\n" from input file stream
Emitting a chunk, ["Line 5\n", "Line 6\n"], from chunked line stream
Emitting a chunk, ["Line 5", "Line 6"], from trimmed chunk stream
Emitting "Line 5 Line 6" from merged line stream
Emitting "Line 5 Line 6\n" from merged line stream with newlines
Writing "Line 5 Line 6\n" to output stream
-------------------------------
Emitting "Line 7\n" from input file stream
Emitting "Line 8\n" from input file stream
Emitting a chunk, ["Line 7\n", "Line 8\n"], from chunked line stream
Emitting a chunk, ["Line 7", "Line 8"], from trimmed chunk stream
Emitting "Line 7 Line 8" from merged line stream
Emitting "Line 7 Line 8\n" from merged line stream with newlines
Writing "Line 7 Line 8\n" to output stream
-------------------------------
Emitting "Line 9\n" from input file stream
Emitting "Line 10\n" from input file stream
Emitting a chunk, ["Line 9\n", "Line 10\n"], from chunked line stream
Emitting a chunk, ["Line 9", "Line 10"], from trimmed chunk stream
Emitting "Line 9 Line 10" from merged line stream
Emitting "Line 9 Line 10\n" from merged line stream with newlines
Writing "Line 9 Line 10\n" to output stream
-------------------------------
Emitting "Line 11\n" from input file stream
Emitting "Line 12\n" from input file stream
Emitting a chunk, ["Line 11\n", "Line 12\n"], from chunked line stream
Emitting a chunk, ["Line 11", "Line 12"], from trimmed chunk stream
Emitting "Line 11 Line 12" from merged line stream
Emitting "Line 11 Line 12\n" from merged line stream with newlines
Writing "Line 11 Line 12\n" to output stream
-------------------------------
Emitting "Line 13\n" from input file stream
Emitting "Line 14\n" from input file stream
Emitting a chunk, ["Line 13\n", "Line 14\n"], from chunked line stream
Emitting a chunk, ["Line 13", "Line 14"], from trimmed chunk stream
Emitting "Line 13 Line 14" from merged line stream
Emitting "Line 13 Line 14\n" from merged line stream with newlines
Writing "Line 13 Line 14\n" to output stream
-------------------------------
Emitting "Line 15" from input file stream
Emitting a chunk, ["Line 15"], from chunked line stream
Emitting a chunk, ["Line 15"], from trimmed chunk stream
Emitting "Line 15" from merged line stream
Emitting "Line 15\n" from merged line stream with newlines
Writing "Line 15\n" to output stream
-------------------------------
Transform Finished
:ok
That very nicely visualizes what is happening during stream processing and how lazy processing differs from eager processing.
Visualizing Stream Functions
If you want to see a great visualization of stream functions, check out this website for visualizing Bacon.js functions. It really helps you to understand how stream operations work, using timelines that show when data arrives and when results are emitted. Many of the operations in Bacon.js have an equivalent in Elixir, so these visualizations are often applicable to Elixir stream functions as well. There is a bit of work involved in figuring out which Bacon.js functions correspond to which Elixir Stream
module functions, but hopefully it won't be too hard to figure that out. I should create a similar visualization for Elixir stream functions. I'll put that on my to-do list for a possible future project.
Concluding Streams
I love streams! They are such a nice model for processing data, and I loved using Bacon.js in Javascript. I really liked creating the stream transformation pipeline and watching it all work nicely. I'm really happy that Elixir has such a similar model for its streams, and so far I've enjoyed working with streams in Elixir.
For the typical data transformation that I do in much of my code, I still plan to use the transformation functions in the Enum
module because of the small amount of data that's involved, but I plan to make good use of streams when handling large amounts of data or streams of data that arrive over time and have an indefinite end. That assumes, of course, that Elixir allows streams to be created for things like keyboard input or network data. I have yet to learn how to read data from the network or keyboard.