Learn With Me: Elixir - ElixirLargeSort IntSort Project Part 3 (#81)

It's been a while since the last post. I had actually finished this project long ago and wrote the rough drafts of these posts as I was working on the project, but I haven't had time recently to go over those and publish them. I've had some more free time recently, so let's finish writing and finally publish this stuff!

In the last section, I finished implementing chunking and the associated chunking progress bar, leaving us with a nice group of chunk files. Today, I'll discuss the creation of a merging mechanism for merging groups of those chunk files.

An Overview of Merging

Now that we have a bunch of chunks of sorted integers in their own files, I want to go over how to merge those files into a single sorted integer file. Since each chunk of integers we're merging is sorted, it's fairly easy to merge them. We know that the smallest integers are always at the beginning of the file and get bigger as we go through the file, so we'll keep track of the minimum integer in each file, taking the smallest integer and putting it into the merged file.

I'll illustrate this with an example.

A    B    C
2    -3   0
5    -1   3
8    1    9
20   4    16
     12   17
     15

We have three chunk files: A, B, and C. Each file contains sorted integers. Here's how the process of merging them works.

First we read the first integers from all three files. Since the files are sorted, we know that each integer is the smallest possible integer in that file:

A B C
2 -3 0

Then we determine which one is the minimum integer. In this case, it's -3, so we'll take -3 and write it to the output file. Then we read the next integer from the file that had the minimum integer, and that becomes the new minimum integer for that file.

A B C
2 -1 0

What happens if the same minimum integer value appears more than once? Just pick one of them. It doesn't matter. Equal integers can go in any order since they have the same value.

Now that we have a new set of minimum integers, we repeat the same process.

  • Find the minimum integer
  • Write it to the output file
  • Read the next integer from the minimum integer file

So in the next step, -1 in file B would be the minimum integer and we'd read the next integer in that file.

A B C
2 1 0

Min: 0

Here is the collection of minimum integers in the remaining stages in the merge process. We'll show the minimum integer in each of the three files at each step in the merge process. A nil value indicates that the file has no more integers and that this value can be ignored when determining the minimum integer.

A B C
2 1 3

Min: 1

A B C
2 4 3

Min: 2

A B C
5 4 3

Min: 3

A B C
5 4 9

Min: 4

A B C
5 12 9

Min: 5

A B C
8 12 9

Min: 8

A B C
20 12 9

Min: 9

A B C
20 12 16

Min: 12

A B C
20 15 16

Min: 15

A B C
20 nil 16

Min: 16

A B C
20 nil 17

Min: 17

A B C
20 nil nil

Min: 20

A B C
nil nil nil

Finally, there are no more integers to read, and the merging is done. The output file looks like this:

-3
-1
0
1
2
3
4
5
8
9
12
15
16
17
20

We have a nicely sorted file of integers and only N integers are in memory at any one time, where N is the number of chunks being merged. Chunks won't always be the same length: one of the chunks may be smaller than the others if the number of integers is not evenly divisible by the chunk size. However, that's not a problem in the way we are doing the merge.
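The walkthrough above can be sketched in memory before involving any files. This is only an illustration of the idea, assuming plain lists stand in for the chunk files; the MergeSketch module and its function names are hypothetical and are not part of the IntSort project.

```elixir
defmodule MergeSketch do
  # Merges any number of already-sorted lists into one sorted list by
  # repeatedly taking the smallest head element, as in the walkthrough.
  def merge(lists) do
    lists
    |> Enum.reject(&(&1 == []))
    |> do_merge([])
  end

  # No lists left: every integer has been emitted
  defp do_merge([], merged), do: Enum.reverse(merged)

  defp do_merge(lists, merged) do
    # Find the list with the smallest head (its current minimum)
    [smallest | rest] = min_list = Enum.min_by(lists, &hd/1)

    # Advance only that list, dropping it once it runs out (the "nil" case)
    others = List.delete(lists, min_list)
    remaining = if rest == [], do: others, else: [rest | others]

    do_merge(remaining, [smallest | merged])
  end
end

# The three chunks A, B, and C from the example
merged =
  MergeSketch.merge([
    [2, 5, 8, 20],
    [-3, -1, 1, 4, 12, 15],
    [0, 3, 9, 16, 17]
  ])

IO.inspect(merged)
```

Unlike the real program, this sketch holds every integer in memory, so it only demonstrates the selection logic, not the memory savings.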

The problem with merging files is that opening files uses finite system resources, so merging thousands of files at once may cause problems depending on the capabilities of the system. I solve that possible problem by limiting the number of chunk files that are merged at once. To keep things simple, I'll typically merge files in groups of 10, since I imagine that almost all systems should be able to handle 10 files open at once. I call these groups of files being merged "merge groups" or "chunk groups". Multiple rounds of merging (called "merge rounds") are typically needed to merge all the chunk files into a single sorted file.

For example, let's say that we have 121 chunk files and each merge group consists of 10 files being merged together. After the first round of merging, we are left with 13 intermediate files (120 of them are merged into 12 files and the remaining file just remains as it is, since there are no others to merge it with). After the second round of merging, the 13 intermediate files have been merged into 2 intermediate files (10 into the first file and 3 into the second file). After the third round of merging, the 2 intermediate files have been merged into a single file. That single file is the final product. The merge process has O(n log(n)) performance: we have to go over all the integers in each merge round (n), but the number of merge rounds is logarithmic (log(n)).
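The arithmetic in that example can be checked with a few lines of code. This is a rough sketch, assuming that a leftover group of fewer than 10 files still counts as one file in the next round; the MergeRounds module is a made-up name for illustration only.

```elixir
defmodule MergeRounds do
  # Returns the number of files left after each merge round, stopping
  # once a single file remains. Each round turns every group of up to
  # `group_size` files into one file (a ceiling division).
  def file_counts(file_count, group_size) when file_count > 1 and group_size > 1 do
    remaining = div(file_count + group_size - 1, group_size)
    [remaining | file_counts(remaining, group_size)]
  end

  # One file (or none) left: no more rounds are needed
  def file_counts(_file_count, _group_size), do: []
end

counts = MergeRounds.file_counts(121, 10)
IO.inspect(counts)
```

For 121 chunk files in groups of 10, this gives 13, then 2, then 1, matching the three rounds described above.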

The chunk files, as well as the files produced by the merge rounds, are called "intermediate files". Intermediate files are temporary in nature, and can be deleted when they are no longer needed, since only the final file matters. I typically include a flag that indicates whether intermediate files are to be kept or deleted. Keeping the intermediate files is very helpful: it not only illustrates what is happening in each round of merges, but also helps me verify that the program is producing the correct output.

Those of you who are familiar with sorting algorithms may recognize this chunking and merging process as being almost the same as the mergesort algorithm. We start off with merging larger pieces, but the merge process is pretty much the same. I personally didn't realize the similarity until later on when I suddenly came to the realization that I had essentially reinvented mergesort with files.

The Merge Stream

I figured that the process of merging would be quite simple in Elixir: I'd just merge streams of integers, pick the minimum integer each time, and then go to the next set of minimum integers by combining a few Stream functions. However, Elixir doesn't have any functions that work that way. We can zip streams together using Stream.zip/1, which gives us sets of integers in the form of tuples, but once the integers have been emitted from a stream, there is no going back. You also can't pick and choose which stream emits the next integers: they all emit the next integer. This won't work for us because we want to take one of the integers and have the stream emit the remaining integers as the next stream element.
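To make the limitation concrete, here is a small sketch of what zipping does, using lists in place of file streams (an assumption made purely to keep the example short).

```elixir
a = [2, 5, 8, 20]
b = [-3, -1, 1, 4]
c = [0, 3, 9, 16]

# Every source advances by one element per emitted tuple
zipped = [a, b, c] |> Stream.zip() |> Enum.to_list()

IO.inspect(zipped)
```

Each tuple consumes one integer from every source at once. After taking -3 from the first tuple, there is no way to hold on to 2 and 0 while only the second source moves forward.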

None of the other Stream functions did what I wanted either, since Stream functions allow you to react to items being emitted from a stream and transform them, not control when streams emit the next element. You are reacting to streams (hence the term "reactive programming") rather than exerting fine-grained control over them. So a quick and easy solution was out of the question.

I still wanted to create a stream that emitted the next minimum integer in the merge group, and in researching this problem I came across a solution to a similar problem that involved opening file devices and then creating a stream using Stream.resource/3. That's essentially what I ended up doing to create my merge stream. In Stream.resource/3, I can control the next value emitted, and using file devices, I can control which file is read from next, allowing me to create a merge stream that emits minimum integers from the group of files.
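For anyone who hasn't used Stream.resource/3 before, here is a minimal sketch of the pattern with a single device. It assumes an in-memory StringIO device holding one value per line, which is a simplification chosen for the example and not the project's actual code.

```elixir
{:ok, device} = StringIO.open("1\n2\n3\n")

line_stream =
  Stream.resource(
    # Start function: produces the initial state (here, just the device)
    fn -> device end,
    # Next function: decides what to emit and what the next state is
    fn dev ->
      case IO.read(dev, :line) do
        :eof -> {:halt, dev}
        line -> {[String.trim(line)], dev}
      end
    end,
    # After function: cleans up once the stream has halted
    fn dev -> StringIO.close(dev) end
  )

lines = Enum.to_list(line_stream)
IO.inspect(lines)
```

Because the next function chooses when to read from the device, the same shape extends to several devices where only one of them is read on each step.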

Here's the code for the IntermediateFile.merge_stream/2 function in lib/intermediate_file.ex.

@doc """
Creates a merge stream that emits the next smallest integer from all
the intermediate files that are being merged

This is where the merge magic happens. Integers are read from the (sorted)
intermediate files and the smallest integer among the files gets emitted
from the stream. The next integer is read from the file where the last
emitted integer came from and the process happens all over again until
there are no more integers to emit.

The devices passed into this function must have been previously opened
using `IntegerFile.read_device!/1`. I did it this way rather than using
file names so that someone could make a merge stream using any kind of
device, not just files. It certainly makes this function more easily
testable.

## Parameters

- devices: Open integer file devices, with each device representing an
intermediate file to be merged. Use `IntegerFile.read_device!/1` to
open the integer file devices. If the devices were not opened using
`IntegerFile.read_device!/1`, you'll get an error when attempting
to read from the stream. Note that even though I call these "integer
file devices", they don't strictly have to be devices for files. Any
device containing integer-file-formatted data can be used.
- close_device: A function that will be called for each integer file
device when the stream has terminated. This is an optional parameter,
and if you don't specify it, you'll need to close the devices yourself.

## Returns

A stream that emits the next smallest integer from among the intermediate
files. The output represents the merged contents of the files and can be
streamed into another integer file.
"""
@spec merge_stream(Enum.t(), (IO.device() -> term())) :: Enum.t()
def merge_stream(devices, close_device \\ fn _ -> :ok end) do
  Stream.resource(
    fn -> initial_merge_stream_state(devices) end,
    &next_merge_integer/1,
    fn stream_state -> cleanup_merge_stream(stream_state, close_device) end
  )
end

As you can see, the function documentation is far larger than the actual function.

This function accepts a collection of Elixir I/O devices whose contents are to be merged. I used devices instead of file names so that the function is not strongly coupled to files. This not only makes the function simpler and more flexible, but also much more easily testable. I can pass in file devices or I can pass in StringIO devices with predefined contents during unit testing. File names can be transformed to devices by a separate function; this function just focuses on merging the contents.

The second function parameter is an optional function that is called to close the devices when the stream has completed. You can have the devices closed automatically by this parameter function or you can do it yourself later on. We have to use a parameter function to do the closing because the exact function to close a device differs depending on what kind of device it is. This function doesn't know what kind of devices they are (or if they are even all the same kind of device), so it just calls the parameter function to close the devices.

IntermediateFile.merge_stream/2 is actually pretty compact: it just makes a call to Stream.resource/3 to create the merge stream. The first function creates the initial state of the stream (this is like an accumulator in a reduce function), the second function is called when the next stream item is to be emitted, and the third function is called when the stream has finished and does any cleanup work.

The initial_merge_stream_state/1 function, which creates the initial stream state, looks like this:

# Creates the initial state of the merge stream, which will be used as the
# accumulator when retrieving future stream elements
@spec initial_merge_stream_state(Enum.t()) :: merge_stream_acc()
defp initial_merge_stream_state(file_devices) do
  # Create a collection of tuples where the first element is the
  # first integer read from the device and the second element is the
  # device
  Enum.map(file_devices, fn device -> {read_next_integer(device), device} end)
  |> Enum.to_list()
end

It transforms the initial set of devices into an initial set of tuples consisting of minimum integers and their corresponding devices. This is equivalent to the set of minimum integers for all files in the merge examples above.

For this stream, we need to keep both the minimum integers and their corresponding devices together so that we know which device to read from when a smallest integer is found. The read_next_integer/1 function does the work of retrieving the next integer value from a device. So the initial stream state is just a collection of minimum integer tuples.

The next_merge_integer/1 function determines what the stream will emit. It accepts the current stream state, which was returned by initial_merge_stream_state/1 or a previous call to next_merge_integer/1, just like how a reduce function deals with the accumulator.

# Retrieves the next merge integer from the integer file devices
@spec next_merge_integer(merge_stream_acc()) ::
        {[integer()], merge_stream_acc()} | {:halt, merge_stream_acc()}
defp next_merge_integer(stream_state) do
  # Determine the minimum integer in the available integers. This will be the
  # value that the stream emits. This code works with nil values because
  # a nil is always larger than any integers. We'll only get a nil if
  # no integers remain. Remember that this returns a tuple with the min
  # value and the device it was read from
  min_value = Enum.min_by(stream_state, fn {value, _} -> value end, fn -> nil end)

  # Now that we have the min value, calculate the value to be returned
  merge_stream_value(min_value, stream_state)
end

The function first finds the smallest integer among the tuples in the stream state. The Enum.min_by/3 function allows me to specify which part of the tuple to compare, but returns the entire tuple as the minimum value. The tuple and the stream state are then passed to merge_stream_value/2 to determine exactly what value to return from this function.
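The three outcomes of that Enum.min_by/3 call are easy to see in isolation. In this sketch, atoms such as :a and :b are placeholders standing in for real devices, and the variable names are made up for the example.

```elixir
value_of = fn {value, _device} -> value end
no_devices = fn -> nil end

# Integers always sort before atoms such as nil, so exhausted devices
# lose to any device that still has an integer
with_integers = Enum.min_by([{5, :a}, {nil, :b}, {3, :c}], value_of, no_devices)

# Every device is exhausted: the minimum is a tuple holding nil
all_exhausted = Enum.min_by([{nil, :a}, {nil, :b}], value_of, no_devices)

# No devices at all: the fallback function supplies a bare nil
empty_state = Enum.min_by([], value_of, no_devices)

IO.inspect({with_integers, all_exhausted, empty_state})
```

The last two cases are the two "stop" conditions: a tuple whose value is nil, and a bare nil when the state is empty.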

Here's what merge_stream_value/2 looks like:

# Creates the value to be returned from the merge stream's next value function.
# If the min value is nil, that means there are no more integers and the stream
# can be terminated. Otherwise, we'll emit the min value and create a new
# stream state with the latest set of integers from each device
@spec merge_stream_value({integer() | nil, IO.device()}, merge_stream_acc()) ::
        {[integer()], merge_stream_acc()} | {:halt, merge_stream_acc()}
defp merge_stream_value(nil, stream_state) do
  # The min value is nil, so there are no more integers. Terminate the stream.
  {:halt, stream_state}
end

defp merge_stream_value({nil, _}, stream_state) do
  # The min value is nil, so there are no more integers. Terminate the stream.
  {:halt, stream_state}
end

defp merge_stream_value({min_integer, device}, stream_state) do
  # Reading the next number from the device that produced the min value
  # so that we'll get the next integer from the device
  new_value = {read_next_integer(device), device}

  # Update the stream state with the new value tuple
  stream_state = List.keyreplace(stream_state, device, 1, new_value)

  # Return the min integer with the new stream state
  {[min_integer], stream_state}
end

If the minimum value is nil (in the case of devices that are initially empty) or a tuple that contains nil as the first element (in the case that there are no more integers to read), that means that there are no more integers to emit, and that the stream can be terminated. We terminate the stream by returning a tuple whose first element is :halt and whose second element is the final stream state. That final stream state is passed to the cleanup function.

If the minimum value is not nil, that means the stream still has a value to emit. In that case, we return a tuple containing a list of values to be emitted (we only have one) and the new stream state. The new stream state is calculated by reading the next integer from the device with the current minimum integer and using it to update the stream state with List.keyreplace/4, which replaces the tuple for the corresponding device. This performs the process I illustrated in the "An Overview of Merging" section above. The stream state returned here will be passed to the next next_merge_integer/1 call, just like an accumulator in a reduce operation.
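Here is the state update on its own, replaying the second step of the earlier walkthrough. The atoms :a, :b, and :c are placeholders for devices, used only to keep the sketch self-contained.

```elixir
# State after -3 was emitted: file B's current minimum is now -1
state = [{2, :a}, {-1, :b}, {0, :c}]

# -1 is emitted next and the following integer in B is 1, so replace the
# tuple whose element at position 1 (zero-based) is :b
new_state = List.keyreplace(state, :b, 1, {1, :b})

IO.inspect(new_state)
```

The position argument is zero-based, which is why 1 selects the device rather than the integer.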

Once the stream has terminated, the cleanup_merge_stream/2 function is called.

# Cleans up after the merge stream terminates, closing all devices
@spec cleanup_merge_stream(merge_stream_acc(), (IO.device() -> term())) :: :ok
defp cleanup_merge_stream(stream_state, close_device) do
  stream_state
  |> Enum.map(fn {_, device} -> device end)
  |> Enum.each(close_device)

  :ok
end

The cleanup_merge_stream/2 function closes every device in the stream state using the close device function. This is either the default function that does nothing or the one that was passed in by the caller when creating the merge stream.

That's the entire process of merging a file group.
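To see the pieces working together, here is a condensed, self-contained sketch of the same approach that merges the three example chunks from StringIO devices. The MergeStreamSketch module is hypothetical, and its read_next_integer/1 is a stand-in that assumes one integer per line; the project's real function may differ.

```elixir
defmodule MergeStreamSketch do
  # Hypothetical stand-in for read_next_integer/1: assumes one integer
  # per line and returns nil once the device has nothing left.
  def read_next_integer(device) do
    case IO.read(device, :line) do
      :eof -> nil
      line -> line |> String.trim() |> String.to_integer()
    end
  end

  def merge_stream(devices, close_device \\ fn _ -> :ok end) do
    Stream.resource(
      # Initial state: the first integer of each device, paired with it
      fn -> Enum.map(devices, fn device -> {read_next_integer(device), device} end) end,
      &next/1,
      # Cleanup: hand every device to the caller's close function
      fn state -> Enum.each(state, fn {_, device} -> close_device.(device) end) end
    )
  end

  defp next(state) do
    case Enum.min_by(state, fn {value, _} -> value end, fn -> nil end) do
      # No devices at all, or every device is exhausted
      nil -> {:halt, state}
      {nil, _} -> {:halt, state}
      # Emit the minimum and advance only the device it came from
      {min_integer, device} ->
        new_value = {read_next_integer(device), device}
        {[min_integer], List.keyreplace(state, device, 1, new_value)}
    end
  end
end

devices =
  ["2\n5\n8\n20\n", "-3\n-1\n1\n4\n12\n15\n", "0\n3\n9\n16\n17\n"]
  |> Enum.map(fn contents ->
    {:ok, device} = StringIO.open(contents)
    device
  end)

merged = devices |> MergeStreamSketch.merge_stream(&StringIO.close/1) |> Enum.to_list()
IO.inspect(merged)
```

Passing &StringIO.close/1 as the second argument shows why the close function is a parameter: a file device would need a different function.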

Testing the Merge Stream

I wanted to make sure the merge stream worked correctly, so I wrote a bunch of unit tests to verify that. As usual, I wrote a generic test function, with each unit test calling the test function with different parameters. I tried a range of different scenarios, merging different combinations of devices with varying numbers of integers and varying ranges of integer values.

One bug I found while running the tests was that my code didn't handle a list of zero devices correctly. Attempting to read data from those devices just gave me a nil value initially, so I had to add a clause to merge_stream_value/2 to handle nil values as well as tuples that contained nil.

I'm not going to go into the test code in detail, but you can find it in the ElixirLargeSort GitHub project in test/intermediate_file_test.exs.

What's Next?

Now I have a working merge stream, but I haven't yet integrated it into the rest of the IntSort program. I still have to create the file groups to be merged and repeat the rounds of merges until there is only one file left. I'll go and implement that, and in the next section, I'll go over how I did that.