1 billion row challenge in ... ruby

Hello folks! This is my attempt at the infamous 1 billion row challenge in Ruby. I've also done it in Go and got just under 1 minute as my best (still a work in progress 😅).

For those who don't know what the 1brc is: you're given 1 billion temperature readings for different cities and you're required to calculate the min, max and average for each city. Let's dive in.

Serial Execution

Level 1 of this challenge is the code below. Very simple and to the point: we store the min, max, count and sum of each city in a hash map, loop over the 1 billion rows line by line and aggregate as we go. Then we loop over the result, printing every city and its numbers.


data_struct = Struct::new(:max, :min, :sum, :count)
# Start max at -Infinity and min at +Infinity so the first reading always
# overwrites them; starting both at 0 would skew cities whose temperatures
# never cross zero.
city_map = Hash::new { |hash, key| hash[key] = data_struct.new(-Float::INFINITY, Float::INFINITY, 0, 0) }
start_execution = Time.now

File.open('measurements.txt').each_line do |line|
  city, temp = line.split(';')
  temp = temp.to_f
  city_map[city].sum += temp
  city_map[city].count += 1
  city_map[city].max = temp if temp > city_map[city].max
  city_map[city].min = temp if temp < city_map[city].min
end


city_map.each do |city, data|
  puts "%s: max: %.1f, min: %.1f, avg: %.1f" % [city, data.max, data.min, data.sum / data.count]
end

puts "Execution time: %.2f" % (Time.now - start_execution)

This came in at a whopping 10 minutes and 51 seconds, which is about what you'd expect. Sequential and simple.

Now let's spice things up a bit

In Ruby you can't really achieve parallelism with threads by default because of the so-called GIL (global interpreter lock), which makes sure only one thread runs Ruby code at any given time. It's there to protect Ruby's internal data structures from concurrent access by multiple threads.

The downside is that we can't leverage multiple cores to achieve true parallelism, which would be a game changer for the execution time. That brings us to the second part of the challenge.
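To see this in action, here's a minimal sketch (not part of the original solution) comparing CPU-bound work done sequentially versus on four threads. On CRuby both timings come out roughly the same, because only one thread can execute Ruby code at a time.

require 'benchmark'

# Some CPU-bound busy work; the exact computation doesn't matter.
def crunch
  (1..5_000_000).reduce(0) { |acc, i| acc + i % 7 }
end

puts Benchmark.realtime { 4.times { crunch } }                                  # sequential
puts Benchmark.realtime { Array.new(4) { Thread.new { crunch } }.each(&:join) } # threaded, but not faster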

Forking Processes for parallel execution

One workaround is to spin up multiple processes and have them talk to each other. Each process can run on its own core, and you can have n processes where n is your machine's core count.

That's exactly what we're going to do. But first let's identify where we want to use parallelism.

The main thing slowing us down is the line-by-line parsing of the file. It's good for memory because it only reads one line at a time, but horrible for performance.

We need a way to read chunks (maybe 50-60 megabytes at a time) and hand them off for processing. Reading in chunks will get us to the end of the file much quicker.

Then each chunk can be processed by a separate running process. If, for example, we have 8 cores and spin up 8 processes, we can process 8 chunks in parallel. This would save a huge amount of time.
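If you haven't used fork before, here's the minimal building block we're about to lean on (a standalone sketch, separate from the actual solution): fork creates a child process and Process.wait reaps it.

# fork returns the child's pid in the parent and nil inside the child.
pid = fork do
  # This block runs in the child process, with its own copy-on-write
  # view of the parent's memory, free to use its own CPU core.
  puts "child #{Process.pid} doing some work"
end

puts "parent #{Process.pid} spawned child #{pid}"
Process.wait(pid) # block until the child exits so we don't leave a zombie behind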

I had to change some things before moving on

We now have a process_chunk function that processes a chunk and builds the hash map for the cities in that chunk. We'll spin up <core count> processes, each processing its own chunk independently.

require 'json'

def process_chunk(chunk)
  # As in the serial version, min starts at +Infinity and max at -Infinity
  # so the first reading always wins.
  city_map = Hash::new { |hash, key| hash[key] = { min: Float::INFINITY, max: -Float::INFINITY, sum: 0, count: 0 } }
  chunk.each_line do |line|
    city, temp = line.split(";")
    temp = temp.to_f
    city_map[city][:max] = temp if temp > city_map[city][:max]
    city_map[city][:min] = temp if temp < city_map[city][:min]
    city_map[city][:sum] += temp
    city_map[city][:count] += 1
  end
  JSON.dump(city_map)
end

We'll use JSON for now, but note that serializing and deserializing could become a bottleneck.
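As a quick sanity check, calling it on a tiny hand-written chunk (made-up sample data) returns a JSON string ready to be sent back to the parent, roughly like this:

sample = "Cairo;10.0\nCairo;20.0\nHamburg;-5.0\n"
puts process_chunk(sample)
# => {"Cairo":{"min":10.0,"max":20.0,"sum":30.0,"count":2},"Hamburg":{"min":-5.0,"max":-5.0,"sum":-5.0,"count":1}}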

Now we'll start processing the file in chunks. First we define a few constants

require 'etc' # needed for Etc.nprocessors

FILE_SIZE = File.size("measurements.txt")
CPU_COUNT = Etc.nprocessors
CHUNK_SIZE = (FILE_SIZE / CPU_COUNT) + 1

We split the file across the number of cores we have; the more cores, the faster this will run.

pointer = 0
child_processes = Hash::new
chunk_count = 0

Then we define three variables: a pointer that marks where we'll start reading from on each loop, a hash map of child processes so we can track them, and a chunk counter to let us know when we're finished.

while pointer < FILE_SIZE && chunk_count < CPU_COUNT
  # Start reading from where the pointer is located
  chunk = IO.read("measurements.txt", CHUNK_SIZE, pointer)
  break if chunk.nil?
  chunk_count += 1 # Increment the chunk count

  # Sometimes we might end up in the middle of a line, so we track back
  # to the last \n we find and take the chunk up till there.
  # You can either go forward or move backward, whichever suits you.
  last_line_delimiter = chunk.rindex("\n") # Get the last \n seen
  processed_chunk = chunk[0..last_line_delimiter] # End the chunk there

  # To pass data between processes we'll need to use pipes. A pipe is a
  # mechanism used in Unix and Unix-like operating systems to send data
  # from one program to another.
  reader, writer = IO.pipe

  # Now we fork and create a new child process for each chunk.
  pid = fork do
    reader.close # Close the pipe reader in the child because we don't use it
    city_hash = process_chunk(processed_chunk) # Process the chunk
    writer.write(city_hash + "\n") # Write the data plus a delimiter to the pipe
    writer.close # Close the writer
  end

  writer.close # Close the writer in the MAIN process because we don't write to it
  child_processes[pid] = reader # Add the reader and pid to the hash map
  pointer += last_line_delimiter + 1 # Advance the pointer past the chunk we consumed
end

I tried to explain everything in the comments above, but briefly: we're just reading chunks and handing each chunk to a separate process for processing.

Now that that's done, the main process will wait for the data coming from any of the child processes and update the original CityMap accordingly.

We'll use a handy OS system call, select(2), which Ruby exposes as IO.select.

What it does is take an array of readers and an array of writers and block until at least one of them is available to read from or write to.

In our case we're only reading. So we'll pass in only an array of readers.

As soon as any reader is ready to read from, the main process will start reading in the data.
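Here's a tiny standalone demo of what IO.select returns (separate from the solution): an array whose first element is the list of readers that currently have data.

r1, w1 = IO.pipe
r2, w2 = IO.pipe

w2.puts "hello from pipe 2"
w2.close

# Blocks until at least one IO is ready; the result is
# [readable_ios, writable_ios, exceptional_ios].
readable, _writable, _errored = IO.select([r1, r2])
readable.each { |reader| puts reader.gets } # prints "hello from pipe 2"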

# CityMap is the merged result: the same kind of struct-backed default hash
# we built for the serial version.
while ready = IO.select(child_processes.values)
  # ready is an array of arrays; ready[0] holds the readers that have data.
  ready[0].each do |reader|
    serialized_city_map = ""
    while line = reader.gets
      serialized_city_map += line
    end
    # The above reads until the child closes its end of the pipe,
    # then we deserialize the data.
    city_map = JSON.load(serialized_city_map)

    # Now we merge it into our CityMap
    city_map.each do |city, data|
      CityMap[city].max = [CityMap[city].max, data["max"]].max
      CityMap[city].min = [CityMap[city].min, data["min"]].min
      CityMap[city].sum += data["sum"]
      CityMap[city].count += data["count"]
    end

    # Get the pid belonging to this reader, remove it from the map,
    # acknowledge the exited child with Process.wait and close the pipe.
    pid = child_processes.key(reader)
    if pid
      child_processes.delete(pid)
      Process.wait(pid)
    end
    reader.close
  end
  # Exit the loop once every child has been collected
  break if child_processes.empty?
end

This solution cut our execution time by more than 70%.

It went from 10m51s to 2m56s!!

This approach uses around 9GB of memory in the main process and around 1GB in each of the child processes. It's a lot of memory being used.

Optimizations

Streaming the JSON to save memory and execution time

One of the first things I tried was streaming the JSON data. So instead of this:

    serialized_city_map = ""
    while line = reader.gets
      serialized_city_map += line
    end
    city_map = JSON.load(serialized_city_map)

That approach holds both the raw serialized string and the parsed JSON in memory at the same time.

I used this

    city_map = nil
    Json::Streamer.parser(file_io: reader).get(nesting_level:0) do |data|
      city_map = data
    end

This uses the json-streamer gem (require 'json/streamer'). It takes a reader object and parses the JSON as it streams in. This saves us some memory, but it didn't make a huge difference.

Enabling the YJIT Compiler for Ruby 3.3+

Another tweak I made was to make sure I used YJIT.

YJIT stands for Yet Another Ruby JIT, a compiler first introduced in Ruby 3.1 that delivers noticeably better performance than the MJIT compiler Ruby had before.

At first it was experimental, but this article from Shopify suggests it's production ready starting from Ruby 3.2.

This simple line enables it

 RubyVM::YJIT.enable

Using YJIT decreased the execution time to 2 minutes and 45 seconds.
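If you want to confirm YJIT actually kicked in, you can check at runtime; alternatively, launch Ruby with the --yjit flag or set RUBY_YJIT_ENABLE=1 in the environment.

RubyVM::YJIT.enable            # programmatic enable, Ruby 3.3+
puts RubyVM::YJIT.enabled?     # => true once YJIT is active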

Using binary serialization instead of JSON

Another optimization is to get rid of the JSON altogether and use a binary serialization format, which produces smaller payloads than JSON and should definitely cut the execution time.

I used a gem called msgpack for this. It serializes data directly to binary.
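A quick round trip shows the API we'll rely on. Note that symbol keys come back as strings after unpacking, which is why the merge loop below keeps reading data["max"] rather than data[:max]. The sample values here are made up.

require 'msgpack'

packed = { min: -5.0, max: 20.0, sum: 30.0, count: 2 }.to_msgpack
puts packed.bytesize            # a handful of bytes, noticeably smaller than the JSON equivalent
puts MessagePack.unpack(packed) # keys come back as strings, e.g. {"min"=>-5.0, "max"=>20.0, ...}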

Now we swap out the JSON calls on both sides.

require 'msgpack'

def process_chunk(chunk)
  # Same initialization fix as before: min starts at +Infinity, max at -Infinity
  city_map = Hash::new { |hash, key| hash[key] = { min: Float::INFINITY, max: -Float::INFINITY, sum: 0, count: 0 } }
  chunk.each_line do |line|
    city, temp = line.split(";")
    temp = temp.to_f
    city_map[city][:max] = temp if temp > city_map[city][:max]
    city_map[city][:min] = temp if temp < city_map[city][:min]
    city_map[city][:sum] += temp
    city_map[city][:count] += 1
  end
  city_map.to_msgpack
end

while ready = IO.select(child_processes.values)
  ready[0].each do |reader|
    city_map = nil
    # Still streaming the data too!
    u = MessagePack::Unpacker.new(reader)
    u.each { |obj| city_map = obj }
    ###############################
    city_map.each do |city, data|
      CityMap[city].max = [CityMap[city].max, data["max"]].max
      CityMap[city].min = [CityMap[city].min, data["min"]].min
      CityMap[city].sum += data["sum"]
      CityMap[city].count += data["count"]
    end
    pid = child_processes.key(reader)
    if pid
      child_processes.delete(pid)
      Process.wait(pid)
    end
    reader.close # done with this pipe
  end
  break if child_processes.empty?
end

This brought the execution time down to 2 minutes and 40 seconds. That's the best I could do for now, but there will hopefully be a part 2 with much lower execution times. If you have any tips or thoughts on what might give a performance boost, let me know! Some things I have in mind:

  1. Find a way to leverage Ractors, which can run Ruby code in parallel without forking separate processes. That might shave off more time, since starting processes is costly (see the sketch after this list).

  2. Yea that's it for now :D
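Here's a rough sketch of what a Ractor-based version might look like (untested, and Ractors are still marked experimental): each Ractor receives its own chunk string and returns a plain hash, so no pipes or serialization are needed.

chunks = [
  "Cairo;10.0\nCairo;20.0\n",   # in the real thing these would be the big file chunks
  "Hamburg;-5.0\nHamburg;3.5\n"
]

ractors = chunks.map do |chunk|
  Ractor.new(chunk) do |c|
    map = {}
    c.each_line do |line|
      city, temp = line.split(";")
      temp = temp.to_f
      entry = (map[city] ||= { min: Float::INFINITY, max: -Float::INFINITY, sum: 0.0, count: 0 })
      entry[:max] = temp if temp > entry[:max]
      entry[:min] = temp if temp < entry[:min]
      entry[:sum] += temp
      entry[:count] += 1
    end
    map # the returned hash is copied back to the main Ractor
  end
end

results = ractors.map(&:take) # one city hash per chunk, merged the same way as before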

Hope you enjoyed this! I have another implementation in Go too; the repo link is here:

https://github.com/amrelhewy09/1brc

Feel free to fork and do your own thing.

And as usual see you in the next one!
