I was working on a project where I needed to download Gravatars of all contributors to a given git repository. But there can be a lot of them, so doing this in parallel may be a good idea. Oh, I know a language that is good at concurrency! Let’s use Elixir for this.
Getting the data
A little comment regarding this section - I’m aware that many things could be done more simply, and with fewer functions. But the goal of this article is to learn something new, and not just get the job done.
Let’s define a Committer struct to hold the data:
defmodule Committer do
defstruct [:name, :email]
end
Now we need to populate it.
We need to get a list of all the project’s committers, preferably with their names and emails, so that we’ll be able to fetch Gravatars. Fortunately git itself comes with a plethora of tools, and we can use one of them to do our job - git log to the rescue! Like all git commands, log takes myriads of different arguments, options, and switches, but there’s extensive documentation. We’ll use:
git log --pretty="format:%an|%ae"
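For a repository this prints one name|email pair per commit, one per line - something like this (the names and addresses below are made up, just to show the shape of the data):
Jane Doe|jane@example.com
John Smith|john@example.com
Jane Doe|jane.doe@work.example.com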
To get the data let’s define a function:
defp from_repo(repo) do
# System.cmd/3 does not go through a shell, so the format needs no extra quoting
args = ["log", "--pretty=format:%an|%ae"]
case System.cmd("git", args, cd: repo) do
{committers, 0} ->
committers
{_, code} ->
raise RuntimeError, "`git log` failed with code #{code}"
end
end
We use System.cmd/3 and handle failures gracefully. Did you know you can use the cd option to change the working directory for the command you’re running? I’ve learned it just now, but it seems to be really handy in some cases! I wonder what other surprises and hidden gems are hiding in Elixir’s standard library.
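As a tiny illustration (the path and the branch name here are made up), the cd option runs the command in the given directory instead of the one our script was started from:
System.cmd("git", ["rev-parse", "--abbrev-ref", "HEAD"], cd: "/path/to/some-repo")
#=> {"master\n", 0}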
Let’s define our main function for returning the committers:
def list(repo) do
repo
|> from_repo
|> Stream.unfold(fn str ->
case String.split(str, "\n", parts: 2, trim: true) do
[] -> nil
[value] -> {value, ""}
list -> List.to_tuple(list)
end
end)
|> Stream.map(&String.split(&1, "|", parts: 2))
|> Stream.map(&Enum.zip([:name, :email], &1))
|> Stream.map(&struct(Committer, &1))
|> Stream.uniq(&(&1.name))
end
There’s a lot happening here so let’s go step-by-step.
We first retrieve the raw data with our from_repo/1
function.
But it comes as one big, long string. We need to turn it into a nice collection that we can process. We’ll use a stream for that, partially because the committers list can be really large and we may want to process it lazily, but mainly because I want to show the Stream.unfold/2 function.
Stream.unfold/2
What does Stream.unfold/2
do? I think everybody is familiar with the reduce
function - it allows us to turn a collection into a single value. In many
languages (including Erlang itself) reduce is called fold
. It may now
become clear what unfold does, as it’s the opposite of fold (and reduce) -
it turns a single value into a collection.
How does it work? Let’s take a look at the documentation. We need to pass the initial value of the accumulator and a function that, given the accumulator, returns the next value and the next accumulator. The function should return nil once the stream is exhausted.
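A quick toy example (not part of our pipeline) makes the mechanics clearer - counting down from a number:
Stream.unfold(5, fn
  0 -> nil
  n -> {n, n - 1}
end)
|> Enum.to_list
#=> [5, 4, 3, 2, 1]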
How do we do it? We split the string on the first newline (parts: 2), rejecting any empty strings (trim: true). We then match on the result.
- An empty list means we got to the end of our committers - we return nil.
- A single value means it’s the last element - we return it, and pass an empty string as our accumulator.
- Any other result will have two elements. We turn it into a tuple - the first element (what was before the newline) will be our value, and the rest (after the newline) will be our new accumulator.
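To see what that splitting actually returns at each stage, here’s how String.split/3 behaves with these options on some made-up data:
String.split("a|1\nb|2\nc|3", "\n", parts: 2, trim: true)
#=> ["a|1", "b|2\nc|3"]
String.split("c|3", "\n", parts: 2, trim: true)
#=> ["c|3"]
String.split("", "\n", parts: 2, trim: true)
#=> []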
Further processing
We now have a stream of values in the format "name|email" - we need to change it into a stream of our Committer structs.
We first split on the pipe, and that gives us ["name", "email"]
.
Next we zip this list with the struct field’s labels - that will give us a list
of tuples - [{:name, "name"}, {:email, "email"}]
. It turns out this is exactly
what we need to express our struct!
We’ll now use the struct/2 function. It takes the struct’s name and a list of fields, and turns them into a proper struct - %Committer{name: name, email: email}.
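In iex this looks something like the following (with made-up data):
struct(Committer, [name: "Jane Doe", email: "jane@example.com"])
#=> %Committer{name: "Jane Doe", email: "jane@example.com"}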
Success!
Not quite! The git log
command we used will return an entry for each
commit - we only care about unique committers - that’s why we’ll use Stream.uniq/2
.
But we want to use only the name of the committer as our value to check if we’ve already seen this user. Why? Because some people use multiple email addresses to commit code.
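Passing a function to Stream.uniq/2 makes it compare elements by that function’s result rather than by the whole element (in more recent Elixir versions this is spelled Stream.uniq_by/2). With some made-up data:
[%Committer{name: "Jane Doe", email: "jane@example.com"},
 %Committer{name: "Jane Doe", email: "jane@work.example.com"}]
|> Stream.uniq(&(&1.name))
|> Enum.to_list
#=> [%Committer{name: "Jane Doe", email: "jane@example.com"}]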
Uff. That’s it! We have our stream of committers. Let’s download some files!
Downloading files
There are multiple ways to make HTTP requests in both Elixir and Erlang, but I
decided to go with Erlang’s
httpc
module mainly because it’s
already there and we don’t need any other dependencies - that makes it possible
to have this as a simple script without creating a full-blown mix
project.
Let’s define a function to fetch the data from the Gravatar API.
def fetch_gravatar(%Committer{email: email}, format \\ :png) do
request = {gravatar_url(email, format), []}
http_opts = [timeout: 5000]
opts = [body_format: :binary, full_result: false]
case :httpc.request(:get, request, http_opts, opts) do
{:ok, {200, body}} ->
{:ok, body}
{:ok, {num, _}} ->
{:error, "response code #{num}"}
{:error, _} = error ->
error
end
end
We prepare the request parameters, setting a proper request timeout (timeout: 5000), using binaries (body_format: :binary), and opting for a simplified response format (full_result: false) - that way we get back just the status code and the body, instead of the full response tuple. Let’s look at the function that prepares the URL.
@base_url "http://www.gravatar.com/avatar/"
@url_params "?d=identicon&s=200"
defp gravatar_url(email, format) do
'#{@base_url}#{email_hash(email)}.#{format}#{@url_params}'
end
defp email_hash(email) do
email
|> String.strip
|> String.downcase
|> hash
|> Base.encode16(case: :lower)
end
defp hash(data), do: :crypto.hash(:md5, data)
We do everything according to the API specification: we take the email, strip it, downcase it, hash it with md5, and hex-encode it. We also use some request params - you can read more about them in the Gravatar documentation.
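For a hypothetical address the resulting URL (a charlist, which is what :httpc expects) looks roughly like this - the hash below is only a placeholder for the real MD5 hex digest:
gravatar_url(" Jane.Doe@Example.COM ", :png)
#=> 'http://www.gravatar.com/avatar/<32-character-md5-hex>.png?d=identicon&s=200'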
Gluing everything together
So we have all the parts. Now we have to assemble them, and define a module that will perform the actual work: downloading all the images, saving them, and parsing the command line arguments. Let’s define a new module for this - Downloader.
The main function we’ll be calling from outside will be run
:
def run(args) do
Application.ensure_all_started(:inets)
{repo, out} = parse_args(args)
File.mkdir_p!(out)
File.cd!(out, fn ->
repo
|> Committer.list
|> Stream.chunk(50, 50, [])
|> Stream.each(&fetch_and_save_batch/1)
|> Stream.run
end)
end
We need to start the :inets application, which the module we use for HTTP requests (:httpc) is part of. We then parse the arguments, prepare the output directory, and move into it for the rest of the commands (using File.cd!/2).
We also split our stream of committers into 50-element batches. Why are we doing this? We’ll be performing our requests in parallel, but simply starting all the requests at once would probably cripple our OS or the VM, because neither is prepared to handle hundreds (or even thousands) of requests at the same time (trust me, I’ve tried it). We could use some pooling mechanism to limit the number of concurrent requests, but batching is a much simpler solution that is good enough in this case.
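The chunking itself works like this (Stream.chunk/4 behaves the same way, just lazily - shown here with Enum and small numbers for readability; the empty list at the end means the last, incomplete chunk is kept as-is):
Enum.chunk([1, 2, 3, 4, 5], 2, 2, [])
#=> [[1, 2], [3, 4], [5]]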
defp fetch_and_save_batch(committers) do
committers
|> Enum.map(&Task.async(fn -> fetch_and_save(&1) end))
|> Enum.map(&Task.await(&1, 10000))
end
Here we take care of each batch. We start a task for each of the downloads we want to perform, and then await them. You may ask: why not use Stream here too, as we did in all the other places? Why this sudden switch to Enum? As you know, Stream is lazy, and starts processing the next element only after the current one has passed through the whole pipeline. What does that mean here? It would make tasks go one by one through our Task.async and Task.await, turning them sequential - the exact opposite of what we’re trying to achieve. But we can use Enum to be eager and solve this problem: we first start all of the tasks, and only then await on all of them. Yes, that means each batch will take as much time as its slowest task - but that’s why we use timeouts, so it’s never too long.
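A toy example (nothing from our pipeline, just timers) shows why the eager version matters - all four tasks sleep concurrently, so the whole pipeline finishes in roughly one second instead of four:
1..4
|> Enum.map(fn _ -> Task.async(fn -> :timer.sleep(1000) end) end)
|> Enum.map(&Task.await/1)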
defp fetch_and_save(%Committer{name: name} = committer) do
require Logger
case Committer.fetch_gravatar(committer, :png) do
{:ok, image} ->
File.write!("#{name}.png", image)
Logger.info "downloaded gravatar for #{name}"
{:error, reason} ->
Logger.error "failed to download gravatar for #{name}, because: #{inspect reason}"
end
end
Here we do all the dirty work - the actual fetching of the file, and saving it. We also log all results to give our users some feedback. Do you remember that we used File.cd!/2 to go into our output directory? That’s why we can simply use a filename when writing the images, and skip the directory part.
defp parse_args(args) do
case OptionParser.parse(args) do
{_, [repo, out], _} ->
{repo, out}
_ ->
IO.puts "Usage: download repository output_dir\n"
raise "Wrong arguments given to `download`"
end
end
Finally, we have this function to parse the command line arguments. It’s dead simple, but there’s nothing more we really need to do.
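For our two positional arguments, OptionParser.parse/1 just hands them back as the middle element of the resulting tuple (the paths are, again, made up):
OptionParser.parse(["path/to/repo", "avatars"])
#=> {[], ["path/to/repo", "avatars"], []}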
And as the last step, we set everything in motion by calling:
Downloader.run(System.argv)
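Assuming the whole thing lives in a single script file - say download.exs, the name is just an example - we can run it like this:
elixir download.exs path/to/repo avatars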
Conclusion
We’ve shown that working with external commands in Elixir is really simple, and that all it takes to parallelize operations is to use the Task module from the standard library. By using Stream and Enum we can easily work with collections, either lazily or eagerly - as we’ve shown, neither is better: each one has its uses. We can also use the built-in Erlang modules to offload some work, thanks to Elixir’s great compatibility.
All of the code for this post can be found here