So now that IntGen is finished, we have an application that will generate a file of random integers. That was the easy part. The harder part comes in IntSort, the application that will sort those integers into chunks and then merge the sorted chunks into a single sorted file without ever having all the integers in memory at once.

This chunking functionality solves the problem of not loading all the integers into memory at once. We just load a subset of the integers into memory at any particular time (not exceeding the amount of memory available of course). I refer to this subset of integers as a "chunk". We sort the integers in the chunk using a standard sort and then write them to a chunk file. We then load the next chunk of integers into memory and continue creating sorted chunks of integers until all the integers have been processed. Those chunk files can then be merged later (also without loading many integers into memory) to create a single sorted integer file.
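To put some rough numbers on it: sorting 10 million integers with a chunk size of 1 million would produce 10 sorted chunk files, and only about a million integers would ever be in memory at one time.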

The first thing I decided to tackle was the process of creating chunk files, since that has to happen before any merging can. So in the int_sort project, I created a Chunk module in "lib/chunk.ex" to deal with chunking functionality. I'm pursuing a stream-oriented implementation, since all the steps involved in creating chunk files can be assembled into a stream pipeline.

Creating Chunks

The first function I wrote, Chunk.create_chunks/2, accepts an input stream and a chunk size and creates a stream that emits chunks.

@spec create_chunks(Enum.t(), pos_integer()) :: Enum.t()
def create_chunks(input_stream, chunk_size) do
  Stream.chunk_every(input_stream, chunk_size)
end

That was pretty easy because the chunking functionality is already included in the Stream module.
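As a quick sanity check, here's what pulling from the resulting stream looks like in IEx (with a small hypothetical input):

iex> 1..10 |> Chunk.create_chunks(4) |> Enum.to_list()
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]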

I decided to use streams to make this as simple, testable, and composable as possible, leaving the details of transforming files into streams for some other function to handle.

The unit tests for the Chunk module are located in "test/chunk_test.exs". I'm not going into depth on the unit tests, but you're welcome to read them. Since the function returns a stream, the easiest way to test it is to pull the data from the stream and verify it, which I do whenever I can.
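Just to show the shape of that strategy (this is a sketch, not the actual test code), such a test might look something like this:

test "creates chunks from an input stream" do
  # Pull the data out of the stream and verify it directly
  chunks = 1..6 |> Chunk.create_chunks(2) |> Enum.to_list()

  assert chunks == [[1, 2], [3, 4], [5, 6]]
end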

Sorting Chunks

The next function takes the chunk stream emitted by the previous function and emits sorted chunks. I'm trying to keep each function simple, not only so it's easy to understand, but also so the functions can be composed in a flexible manner.

@spec sort_chunks(Enum.t()) :: Enum.t()
def sort_chunks(chunk_stream) do
  Stream.map(chunk_stream, &Enum.sort/1)
end

This function was pretty easy. All I had to do was compose Stream.map/2 with Enum.sort/1 to sort the contents of each chunk.
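Running a couple of unsorted chunks through it shows the effect (hypothetical values):

iex> [[3, 1, 2], [6, 4, 5]] |> Chunk.sort_chunks() |> Enum.to_list()
[[1, 2, 3], [4, 5, 6]]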

The unit testing strategy is the same as the previous function: read what the stream emits and verify that it is correct.

Writing Chunks to a Stream

Now that we have functions that can create a stream of sorted chunks, the next step is to write those chunks to chunk files. In keeping with my goal to keep functions simple and composable, I created a Chunk.write_chunks_to_separate_streams/3 function that writes each chunk to a separate stream. Each of those streams will likely be a chunk file stream, but I can plug any other kind of stream in there, which not only makes this function flexible, but also makes it easier to test.

The code for this function is significantly more involved than the previous functions because I didn't have a nice set of predefined functions I could compose. Still, I could create a stream pipeline that is fairly understandable.

@spec write_chunks_to_separate_streams(
        Enum.t(),
        non_neg_integer(),
        (non_neg_integer(), non_neg_integer() -> Enum.t())
      ) :: Enum.t()
def write_chunks_to_separate_streams(chunk_stream, gen, create_chunk_stream) do
  chunk_stream
  # Include the chunk number
  |> Stream.with_index(1)
  # Transform tuples into tuples of chunks and chunk output streams
  |> Stream.map(fn {chunk, chunk_num} ->
    output_stream = create_chunk_stream.(gen, chunk_num)

    {chunk, output_stream}
  end)
  # Write each chunk to its output stream
  |> Stream.each(fn {chunk, output_stream} ->
    @integer_file.write_integers_to_stream(chunk, output_stream) |> Stream.run()
  end)
end

This function takes a stream that emits chunks, a generation number (chunking is the first generation of file creation, and each subsequent merge is a generation of its own), and a function that creates a stream for a chunk. That last parameter, a callback function, makes this function a little different from the others we've seen so far. Since I wanted to keep this function ignorant of file details, I needed a way to create a file stream without getting involved in those details. So each chunk's output stream is created by the callback function. The details of stream creation are unknown to this function: all it cares about is that the callback returns a stream that it can write a chunk to.
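For example, a callback that gives each chunk its own file could be as simple as this sketch (the file naming scheme is purely illustrative; the real naming is handled later by the IntermediateFile module):

create_chunk_stream = fn gen, chunk_num ->
  File.stream!("gen#{gen}_chunk#{chunk_num}.txt")
end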

So the stream pipeline starts by pairing each chunk with its chunk number. Then it passes the generation number and the chunk number (which distinguishes this chunk from all others) to create_chunk_stream/2, which does its magic and returns a stream to me. Then I use IntegerFile.write_integers_to_stream/2 to write the integers to that stream. I have to pipe this operation to Stream.run/1 to make sure it actually gets run; otherwise I will have set up a stream that never does anything.

So at the end, we should have written the chunk to its own stream in integer file format. If the calling code did its job right, the chunk was written to a chunk file, or perhaps a test stream if testing.

Unit testing this function was a bit interesting. First, I created a bunch of test StringIO streams, one for each expected chunk. Then I put the test streams in a map where the key is a tuple containing the gen and chunk number and the value is the test stream for that chunk.

@spec create_test_chunk_streams(pos_integer(), non_neg_integer()) :: chunk_stream_map()
defp create_test_chunk_streams(_, 0), do: %{}

defp create_test_chunk_streams(gen, num_chunks) do
  test_streams =
    Enum.map(1..num_chunks, fn chunk_num ->
      {{gen, chunk_num}, TestStream.create_test_stream()}
    end)

  # We should end up with a stream map whose key is the {gen, chunk_num} tuple
  # and whose value is a tuple containing the stream and the stream device
  Enum.into(test_streams, %{})
end

Then a function gets passed to Chunk.write_chunks_to_separate_streams/3 that pulls the corresponding stream out of the map and provides it for the chunk to be written to.

# Create a test chunk stream for each chunk
chunk_streams = create_test_chunk_streams(chunk_gen, length(test_chunks))

# Create a create_chunk_stream callback function to pass to write_chunks_to_separate_streams
create_chunk_stream = fn gen, chunk_num ->
  get_test_chunk_stream(gen, chunk_num, chunk_streams)
end

# Call write_chunks_to_separate_streams
final_stream =
  Chunk.write_chunks_to_separate_streams(test_chunks, chunk_gen, create_chunk_stream)

# Verify the results
verify_chunk_results(final_stream, test_chunks, chunk_gen, chunk_streams)

When verifying the results, I first verify that the final stream is emitting the expected chunks, and then I pull the data from the test streams (which I can do since they are StringIO streams) and verify that the data written to each test stream matches the integers in the corresponding chunk.

defp verify_chunk_results(final_stream, test_chunks, gen, chunk_streams) do
  # Add a verification step to the stream to verify that the final stream
  # output is what we are expecting. Then start processing the stream
  final_stream
  |> Stream.with_index(1)
  |> Stream.each(fn chunk_item ->
    verify_stream_output(chunk_item, gen, chunk_streams)
  end)
  |> Stream.run()

  # Retrieve the actual chunks that were written to the test streams
  actual_chunks = chunks_from_test_streams(gen, chunk_streams)

  # The chunks are in order, so zip them together and verify that the expected
  # chunks were written to the output streams
  test_chunks
  |> Enum.zip(actual_chunks)
  |> Enum.each(fn {expected, actual} -> assert expected == actual end)

  :ok
end

While I was researching how best to verify what data a function being tested passed to a callback function (StringIO is great in this case, but not all callback functions use streams or IO devices), I discovered another interesting technique.

I recently discovered that we can verify data passed to a test callback function (or indeed store any other data we want to verify later) by sending that data as messages to an Elixir process (often the current process) and then verifying those messages later on in the unit test. I don't know anything yet about using processes or sending them messages, but it's good to be aware of this strategy, especially since immutable data makes my standard testing practice of adding data-to-be-verified to a mutable data structure impossible.
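I haven't tried it yet, but my understanding is that it looks roughly like this sketch (the callback and message shape here are hypothetical):

# The test process records every callback invocation as a message to itself
test_pid = self()

create_chunk_stream = fn gen, chunk_num ->
  send(test_pid, {:chunk_stream_created, gen, chunk_num})
  TestStream.create_test_stream()
end

# ...run the code under test, then verify the messages that were sent
assert_received {:chunk_stream_created, 1, 1}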

I plan to keep this in mind as a unit testing strategy once I know more about sending messages to Elixir processes.

Creating the Chunk File Stream

When calling Chunk.write_chunks_to_separate_streams/3 in the context of the application, I'm going to need a function that creates an actual file stream. So I created an IntermediateFile module with the intermediate_file_stream/4 function in it. I had originally called this function chunk_file_stream/4 and put it in the Chunk module until I realized that this function would be useful for all intermediate files, not just chunk files. When doing the generations of chunk file merges, I'll be using the same pattern to create intermediate file streams.

def intermediate_file_stream(gen, num, create_file_name, output_path) do
  # Create a file name for the intermediate file
  file_name = create_file_name.(gen, num)

  # Combine the file name with the output directory to create a file path
  intermediate_file_path = Path.join(output_path, file_name)

  # Create a stream for that file path and return it
  @integer_file.integer_file_stream(intermediate_file_path)
end

This function does not have a particular file name hardcoded into it. Instead, it calls the create_file_name/2 callback function to generate the file name. The purpose of this function is to handle the file stream creation, not the file naming; that can be handled elsewhere.
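As a hypothetical example, creating a stream for the third chunk file of generation 1 might look like this (the naming function is just for illustration):

create_file_name = fn gen, num -> "gen#{gen}_file#{num}.txt" end

file_stream = IntermediateFile.intermediate_file_stream(1, 3, create_file_name, "output")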

Conclusion

I now have the building blocks in place to create chunk files, but I still need to compose them together to get the work done. I also need to create a command line interface to be able to call this functionality. I'll go into that in the next section.

Before I go, I want to say that it wasn't quite as easy and straightforward as I'm presenting it here: I'm just showing the final product. I had some false starts with less composable functions, and it took several iterations before I felt the functions were as simple and composable as I could make them. So don't be discouraged if you have to iterate on your code multiple times. I'm still adjusting to functional programming and still exploring how best to structure my Elixir code.