Hello folks! This is my attempt at the infamous 1 billion row challenge in Ruby. I've already done it in Go and got just under 1 minute as my best (still a work in progress).
For those who don't know what the 1brc is: you're given 1 billion temperature readings for different cities and you're required to calculate the min, max and average for each city. Let's dive in.
Serial Execution
Level 1 of this challenge is the code below. Very simple and to the point: we store the min, max, count and sum of each city in a hash map, loop over the 1 billion rows line by line and aggregate as we go. Then we loop over the result, printing every city and its stats.
data_struct = Struct::new(:max, :min, :sum, :count)
# Start max at -Infinity and min at +Infinity so the first reading always wins.
city_map = Hash::new { |hash, key| hash[key] = data_struct.new(-Float::INFINITY, Float::INFINITY, 0.0, 0) }

start_execution = Time.now

File.open('measurements.txt').each_line do |line|
  city, temp = line.split(';')
  temp = temp.to_f
  city_map[city].sum += temp
  city_map[city].count += 1
  city_map[city].max = temp if temp > city_map[city].max
  city_map[city].min = temp if temp < city_map[city].min
end

city_map.each do |city, data|
  puts "%s: max: %.1f, min: %.1f, avg: %.1f" % [city, data.max, data.min, data.sum / data.count]
end

puts "Execution time: %.2f" % (Time.now - start_execution)
This came in at a whopping 10 minutes and 51 seconds, which is about what you'd expect: sequential and simple.
Now let's spice things up a bit
In Ruby you can't really achieve parallelism by default due to the so-called GIL (global interpreter lock), which makes sure only one thread runs Ruby code at a given time. It exists to protect the interpreter's internal data structures from concurrent access by multiple threads.
The downside is that we can't leverage multiple cores to achieve true parallelism, which would be a game changer for execution time. Which brings us to the second part of the challenge.
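As a quick aside, you can see the GVL in action with a tiny, self-contained experiment (separate from the challenge code): on CRuby, running CPU-bound work in 4 threads takes roughly as long as running it serially.
require 'benchmark'

# CPU-bound work; threads won't speed this up on CRuby because of the GVL.
work = ->(n) { n.times { Math.sqrt(rand) } }

serial = Benchmark.realtime { 4.times { work.call(5_000_000) } }
threaded = Benchmark.realtime do
  4.times.map { Thread.new { work.call(5_000_000) } }.each(&:join)
end

puts "serial:  %.2fs" % serial
puts "threads: %.2fs" % threaded # roughly the same as serial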
Forking Processes for parallel execution
One workaround is to spin up multiple processes and have them talk to each other. Each process can run on its own core, and you can have n processes where n is your machine's core count.
That's exactly what we're going to do. But first let's identify where we want to use parallelism.
The main thing slowing us down is parsing the file line by line. It's good for memory usage because it only keeps one line in memory at a time, but horrible for performance.
We need a way to read bigger chunks (maybe 50-60 megabytes) and hand them off for processing. Reading in chunks gets us to the end of the file much quicker. Each chunk can then be processed by its own process: if, for example, we have 8 cores and spin up 8 processes, we can process 8 chunks in parallel, which saves a huge amount of time.
I had to change a few things before moving on. We now have a process_chunk function that processes a single chunk and builds a hash map of the cities in it. We'll spin up <core count> processes, each processing its own chunk.
require 'json'

def process_chunk(chunk)
  # min starts at +Infinity and max at -Infinity so the first reading always wins.
  city_map = Hash::new { |hash, key| hash[key] = { min: Float::INFINITY, max: -Float::INFINITY, sum: 0.0, count: 0 } }
  chunk.each_line do |line|
    city, temp = line.split(";")
    temp = temp.to_f
    city_map[city][:max] = temp if temp > city_map[city][:max]
    city_map[city][:min] = temp if temp < city_map[city][:min]
    city_map[city][:sum] += temp
    city_map[city][:count] += 1
  end
  JSON.dump(city_map)
end
We'll use JSON for now, but note that serializing and deserializing it could be a potential bottleneck.
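If you're curious how much the JSON round trip costs on its own, here's a rough, standalone way to measure it (the city hash below is made up for illustration; actual numbers depend on your machine and data):
require 'json'
require 'benchmark'

# Hypothetical per-chunk result: ~400 cities, same shape as process_chunk's output.
sample = (1..400).to_h { |i| ["City#{i}", { min: -12.3, max: 41.7, sum: 123_456.7, count: 2_500_000 }] }

Benchmark.bm(6) do |x|
  payload = JSON.dump(sample)
  x.report("dump") { 1_000.times { JSON.dump(sample) } }
  x.report("load") { 1_000.times { JSON.load(payload) } }
end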
Now we'll start processing the file in chunks. First we define a few constants
require 'etc'

FILE_SIZE  = File.size("measurements.txt")
CPU_COUNT  = Etc.nprocessors
CHUNK_SIZE = (FILE_SIZE / CPU_COUNT) + 1
We split the file into chunks across the number of cores we have; the more cores, the faster this will run.
pointer = 0
child_processes = Hash::new
chunk_count = 0
Then we define 3 variables: a pointer to where we'll start reading from on each iteration, a hash map of child processes so we can track them, and a chunk counter to let us know when we're finished.
while pointer < FILE_SIZE && chunk_count < CPU_COUNT
  # Start reading from where pointer is located.
  chunk = IO.read("measurements.txt", CHUNK_SIZE, pointer)
  break if chunk.nil?
  chunk_count += 1 # increment the chunk count
  # Sometimes we end up in the middle of a line, so we backtrack to the last
  # \n we find and cut the chunk there. You can either move forward or
  # backward, whichever suits you.
  last_line_delimiter = chunk.rindex("\n") # get the last \n seen
  processed_chunk = chunk[0..last_line_delimiter] # end the chunk there
  # To pass data between processes we'll need to use pipes. A pipe is a
  # mechanism used in Unix and Unix-like operating systems to send data
  # from one program to another.
  reader, writer = IO.pipe
  # Now we fork and create a new child process for each chunk.
  pid = fork do
    reader.close # close the pipe reader in the child because we don't use it
    city_hash = process_chunk(processed_chunk) # process the chunk
    writer.write(city_hash + "\n") # write the data plus a delimiter to the pipe
    writer.close # close the writer
  end
  writer.close # close the writer in the MAIN process because we don't write to it
  child_processes[pid] = reader # add the reader and pid to the hash map
  pointer += last_line_delimiter + 1 # update the pointer
end
I tried explaining everything in comments above and briefly we're just reading chunks and assigning each chunk to a separate process for processing.
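If pipes and fork are new to you, here's the same mechanism stripped down to a minimal round trip (a standalone sketch, separate from the challenge code):
reader, writer = IO.pipe

pid = fork do
  reader.close                     # the child only writes
  writer.write("hello from the child\n")
  writer.close
end

writer.close                       # the parent only reads
puts reader.gets                   # => "hello from the child"
reader.close
Process.wait(pid)                  # reap the child process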
Now that that's done, the main process will wait for data coming from any of the child processes and update the original CityMap accordingly.
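One note: the merging code below references a CityMap constant that isn't shown in the snippets above. It's essentially the same aggregate map as the serial version's city_map, so assume a setup something like this:
# Assumed setup (not shown in the snippets above): a hash of structs keyed by
# city, mirroring the serial version's city_map.
data_struct = Struct::new(:max, :min, :sum, :count)
CityMap = Hash::new { |hash, key| hash[key] = data_struct.new(-Float::INFINITY, Float::INFINITY, 0.0, 0) }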
We'll use a handy OS function called select(2).
It takes arrays of readers and writers and blocks until at least one of them is ready to be read from or written to.
In our case we're only reading, so we pass in just an array of readers.
As soon as any reader has data available, the main process starts reading it in.
while ready = IO.select(child_processes.values)
  # ready[0] is the array of readers that currently have data available.
  ready[0].each do |reader|
    serialized_city_map = ""
    # Read everything the child wrote; gets returns nil once the child
    # closes its end of the pipe.
    while line = reader.gets
      serialized_city_map += line
    end
    # Deserialize the data and merge it into the main CityMap.
    city_map = JSON.load(serialized_city_map)
    city_map.each do |city, data|
      CityMap[city].max = [CityMap[city].max, data["max"]].max
      CityMap[city].min = [CityMap[city].min, data["min"]].min
      CityMap[city].sum += data["sum"]
      CityMap[city].count += data["count"]
    end
    # Look up the pid for this reader, close the pipe, remove it from the map
    # and acknowledge that the child process has exited using Process.wait.
    pid = child_processes.key(reader)
    reader.close
    if pid
      child_processes.delete(pid)
      Process.wait(pid)
    end
  end
  # Exit the loop once every child has been collected.
  break if child_processes.empty?
end
This solution cut our execution time by more than 70%: it went from 10m51s to 2m56s!
This approach uses around 9GB of memory in the main process and around 1GB in each child process, though. That's a lot of memory.
Optimizations
Streaming the JSON to save memory and execution time
One of the first things I tried was streaming the JSON data. So instead of this:
serialized_city_map = ""
while line = reader.gets
serialized_city_map += line
end
city_map = JSON.load(serialized_city_map)
which holds both the raw JSON string and the parsed data in memory at the same time, I used this:
require 'json/streamer'

city_map = nil
Json::Streamer.parser(file_io: reader).get(nesting_level: 0) do |data|
  city_map = data
end
This comes from the json-streamer gem. It takes an IO object and parses the JSON incrementally as it arrives. This saved us some memory, but it didn't make a huge difference overall.
Enabling the YJIT Compiler for Ruby 3.3+
Another tweak I made was to make sure I used YJIT. YJIT stands for Yet Another Ruby JIT, a just-in-time compiler first introduced in Ruby 3.1 that delivers noticeably better performance than the MJIT compiler Ruby had before.
At first it was experimental, but this article from Shopify suggests it's production ready starting from Ruby 3.2.
This simple line enables it
RubyVM::YJIT.enable
Using YJIT decreased the execution time to 2 minutes and 45 seconds.
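If you want to make sure YJIT is actually on, you can ask the VM, or enable it at startup instead of in code (the script name below is just a placeholder):
# You can also enable YJIT when launching Ruby instead of calling enable in code:
#   ruby --yjit solution.rb
#   RUBY_YJIT_ENABLE=1 ruby solution.rb
RubyVM::YJIT.enable unless RubyVM::YJIT.enabled?
puts RubyVM::YJIT.enabled?   # => true when YJIT is active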
Using binary serialization instead of JSON
Another optimization is to get rid of JSON altogether and use a binary serialization format, which produces smaller payloads than JSON and should decrease execution time.
I used a gem called msgpack for this. It serializes data directly to binary.
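To get a feel for the size difference, you can compare the two encodings on a small hand-made hash (illustrative only; the exact ratio depends on your data):
require 'json'
require 'msgpack'

sample = { "Hamburg" => { min: -7.1, max: 34.2, sum: 1_234_567.8, count: 3_100_000 } }

puts JSON.dump(sample).bytesize    # JSON text
puts sample.to_msgpack.bytesize    # MessagePack binary, typically smaller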
Now we change the JSON implementations
require 'msgpack'

def process_chunk(chunk)
  city_map = Hash::new { |hash, key| hash[key] = { min: Float::INFINITY, max: -Float::INFINITY, sum: 0.0, count: 0 } }
  chunk.each_line do |line|
    city, temp = line.split(";")
    temp = temp.to_f
    city_map[city][:max] = temp if temp > city_map[city][:max]
    city_map[city][:min] = temp if temp < city_map[city][:min]
    city_map[city][:sum] += temp
    city_map[city][:count] += 1
  end
  city_map.to_msgpack
end
while ready = IO.select(child_processes.values)
  ready[0].each do |reader|
    city_map = nil
    # Still streaming the data too! The unpacker reads straight from the pipe.
    u = MessagePack::Unpacker.new(reader)
    u.each { |obj| city_map = obj }
    ###############################
    city_map.each do |city, data|
      CityMap[city].max = [CityMap[city].max, data["max"]].max
      CityMap[city].min = [CityMap[city].min, data["min"]].min
      CityMap[city].sum += data["sum"]
      CityMap[city].count += data["count"]
    end
    pid = child_processes.key(reader)
    reader.close
    if pid
      child_processes.delete(pid)
      Process.wait(pid)
    end
  end
  break if child_processes.empty?
end
This brought the execution time down to 2 minutes and 40 seconds. That's the best I could do for now, but there will hopefully be a part 2 with a much lower execution time. If you have any tips or thoughts on what might give a performance boost, let me know! Some things on my mind are:
Finding a way to leverage Ractors, which allow truly parallel execution inside a single Ruby process. That might decrease execution time further, since forking processes is costly (a rough sketch of the idea is below).
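For the curious, here's that rough sketch of the Ractor idea. It's untested and the Ractor API is still evolving, so treat it as a starting point rather than a working solution; chunks is assumed to be an array of strings split on newline boundaries the same way the fork version does it.
workers = chunks.map do |chunk|
  Ractor.new(chunk) do |c|
    stats = {}
    c.each_line do |line|
      city, temp = line.split(';')
      t = temp.to_f
      s = (stats[city] ||= { min: Float::INFINITY, max: -Float::INFINITY, sum: 0.0, count: 0 })
      s[:min] = t if t < s[:min]
      s[:max] = t if t > s[:max]
      s[:sum] += t
      s[:count] += 1
    end
    stats # returned to the main Ractor; no pipes or serialization needed
  end
end

results = workers.map(&:take) # merge these into CityMap like before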
Yea that's it for now :D
Hope you enjoyed this! I also have a Go implementation; the repo link is here:
https://github.com/amrelhewy09/1brc
Feel free to fork it and do your own thing.
And as usual see you in the next one!