Parallel Downloads in Elixir

I was working on a project where I needed to download Gravatars of all contributors to a given git repository. But, there can be a lot of them, so doing this in parallel may be a good idea. Oh, I know a language that is good at concurrency! Let’s use Elixir for this.

Getting the data

A little comment regarding this section - I’m aware that many things could be done more easily, and with fewer functions. But the goal of this article is to learn something new, and not just to get the job done.

Let’s define a Committer struct to hold the data:

defmodule Committer do
  defstruct [:name, :email]
end

Now we need to populate it. We need a list of all the project’s committers, preferably with their names and emails, so that we can fetch their Gravatars. Fortunately git itself comes with a plethora of tools, and we can use one of them to do our job - git log to the rescue! Like all git commands, log takes a myriad of different arguments, options, and switches, but there’s extensive documentation. We’ll use:

git log --pretty="format:%an|%ae" 

To get the data let’s define a function:

defp from_repo(repo) do
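  # System.cmd/3 does not go through a shell, so the argument must not carry
  # shell quotes - they would end up in git's output verbatim.
  args = ["log", "--pretty=format:%an|%ae"]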

  case System.cmd("git", args, cd: repo) do
    {committers, 0} ->
      committers
    {_, code} ->
      raise RuntimeError, "`git log` failed with code #{code}"
  end
end

We use System.cmd/3 and raise with a clear message when git exits with a non-zero status. Did you know you can use the cd option to change the working dir for the command you’re running? I’ve learned it just now, but it seems to be really handy in some cases! I wonder what other surprises and hidden gems there are in Elixir’s standard library.
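To see the cd option in isolation, here is a minimal sketch. It assumes a Unix-like system where ls is available; the scratch directory and the file name are made up for the demonstration.

```elixir
# Hypothetical scratch directory, used only for this demonstration.
dir = Path.join(System.tmp_dir!(), "cmd_cd_demo")
File.rm_rf!(dir)
File.mkdir_p!(dir)
File.write!(Path.join(dir, "hello.txt"), "hi")

# `cd:` sets the working directory of the spawned command only;
# the current directory of the Elixir VM itself is left untouched.
{output, status} = System.cmd("ls", [], cd: dir)
IO.inspect({String.split(output), status})

# A failing command does not raise; it reports a non-zero exit status,
# which is why from_repo/1 has to match on it explicitly.
{_output, failed_status} =
  System.cmd("ls", ["no_such_file"], cd: dir, stderr_to_stdout: true)

IO.inspect(failed_status)
```

The second call also passes stderr_to_stdout: true so that the error message is captured in the returned output instead of being printed to the terminal.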

Let’s define our main function for returning the committers:

def list(repo) do
  repo
  |> from_repo
  |> Stream.unfold(fn str ->
    case String.split(str, "\n", parts: 2, trim: true) do
      []      -> nil
      [value] -> {value, ""}
      list    -> List.to_tuple(list)
    end
  end)
  |> Stream.map(&String.split(&1, "|", parts: 2))
  |> Stream.map(&Enum.zip([:name, :email], &1))
  |> Stream.map(&struct(Committer, &1))
  # Stream.uniq_by/2 was spelled Stream.uniq/2 in older Elixir versions
  |> Stream.uniq_by(&(&1.name))
end

There’s a lot happening here, so let’s go step-by-step. We first retrieve the raw data with our from_repo/1 function. But it comes as one big, long string. We need to turn it into a nice collection that we can process. We’ll use a stream for that, partially because the committers list can be really large, and we may want to process it lazily, but mainly because I want to show the Stream.unfold/2 function.

Stream.unfold/2

What does Stream.unfold/2 do? I think everybody is familiar with the reduce function - it allows us to turn a collection into a single value. In many languages (including Erlang itself) reduce is called fold. It may now become clear what unfold does, as it’s the opposite of fold (and reduce) - it turns a single value into a collection.

How does it work? Let’s take a look at the documentation. We need to pass the initial value for the accumulator and a function that, given the accumulator, returns a tuple with the next value and the next accumulator. The function should return nil when the stream is exhausted.

How do we do it? We split the string on the first newline (parts: 2), rejecting any empty strings (trim: true), and then match on the result:

* An empty list means we got to the end of our committers - we return nil.
* A single value means it’s the last element - we return it, and pass an empty string as our accumulator.
* Any other result will have two elements. We turn it into a tuple - the first element (the part before the newline) will be our value, and the rest (the part after the newline) will be our new accumulator.
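The three cases can be tried out on a small string. This is just a sketch: the sample names and addresses are made up, and the countdown at the top is only there to show the shape of Stream.unfold/2 on something simpler.

```elixir
# The simplest possible unfold: the accumulator is a counter,
# and returning nil ends the stream.
countdown =
  Stream.unfold(3, fn
    0 -> nil
    n -> {n, n - 1}
  end)

IO.inspect(Enum.to_list(countdown))

# The same idea applied to a multi-line string (made-up sample data).
next_line = fn str ->
  case String.split(str, "\n", parts: 2, trim: true) do
    # nothing left - the stream ends
    [] -> nil
    # last line - emit it and continue with an empty accumulator
    [value] -> {value, ""}
    # emit the first line, keep the remainder as the accumulator
    [value, rest] -> {value, rest}
  end
end

raw = "Ann|ann@example.com\nBob|bob@example.com\nAnn|ann@work.example\n"
lines = raw |> Stream.unfold(next_line) |> Enum.to_list()
IO.inspect(lines)
```

Matching on [value, rest] is equivalent to the List.to_tuple/1 call in the article's version; it only makes the two-element case explicit.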

Further processing

We now have a stream of values in the format "name|email". We need to change it into a stream of our Committer structs.

We first split on the pipe, and that gives us ["name", "email"].

Next we zip this list with the struct’s field names - that will give us a list of tuples - [{:name, "name"}, {:email, "email"}]. It turns out this is exactly what we need to build our struct!

We’ll now use the struct/2 function. It takes the name of the struct and a list of fields, and turns them into a proper struct - %Committer{name: name, email: email}. Success!

Not quite! The git log command we used will return an entry for each commit, but we only care about unique committers - that’s why we’ll use Stream.uniq/2 (called Stream.uniq_by/2 in newer Elixir versions). We use only the name of the committer as the value that decides whether we’ve already seen this user. Why? Because some people use multiple email addresses to commit code.
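Here is the whole transformation as a small sketch, run on a hand-written list of lines (the names and addresses are made up). It uses Stream.uniq_by/2, which is how the "unique by a key" function is named in current Elixir versions.

```elixir
defmodule Committer do
  defstruct [:name, :email]
end

# Made-up sample input in the "name|email" format.
lines = ["Ann|ann@example.com", "Bob|bob@example.com", "Ann|ann@work.example"]

committers =
  lines
  # "Ann|ann@example.com" -> ["Ann", "ann@example.com"]
  |> Stream.map(&String.split(&1, "|", parts: 2))
  # -> [name: "Ann", email: "ann@example.com"]
  |> Stream.map(&Enum.zip([:name, :email], &1))
  # -> a Committer struct with both fields set
  |> Stream.map(&struct(Committer, &1))
  # keep only the first entry seen for each name
  |> Stream.uniq_by(& &1.name)
  |> Enum.to_list()

IO.inspect(committers)
```

Note that the second "Ann" entry is dropped even though its email differs, so the first address seen for a name is the one that ends up being used.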

Uff. That’s it! We have our stream of committers. Let’s download some files!

Downloading files

There are multiple ways to make HTTP requests in both Elixir and Erlang, but I decided to go with Erlang’s httpc module, mainly because it’s already there and we don’t need any other dependencies - that makes it possible to have this as a simple script without creating a full-blown mix project.

Let’s define a function to fetch the data from the Gravatar API.

def fetch_gravatar(%Committer{email: email}, format \\ :png) do
  request   = {gravatar_url(email, format), []}
  http_opts = [timeout: 5000]
  opts      = [body_format: :binary, full_result: false]

  case :httpc.request(:get, request, http_opts, opts) do
    {:ok, {200, body}} ->
      {:ok, body}
    {:ok, {num, _}} ->
      {:error, "response code #{num}"}
    {:error, _} = error ->
      error
  end
end

We prepare the request parameters: we set a request timeout (timeout: 5000), ask for the body as a binary (body_format: :binary), and settle on a limited response format (full_result: false), which gives us just the status code and the body. Let’s look at the function that prepares the URL.

@base_url "http://www.gravatar.com/avatar/"
@url_params "?d=identicon&s=200"
defp gravatar_url(email, format) do
  '#{@base_url}#{email_hash(email)}.#{format}#{@url_params}'
end

defp email_hash(email) do
  email
  |> String.trim
  |> String.downcase
  |> hash
  |> Base.encode16(case: :lower)
end

defp hash(data), do: :crypto.hash(:md5, data)

We do everything according to the API specification: we take the email, strip it, downcase it, hash it with md5, and hex-encode it. We also use some request params - you can read more about them in the Gravatar documentation.
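A quick way to convince yourself that the normalization matters is to hash two spellings of the same address. This is a sketch with a made-up address; the URL pieces are the ones used above.

```elixir
# Trim, downcase, MD5, hex-encode - the same steps as email_hash/1 above.
email_hash = fn email ->
  normalized = email |> String.trim() |> String.downcase()
  :crypto.hash(:md5, normalized) |> Base.encode16(case: :lower)
end

messy = email_hash.("  Ann@Example.com ")
clean = email_hash.("ann@example.com")

# Both spellings point at the same Gravatar.
IO.inspect(messy == clean)

# MD5 is 16 bytes, so the hex string is always 32 characters long.
IO.inspect(String.length(clean))

IO.puts("http://www.gravatar.com/avatar/#{clean}.png?d=identicon&s=200")
```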

Gluing everything together

So we have all the parts. Now we have to assemble them, and define a module that will do the actual work: downloading all the images, saving them, and parsing the command line arguments. Let’s define a new module for this - Downloader. The main function we’ll be calling from outside will be run:

def run(args) do
  Application.ensure_all_started(:inets)

  {repo, out} = parse_args(args)
  File.mkdir_p!(out)

  File.cd!(out, fn ->
    repo
    |> Committer.list
    # Stream.chunk_every/4 was spelled Stream.chunk/4 in older Elixir versions
    |> Stream.chunk_every(50, 50, [])
    |> Stream.each(&fetch_and_save_batch/1)
    |> Stream.run
  end)
end

We need to start the :inets application, because the module we use for HTTP requests (:httpc) is part of it. We then parse the arguments, prepare the output directory, and move into it for the rest of the commands (using File.cd!/2). We also split our stream of committers into batches of 50 elements. Why are we doing this? We’ll be performing our requests in parallel, but simply starting all the requests at once will probably cripple our OS or the VM, because neither is prepared to handle hundreds (or even thousands) of requests at the same time (trust me, I’ve tried it). We could use some pooling mechanism to limit the number of concurrent requests, but batching is a much simpler solution that is good enough in this case.
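To make the batching concrete, here is a small sketch on a range of numbers instead of committers. It uses Stream.chunk_every/4, the name newer Elixir versions use for the chunking function. The second half is not what this article does: it shows Task.async_stream/3 with max_concurrency, one possible way to get the "pooling" behaviour mentioned above, with a made-up slow_double function standing in for a download.

```elixir
# Batching: chunks of 3, stepping by 3; the empty leftover list means
# the last, shorter chunk is emitted as-is instead of being dropped.
batches = 1..7 |> Stream.chunk_every(3, 3, []) |> Enum.to_list()
IO.inspect(batches)

# Stand-in for a slow download (hypothetical helper).
slow_double = fn n ->
  Process.sleep(50)
  n * 2
end

# Alternative to batching: at most 3 tasks run at any given time,
# and a new one starts as soon as a slot frees up.
results =
  1..7
  |> Task.async_stream(slow_double, max_concurrency: 3, timeout: 1_000)
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
```

With batching, every batch waits for its slowest member before the next batch starts; with a concurrency limit the slots are refilled continuously. For a one-off script the difference rarely matters.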

defp fetch_and_save_batch(committers) do
  committers
  |> Enum.map(&Task.async(fn -> fetch_and_save(&1) end))
  |> Enum.map(&Task.await(&1, 10000))
end

Here we take care of each batch. We start a task for each download we want to perform, and then await all of them. You may ask, why not use Stream here too, as we did in all the other places? Why this sudden switch to Enum? As you know, Stream is lazy, and starts processing the next element only after the current one has passed through the whole pipeline. What does it mean here? Each task would go through Task.async and Task.await one by one, making the downloads sequential! The exact opposite of what we’re trying to achieve. But we can use Enum to be eager and resolve this problem - we first start all of the tasks, and only then await them. Yes, that means each batch takes as long as its slowest task - but that’s why we use timeouts, so it’s not too long.
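The difference is easy to measure. In this sketch a made-up work function sleeps for 100 ms to stand in for a download, and :timer.tc/1 reports the elapsed time in microseconds. Exact timings will vary from machine to machine.

```elixir
# Stand-in for a download that takes about 100 ms (hypothetical helper).
work = fn n ->
  Process.sleep(100)
  n * 2
end

# Eager: all five tasks are started first, then all are awaited.
{eager_us, eager} =
  :timer.tc(fn ->
    1..5
    |> Enum.map(fn n -> Task.async(fn -> work.(n) end) end)
    |> Enum.map(fn task -> Task.await(task, 1_000) end)
  end)

# Lazy: each element goes through async AND await before the next one
# is even started, so the tasks run one after another.
{lazy_us, lazy} =
  :timer.tc(fn ->
    1..5
    |> Stream.map(fn n -> Task.async(fn -> work.(n) end) end)
    |> Stream.map(fn task -> Task.await(task, 1_000) end)
    |> Enum.to_list()
  end)

IO.puts("eager: #{div(eager_us, 1000)} ms, lazy: #{div(lazy_us, 1000)} ms")
IO.inspect(eager == lazy)
```

Both pipelines produce the same list; only the wall-clock time differs, with the lazy version taking roughly the sum of all the sleeps.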

defp fetch_and_save(%Committer{name: name} = committer) do
  require Logger

  case Committer.fetch_gravatar(committer, :png) do
    {:ok, image} ->
      File.write!("#{name}.png", image)
      Logger.info "downloaded gravatar for #{name}"
    {:error, reason} ->
      Logger.error "failed to download gravatar for #{name}, because: #{inspect reason}"
  end
end

Here we do all the dirty work - the actual fetching of the file, and saving it. We also log all results, to give our users some feedback. Do you remember we’ve used File.cd!/2 to go into our output directory? That’s why we can simply use a filename when writing the images, and skip the directory part.

defp parse_args(args) do
  case OptionParser.parse(args) do
    {_, [repo, out], _} ->
      {repo, out}
    _ ->
      IO.puts "Usage: download repository output_dir\n"
      raise "Wrong arguments given to `download`"
  end
end

Finally, we have a function to parse the command line arguments. It’s dead simple, but there’s nothing more we really need to do.
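OptionParser.parse returns a three-element tuple: the parsed switches, the remaining positional arguments, and the invalid switches. The sketch below shows the same match as parse_args/1, with two assumptions of mine: it returns :error instead of raising so it is easier to try out, and it passes strict: [] because, as far as I know, newer Elixir versions may warn when neither :switches nor :strict is given.

```elixir
parse = fn args ->
  case OptionParser.parse(args, strict: []) do
    # exactly two positional arguments: the repository and the output dir
    {_switches, [repo, out], _invalid} -> {:ok, {repo, out}}
    # anything else is a usage error
    _ -> :error
  end
end

IO.inspect(parse.(["my_repo", "avatars"]))
IO.inspect(parse.(["my_repo"]))
```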

And as the last part, we set everything in motion by calling:

Downloader.run(System.argv)

Conclusion

We’ve shown that working with outside commands in Elixir is really simple, and that all it takes to parallelize operations is to use the Task module from the standard library. By using Stream and Enum we can easily work with collections, either lazily or eagerly - as we’ve shown, neither is better - each one has its uses. We can also use the built-in Erlang modules to offload some work, thanks to Elixir’s great compatibility with Erlang.

All of the code for this post can be found here
