In the last section, I created the building blocks for chunking integers and creating sorted chunk files. Today, I'll discuss how I composed them to actually do something.

Naming the Intermediate Files

I knew I was going to have to pass a callback function as a parameter to IntermediateFile.intermediate_file_stream/4 that would take in a gen number and file number and return a file name, so I created a module called IntSort in "lib/int_sort.ex" and created the gen_file_name/2 function.

@spec gen_file_name(non_neg_integer(), non_neg_integer()) :: String.t()
def gen_file_name(gen, num) do
  "gen#{gen}-#{num}.txt"
end

The unit tests for this function were trivial, since it is so simple.
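
As a rough idea of what such a test could look like, here is a minimal ExUnit sketch. The FileNameSketch and FileNameSketchTest module names are hypothetical, and the function body is copied from above so the snippet runs on its own. The project's real tests may be organized differently.

```elixir
ExUnit.start()

defmodule FileNameSketch do
  # Copy of gen_file_name/2 from above (hypothetical module name)
  def gen_file_name(gen, num), do: "gen#{gen}-#{num}.txt"
end

defmodule FileNameSketchTest do
  use ExUnit.Case

  test "builds a name from the gen number and the file number" do
    assert FileNameSketch.gen_file_name(1, 10) == "gen1-10.txt"
  end

  test "different gen numbers produce different names" do
    refute FileNameSketch.gen_file_name(1, 1) == FileNameSketch.gen_file_name(2, 1)
  end
end
```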

Creating the Chunk Files

Then it was time for the hard work of composing the building blocks, which I did in IntSort.create_chunk_files/4.

@spec create_chunk_files(String.t(), String.t(), pos_integer(), non_neg_integer()) :: Enum.t()
def create_chunk_files(input_file, output_dir, chunk_size, gen) do
  # Create a stream pipeline that reads in integers from the input stream,
  # chunks them, sorts them, and then writes the chunks to files
  @integer_file.integer_file_stream(input_file)
  |> @integer_file.read_stream()
  |> Chunk.create_chunks(chunk_size)
  |> Chunk.sort_chunks()
  |> Chunk.write_chunks_to_separate_streams(gen, fn gen, chunk_num ->
    IntermediateFile.intermediate_file_stream(gen, chunk_num, &gen_file_name/2, output_dir)
  end)
  |> Stream.with_index(1)
  |> Stream.map(fn {_, chunk_num} -> gen_file_name(gen, chunk_num) end)
end

I set up a stream pipeline and return the stream.

  • First, a stream is created for the integer file, which is piped to a function that creates chunks.
  • The chunk stream is piped to a function that sorts them, and the sorted chunk stream is piped to a function that writes the chunks to file streams, which are created using IntermediateFile.intermediate_file_stream/4.
  • Chunk.write_chunks_to_separate_streams/3, which writes the chunks to file streams, returns a stream of tuples containing the chunk and the stream it was written to.
  • The tuple stream is transformed to a stream containing the tuples and their corresponding index.
  • The tuple index stream is then transformed into a stream of file names that correspond to the chunks.

So at the end of the function, we have a stream that emits the names of the chunk files that were created. Note that at this point, as is typical with streams, a long pipeline with a lot of transformations has been set up, but nothing has happened yet. The stream has yet to be run.
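
To make the laziness concrete, here is a small self-contained sketch that uses an in-memory list instead of files. The Agent is only an assumption made for illustration: it records when the side effect in Stream.each/2 actually runs, and it is not part of IntSort.

```elixir
# An Agent that records every chunk the pipeline "writes"
{:ok, log} = Agent.start_link(fn -> [] end)

# Build the pipeline: chunk, sort each chunk, then record it.
# Nothing is executed here; this only describes the work to do.
pipeline =
  [5, 3, 9, 1, 8, 2]
  |> Stream.chunk_every(2)
  |> Stream.map(&Enum.sort/1)
  |> Stream.each(fn chunk -> Agent.update(log, &[chunk | &1]) end)

# The log is still empty because the stream has not been run
before_run = Agent.get(log, & &1)

# Running the stream is what triggers the chunking, sorting, and recording
Stream.run(pipeline)
after_run = Agent.get(log, &Enum.reverse/1)

IO.inspect(before_run, label: "before run")
IO.inspect(after_run, label: "after run")
```

The log stays empty until Stream.run/1 is called, and afterward it holds the sorted chunks in the order they were produced.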

Running It All From the Command Line

At this point, I was able to set up a command line interface for the application. It's all really similar to the CLI for IntGen, which I covered already, so I'm only going to discuss the parts that are significantly different.

The CLI code is located in the "lib/cli" directory, and the interesting stuff happens in the IntSort.CLI module in "lib/cli/cli.ex".

The function to create the chunk files is quite simple. Almost all the non-UI code is located in other modules, as it should be. The code in this module should be oriented around the command line interface.

@spec create_chunks(Options.t()) :: Enum.t()
defp create_chunks(options) do
  IntSort.create_chunk_files(
    options.input_file,
    Path.dirname(options.output_file),
    options.chunk_size,
    @chunk_gen
  )
  |> Stream.each(fn file_name -> IO.puts "Generated #{file_name}" end)
end

The chunk file creation stream comes from a call to IntSort.create_chunk_files/4 and is then piped to Stream.each/2, which prints each chunk file name to the console as soon as that chunk file is created rather than all at once at the end. That immediate feedback is what stream processing allows. This output is meant to be temporary. I'll soon replace it with a progress bar, which will be more suitable when creating large numbers of chunk files.

At the end, the stream is returned.

Let's look at the process/1 clause that runs when CLI option parsing and validation is successful.

defp process({:ok, options}) do
  # Create the chunk files
  create_chunks(options) |> Stream.run()
end

Currently, this function is really simple. It calls another function to create the chunk files. In the future, I'm going to add another call to merge the chunks into a single output file.

The chunk stream is piped to Stream.run/1, which starts the stream processing. This is when the chunk file creation process is triggered. Everything up to this point was just setting up the stream pipeline in preparation for running it.

In the future, I'm going to extend the stream pipeline when merging the chunk files. My first thought was to run the stream to completion at this point before starting the merging because we needed all the chunk files to be created before merging. Then I realized I could achieve the same thing using Stream.chunk_every/2, which I can use to perform merges only after the necessary number of intermediate files have been created for merging. It would only continue to the merging stages of the pipeline after a group of N intermediate files had been created.
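
Here is a minimal sketch of that grouping idea, assuming a hypothetical merge_count of 3 and made-up file names. It only shows how Stream.chunk_every/2 hands file names downstream in groups, not how the merge itself would work.

```elixir
# Hypothetical number of intermediate files to merge at a time
merge_count = 3

# Stand-in for the stream of chunk file names that the pipeline emits
file_names = Stream.map(1..7, fn n -> "gen1-#{n}.txt" end)

# Each emitted element is a list of up to merge_count file names.
# A group is only emitted once it is full or the source stream ends.
groups =
  file_names
  |> Stream.chunk_every(merge_count)
  |> Enum.to_list()

IO.inspect(groups)
```

With seven file names, the last group contains only the single leftover name, so a merge stage built on this would need to handle a short final group.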

Now I can run this application from the command line. First, I'll generate 100 random integers in a data directory.

> ./int_gen --count 100 --lower-bound -100 --upper-bound 100 ../data/random_integers.txt
|===================================================================| 100% (100)

Then I'll take the random integer file that IntGen generated and feed it to IntSort.

> ./int_sort --input-file "../data/random_integers.txt" --chunk-size 10 --keep-intermediate ../output/sorted_integers.txt
Generated gen1-1.txt
Generated gen1-2.txt
Generated gen1-3.txt
Generated gen1-4.txt
Generated gen1-5.txt
Generated gen1-6.txt
Generated gen1-7.txt
Generated gen1-8.txt
Generated gen1-9.txt
Generated gen1-10.txt

That generated 10 intermediate files in the output directory. That's what I would expect for 100 integers being sorted in chunks of 10. Although I specified the sorted output file as "sorted_integers.txt", that file was never created because I haven't written the code for that yet. At the moment, only the initial sorted chunk files are created.

Let's take a look at some intermediate files and compare them to the unsorted random integers.

Here are the first 10 lines of random integers.

> head -n 10 ../data/random_integers.txt
42
-92
67
-6
40
28
-36
-16
-34
78

Here are the contents of "gen1-1.txt", where the first 10 random integers were chunked and sorted.

> cat ../output/gen1-1.txt
-92
-36
-34
-16
-6
28
40
42
67
78

It looks like the first 10 lines were chunked and sorted as expected.

Let's do the same thing for the last 10 random integers.

> tail -10 ../data/random_integers.txt
-4
26
-75
2
76
-10
77
-4
-57
81

Here are the contents of "gen1-10.txt" where the last 10 integers were chunked and sorted.

> cat ../output/gen1-10.txt
-75
-57
-10
-4
-4
2
26
76
77
81

It looks like the last 10 lines were also chunked and sorted as expected.

Now let's try generating chunk files using a chunk size that doesn't divide evenly into 100 and see what happens.

> ./int_sort --input-file "../data/random_integers.txt" --chunk-size 23 --keep-intermediate ../output/sorted_integers.txt
Generated gen1-1.txt
Generated gen1-2.txt
Generated gen1-3.txt
Generated gen1-4.txt
Generated gen1-5.txt

The first chunk contains the expected number of integers.

> cat ../output/gen1-1.txt
-99
-98
-92
-66
-48
-47
-36
-34
-16
-10
-6
14
24
27
28
40
42
42
43
53
67
78
95

The last chunk won't be at full size, because there aren't enough integers remaining to fill it, but it does contain the last 8 integers.

> cat ../output/gen1-5.txt
-75
-57
-10
-4
2
76
77
81
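
The chunk counts seen in these runs follow directly from the arithmetic. As a quick check, this sketch chunks the range 1..100 in memory with Enum.chunk_every/2 and reports the size of each chunk. The ChunkMath module is hypothetical and stands in for the real Chunk module, whose internals are not shown here.

```elixir
defmodule ChunkMath do
  # Returns the size of every chunk produced when `total` items
  # are split into chunks of at most `chunk_size` items
  def chunk_sizes(total, chunk_size) do
    1..total
    |> Enum.chunk_every(chunk_size)
    |> Enum.map(&length/1)
  end
end

# 100 integers in chunks of 23: four full chunks and one chunk of 8
IO.inspect(ChunkMath.chunk_sizes(100, 23))

# 100 integers in chunks of 10: ten full chunks
IO.inspect(ChunkMath.chunk_sizes(100, 10))
```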

Updating the CLI With a Progress Bar

Now that I can see that chunking is working as expected, I'm going to replace the listing of chunk file names with a progress bar.

In order to do that, I'm going to have to number the chunks as they are processed and then insert a side effect (progress bar updating) into the stream using Stream.each/2. So I modified the create_chunks/2 function in "lib/cli/cli.ex" to do this.

# Calculate the progress update frequency for chunk creation
update_frequency = progress_update_frequency(num_chunks, @progress_updates)

# Create the chunk files
IntSort.create_chunk_files(
  options.input_file,
  Path.dirname(options.output_file),
  options.chunk_size,
  @chunk_gen
)
# Number each chunk
|> Stream.with_index(1)
# Update the progress bar after each chunk has been processed
|> Stream.each(fn {_, chunk_num} ->
  update_progress_bar(chunk_num, num_chunks, update_frequency)
end)
# Transform the stream back into chunk file names
|> Stream.map(fn {chunk_file, _} -> chunk_file end)

At the end of the stream, it produces a stream of chunk file names. I'll be able to use those in the future to merge the sorted chunk files.
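
The number, side effect, strip pattern can be tried in isolation. This sketch uses made-up file names and prints a line where the real code updates the progress bar, so the IO.puts call is only a stand-in.

```elixir
# Stand-in for the chunk file names emitted by the pipeline
names = ["gen1-1.txt", "gen1-2.txt", "gen1-3.txt"]

result =
  names
  # Pair each name with a 1-based index: {"gen1-1.txt", 1}, ...
  |> Stream.with_index(1)
  # Side effect only; Stream.each/2 passes each element through unchanged
  |> Stream.each(fn {_, chunk_num} -> IO.puts("chunk #{chunk_num} done") end)
  # Drop the index so downstream stages see plain file names again
  |> Stream.map(fn {name, _} -> name end)
  |> Enum.to_list()

IO.inspect(result)
```

The resulting list is identical to the input, which is the point: the index exists only long enough to drive the side effect.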

After each chunk is processed, I call update_progress_bar/3 to update the progress bar with the current progress. Let's examine how that works.

The progress bar does not update every single time a chunk is produced. If the chunks are small enough and the number of integers being chunked is large enough, this could result in a very large number of chunk files being created. If the progress bar is updated too frequently, the program will spend far more time doing progress bar updates than it will doing anything else. I/O is expensive, so we don't want to do it too often. The IntGen utility had the same problem, but I came up with a slightly more elegant solution for IntSort.

To solve this problem, I created something called the update frequency, which is the number of chunk files that must be created before the progress bar is updated. In the IntGen program, I updated the progress bar for every 1000 items. In IntSort, since the number of chunks being created can be highly variable, I wanted an update frequency that would work with any number of items. So I calculate it from the number of chunks being created and the number of times I want the progress bar to be updated over the course of creating the chunks. The function that does this is progress_update_frequency/2.

# Calculates the progress update frequency (the number of items 
# that pass between updates) based on the total number of items 
# and the number of updates that are to be made to the progress bar
defp progress_update_frequency(total_count, num_updates) do
  ceil(total_count / num_updates)
end

Then in the create_chunks/2 code I showed earlier, I compute the update frequency based on the number of expected chunks (I can calculate this in advance after I've counted the integers in the input file) and @progress_updates, a constant that represents the number of times the progress bar should be updated over the entire chunking process. That's currently set to 1000, so the progress bar will update at most 1000 times, no matter whether a few chunks are being created or a million chunks are being created.

# Calculate the progress update frequency for chunk creation
update_frequency = progress_update_frequency(num_chunks, @progress_updates)

If the number of chunks is 1000 or fewer, then the progress bar will be updated after every chunk is processed, which will result in at most 1000 updates. If the number of chunks is between 1001 and 2000, then the progress bar will be updated after every two chunks, and so forth. Processing 1500 chunks, for example, would result in the progress bar being updated 750 times, since it's only being updated with a frequency of 2. So the 1000 update constant acts as a max value, with the actual value dependent on the number of items being processed. It works out well by updating the progress bar enough to keep it reasonably up-to-date, but prevents the code from spending too much time on I/O.
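
Those numbers can be checked with a small standalone sketch. The ProgressMath module and its scheduled_updates/2 helper are hypothetical. The helper simply counts how many chunk numbers are exact multiples of the update frequency.

```elixir
defmodule ProgressMath do
  # Same formula as progress_update_frequency/2 above
  def update_frequency(total_count, num_updates) do
    ceil(total_count / num_updates)
  end

  # Hypothetical helper: how many chunk numbers in 1..total_count
  # are multiples of the update frequency
  def scheduled_updates(total_count, num_updates) do
    div(total_count, update_frequency(total_count, num_updates))
  end
end

for total <- [500, 1_500, 1_000_000] do
  frequency = ProgressMath.update_frequency(total, 1000)
  updates = ProgressMath.scheduled_updates(total, 1000)
  IO.puts("#{total} chunks: update every #{frequency}, #{updates} updates")
end
```

For 500, 1,500, and 1,000,000 chunks this gives frequencies of 1, 2, and 1000, and 500, 750, and 1000 updates respectively.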

Here's the code for update_progress_bar/3, which does the work of updating the progress bar. It receives the update frequency as a parameter because calculating that update frequency is the job of another function. It focuses on its job, which is updating progress bars.

# Updates the current progress bar
# This clause updates the progress bar occasionally when a larger 
# number of items are being processed so that the program doesn't 
# spend all its time on progress bar updates
defp update_progress_bar(current_count, total_count, update_frequency)
     when rem(current_count, update_frequency) == 0 do
  ProgressBar.render(current_count, total_count, progress_bar_format())
end

# Updates the progress bar when all the items have finished being processed.
# Otherwise, it won't show at 100% unless the total happens to be evenly
# divisible by the update frequency
defp update_progress_bar(current_count, total_count, _)
     when current_count == total_count do
  ProgressBar.render(current_count, total_count, progress_bar_format())
end

# If the current item count does not match the update frequency, 
# don't update the progress bar
defp update_progress_bar(_, _, _), do: :ok

There are three clauses here.

  • The first clause updates the progress bar when the current item count matches the update frequency (it is evenly divisible by the update frequency).
  • The second clause updates the progress bar when the final item is processed. This was necessary because the final item frequently did not match the update frequency and the progress bar would never reach 100%. So I had to ensure that the progress bar was updated to 100% after the last item was processed.
  • The final clause is called when the current item count does not match the update frequency and it is not the last item. The function does nothing in this case.
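
To see how the three clauses interact, here is a sketch with the same guards that returns :render or :skip instead of calling ProgressBar.render/3. The ProgressSketch module and the render_points/2 helper are hypothetical and exist only to list the counts at which the bar would be redrawn.

```elixir
defmodule ProgressSketch do
  # Clause 1: the count is a multiple of the update frequency
  def decide(current_count, _total_count, update_frequency)
      when rem(current_count, update_frequency) == 0,
      do: :render

  # Clause 2: the last item always renders so the bar reaches 100%
  def decide(current_count, total_count, _update_frequency)
      when current_count == total_count,
      do: :render

  # Clause 3: everything else is skipped
  def decide(_, _, _), do: :skip

  # Lists the counts in 1..total_count that would trigger a render
  def render_points(total_count, update_frequency) do
    Enum.filter(1..total_count, fn count ->
      decide(count, total_count, update_frequency) == :render
    end)
  end
end

# 10 items with a frequency of 4: renders at 4 and 8, plus the final item
IO.inspect(ProgressSketch.render_points(10, 4))

# 9 items with a frequency of 3: the final item is already a multiple
IO.inspect(ProgressSketch.render_points(9, 3))
```

In the first case the second clause is what adds the render at 10. In the second case the first clause already covers the final item, so the second clause is never reached.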

That worked out quite nicely. The progress bar updates at a reasonable rate (which can be tweaked by changing the @progress_updates constant) and we get a nice set of sorted chunk files.

Here's what the output looks like so far:

> ./int_sort --input-file "../data/random_integers.txt" --chunk-size 10000 --keep-intermediate ../output/sorted_integers.txt
Determining the number of integers and chunks in the input file...
Number of Integers: 1000000
Number of Chunks: 100
Creating Chunk Files
|===================================================================| 100% (100)

The progress bar starts at 0% and updates as the chunk files are created. In this case, I created chunk files for 1 million integers in chunks of 10,000, which resulted in 100 chunk files. That was enough work for me to get a good sense of how the progress update performs, since it took a noticeable amount of time (around 12 seconds) for the progress bar to go from 0% to 100%.

Conclusion

We now have a program that will chunk and sort large numbers of integers. Next we have to merge those sorted integers to finally end up with a single sorted file of integers. In my previous experience in implementing this in Node.js and C#, I found that this was the most challenging and time-consuming part of the solution. I have no doubt it will be so in Elixir, although I'm hoping that Elixir will make this a little easier than in other languages.