
Playing with Ruby Threads and Queues

Dmitry Shvetsov
February 4th, 2019 · 2 min read

Photo by Marina Ermakova

Example of how to use queues to simplify multithreaded code in Ruby

Threads are Ruby's built-in mechanism for concurrent programming. They live inside an operating system process and share its memory. In fact, all Ruby code executes within a thread: if you never create one yourself, your code runs in the main thread.
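A minimal sketch of both points, assuming nothing beyond core Ruby. The SHARED_LOG constant is a hypothetical name used only for this illustration.

```ruby
# Top-level code already runs in a thread: the main thread.
puts "running in the main thread? #{Thread.current == Thread.main}"

# Threads share the memory of their process, so a second thread can
# append to an array that was created by the main thread.
SHARED_LOG = [] # hypothetical name, for illustration only

thread = Thread.new { SHARED_LOG << "written by a second thread" }
thread.join # wait for the second thread to finish

puts SHARED_LOG.inspect
```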

Threads are useful when pieces of code can be executed independently, especially when the code spends time waiting for external events. This typically happens when you are dealing with input and output (I/O) operations.
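Here is a rough sketch of why waiting is the sweet spot for threads. The fake_io_call method is a hypothetical stand-in for a slow network call (it only sleeps), and the 0.2 second delay is an arbitrary assumption.

```ruby
require "benchmark"

# Hypothetical stand-in for a slow I/O call such as an HTTP request.
def fake_io_call
  sleep 0.2
  :done
end

# Run the calls one after another.
def run_sequentially(count)
  Array.new(count) { fake_io_call }
end

# Run each call in its own thread and collect the results.
def run_in_threads(count)
  Array.new(count) { Thread.new { fake_io_call } }.map(&:value)
end

sequential = Benchmark.realtime { run_sequentially(3) }
threaded = Benchmark.realtime { run_in_threads(3) }

puts format("sequential: %.2fs, threaded: %.2fs", sequential, threaded)
```

The sequential version waits for each call in turn, while the threaded version lets the waits overlap, so on most machines it should finish in roughly the time of a single call.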

I have an example.


Enter Worker.

    class Worker
    end

The whole purpose of the Worker is to do work. What makes it useful is that it can do that work in a separate thread, or in several threads.

    class Worker
      def initialize(num_threads:)
        @num_threads = num_threads
        @threads = []
      end

      attr_reader :num_threads, :threads
      private :threads

      def spawn_threads
        num_threads.times do
          threads << Thread.new do
            # the work that the worker performs will go here
          end
        end
      end
    end

Since threads are useful for heavy I/O operations, this worker is perfect for making HTTP requests, manipulating files on disk, and querying a database.

“Hey, worker! Send this data to one API and fetch some data from another, then save some of it in my database, and don’t forget to log everything you have done to a log file.” This is a perfect job for the worker.


How can we pass work to the worker?

It is straightforward if the worker has to perform the same single task every time.

    class Worker
      # rest of the class omitted
      def spawn_threads
        num_threads.times do
          threads << Thread.new do
            HealthService::API.ping # send an HTTP request
          end
        end
      end
    end

But what if you need to perform various kinds of work depending on external circumstances?


Queues to the rescue!

    class Worker
      def initialize(num_threads:)
        @num_threads = num_threads
        @threads = []
        @queue = Queue.new
      end
      # rest of the class omitted

      def enqueue(action, payload)
        queue.push([action, payload])
      end
    end

With the Worker#enqueue method, it is now possible to pass work to the Worker. This can be done in many ways. For example, action can be a Proc and payload can be the arguments for that Proc.

    require "net/http"
    require "json"

    # The action is a Proc that sends its payload as JSON in a POST request.
    action = proc do |data|
      Net::HTTP.post(
        URI("https://api.some-ping-service.com"),
        data.to_json,
        "Content-Type" => "application/json",
      )
    end

    worker_instance.enqueue(action, { ok: true })

What is great about Ruby's Queue implementation is that it is thread-safe by design.
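A small sketch of what that buys us: several threads can push onto the same queue without any explicit locking, and no item gets lost. The fill_queue_concurrently helper and its parameters are hypothetical names for this illustration.

```ruby
# Hypothetical helper: several producer threads push onto one shared queue.
def fill_queue_concurrently(producers:, pushes_each:)
  queue = Queue.new

  threads = Array.new(producers) do |producer_id|
    Thread.new do
      pushes_each.times { |i| queue.push([producer_id, i]) }
    end
  end
  threads.each(&:join) # wait until every producer is done

  queue
end

queue = fill_queue_concurrently(producers: 4, pushes_each: 250)
puts queue.size # 4 producers x 250 pushes each = 1000
```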


To perform the actions enqueued into the Worker without consuming all available CPU, we need to arrange the dequeuing logic carefully.

    queue = Queue.new

    loop do
      puts "we need dequeue actions and do some job" unless queue.empty?
    end

A loop like the one above will eat all your free CPU time.

Here is the output of the top command while the loop is running:

    $ top -o cpu

    PID    COMMAND  %CPU  TIME
    56681  ruby     99.9  01:58.17

The most common approach to solving this problem is to add a sleep call:

    queue = Queue.new

    loop do
      puts "we need dequeue actions and do some job" unless queue.empty?
      sleep 5
    end

It helps, but it is not perfect.

On every sleep interval, the Ruby interpreter has to spend time switching between the main thread and the worker's threads, only to find out that there is nothing to do because the worker queue is empty. This cost is multiplied by the number of threads, and it gets worse as the sleep interval gets shorter.

sleep is not an efficient way to wait for something that will happen in the future.
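There is also a latency cost: an item pushed in the middle of a sleep interval sits in the queue until the consumer wakes up. The sketch below measures that delay. The polling_latency helper and the 0.2 second interval are hypothetical choices for this illustration, not part of the Worker.

```ruby
# Hypothetical helper: returns the seconds between pushing an item and a
# polling consumer noticing it.
def polling_latency(interval)
  queue = Queue.new
  noticed_at = nil

  consumer = Thread.new do
    loop do
      unless queue.empty?
        queue.pop
        noticed_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break
      end
      sleep interval # nothing to do yet, check again later
    end
  end

  sleep interval / 2.0 # push roughly in the middle of the consumer's sleep
  pushed_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  queue.push(:job)
  consumer.join

  noticed_at - pushed_at
end

puts format("item noticed after %.3fs", polling_latency(0.2))
```

A shorter interval reduces the delay but brings back the wasted wake-ups, which is the trade-off described above.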


Again, Queues to the rescue!

Queue#pop(non_block = false) suspends the current thread when the queue is empty and non_block is false (the default). The thread resumes as soon as data is pushed onto the queue.

This means that a worker thread with nothing to do will simply wait for the next enqueued action. No sleep required.

For convenience, Worker has domain-specific methods that describe its state.
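A minimal sketch of that behaviour, outside of the Worker. The blocking_pop_demo helper is a hypothetical name; Queue#num_waiting is used here only to know when the consumer has actually parked inside pop.

```ruby
# Hypothetical helper: shows a consumer thread suspended inside Queue#pop.
def blocking_pop_demo
  queue = Queue.new
  received = []
  consumer = Thread.new { received << queue.pop } # non_block defaults to false

  # Wait until the consumer is actually blocked inside queue.pop.
  sleep 0.01 until queue.num_waiting == 1
  status_while_waiting = consumer.status

  queue.push(:hello) # wakes the consumer up
  consumer.join

  [status_while_waiting, received]
end

status, received = blocking_pop_demo
puts "consumer status while waiting: #{status}" # "sleep": suspended, not spinning
puts "received: #{received.inspect}"
```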

    class Worker
      # rest of the class omitted
      private

      attr_reader :queue, :threads

      def actions?
        !queue.empty?
      end

      def running?
        !queue.closed?
      end

      # Non-blocking pop: raises ThreadError if the queue is empty.
      def dequeue_action
        queue.pop(true)
      end

      # Blocking pop: suspends the calling thread while the queue is empty.
      def wait_for_action
        queue.pop(false)
      end
    end

The most important one here is #wait_for_action. As described above, it suspends a Worker thread when there are no actions in the queue.

Finally, it is time for the main part of the Worker class:
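One detail about the non-blocking variant used by #dequeue_action: Queue#pop(true) raises ThreadError when the queue is empty, so a caller has to be ready for that. A possible way to handle it is sketched below. The try_dequeue helper is a hypothetical name and not part of the Worker.

```ruby
# Hypothetical helper: non-blocking pop that returns nil instead of raising.
def try_dequeue(queue)
  queue.pop(true)
rescue ThreadError
  nil # the queue was empty
end

queue = Queue.new
puts try_dequeue(queue).inspect # empty queue, so nil

queue.push(:job)
puts try_dequeue(queue).inspect # the item that was just pushed
```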

    class Worker
      # rest of the class omitted
      def spawn_threads
        num_threads.times do
          threads << Thread.new do
            while running? || actions?
              # wait_for_action returns nil once the queue is closed and empty
              action_proc, action_payload = wait_for_action
              action_proc.call(action_payload) if action_proc
            end
          end
        end
      end
      # rest of the class omitted
    end

Because #wait_for_action is just queue.pop(false), the worker starts draining the queue exactly when the queue starts to fill up.
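The `if action_proc` guard in the loop matters because of how a closed queue behaves: items that are still queued are delivered as usual, and once the queue is empty, pop returns nil instead of blocking. A small sketch, with drain_after_close as a hypothetical helper name:

```ruby
# Hypothetical helper: shows what Queue#pop returns around Queue#close.
def drain_after_close
  queue = Queue.new
  queue.push([:action, :payload])
  queue.close

  first = queue.pop  # remaining items are still delivered after close
  second = queue.pop # closed and empty: returns nil without blocking

  [first, second, queue.closed?]
end

first, second, closed = drain_after_close
puts first.inspect  # the pair that was pushed before close
puts second.inspect # nil
puts closed         # true
```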

Perfecto!


One last thing: the Worker needs a method to stop it.

    class Worker
      # rest of the class omitted

      def stop
        queue.close
        # Thread#exit terminates each thread immediately
        threads.each(&:exit)
        threads.clear
        true
      end

      # rest of the class omitted
    end
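Note that Thread#exit terminates a thread right away, so actions that are still in the queue when stop is called may never run. If that matters for your use case, one possible alternative is to close the queue and then join the threads, so they finish whatever is left. The sketch below is an assumption about how that could look, not the Worker from the gist. DrainingWorker and work_loop are hypothetical names.

```ruby
# Hypothetical variant of the Worker whose stop waits for queued actions.
class DrainingWorker
  def initialize(num_threads:)
    @queue = Queue.new
    @threads = Array.new(num_threads) { Thread.new { work_loop } }
  end

  def enqueue(action, payload)
    @queue.push([action, payload])
  end

  # Close the queue, then wait for the threads to finish what is left.
  def stop
    @queue.close
    @threads.each(&:join)
    @threads.clear
    true
  end

  private

  def work_loop
    # pop blocks while the queue is open and empty, and returns nil once
    # the queue is closed and empty, which ends the loop.
    while (job = @queue.pop)
      action, payload = job
      action.call(payload)
    end
  end
end

results = Queue.new
worker = DrainingWorker.new(num_threads: 2)
5.times { |n| worker.enqueue(->(x) { results << x * 2 }, n) }
worker.stop

puts results.size # all 5 actions ran before stop returned
```

This sketch does not handle an action that raises. In that case the thread running it would die, so real code would probably want a rescue around the call.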

Full example with some tweaks available in this gist.


Many thanks to Andrey Novikov and Vladimir Dementyev for helping me grasp the subject of Ruby Threads and Queues.
