Simplify Asynchronous Control Flow in JavaScript - Part 2

In the coming weeks, Rakit will be offering training on various development topics, including the fundamentals of languages like JavaScript. We'll be writing a series of blog posts, such as this one, to complement the training. We are developers ourselves, actively engaged in various projects, and we will share the techniques and tools we have picked up from that experience.

Hi again, this is part 2 of a series on JavaScript fundamentals. If you haven’t already, check out the first part. As I promised there, I am going to refactor fetchEachSynopsis() to work in parallel. For that, I will introduce a worker queue to manage the execution of individual calls (tasks) to movieApi.fetchSynopsis(). At its most basic, the queue constructor accepts a function that processes an individual task, and the queue instance accepts the inputs for those tasks (in our case, the movie titles).

The client of the queue needs to know when all tasks in the queue have been processed (also referred to as drained), so the queue constructor should accept a callback for that as well. To make it more configurable, it should also accept an integer concurrency level: the maximum number of tasks allowed to run at the same time.

You then push() movie titles onto the queue, and it executes the tasks according to the concurrency value you defined when constructing it. The queue constructor will have this definition:

function createQueue (taskCb, drainCb, concurrency) {}

// where `taskCb` signature:
function taskCb (data, taskDoneCb) {
  // Call taskDoneCb() once done with the task with no argument,
  // to inform the queue it can now process the next item
}

// and `drainCb` signature:
function drainCb () {}

// Example usage:
var queue = createQueue(taskCb, drainCb, 2)
queue.push(titleA)
queue.push(titleB)

And the current fetchEachSynopsis() (from part #1) looks like this:

function fetchEachSynopsis (titles, synopsisCb, doneCb) {
  var title = titles.shift()
  if (!title) return doneCb()

  movieApi.fetchSynopsis(title, function (err, synopsis) {
    synopsisCb(err, title, synopsis)
    setTimeout(fetchEachSynopsis.bind(null, titles, synopsisCb, doneCb), 0)
  })
}

Assuming the above queue constructor, I change it to the following:

function fetchEachSynopsis (titles, synopsisCb, doneCb) {
  function task (title, taskDoneCb) {
    movieApi.fetchSynopsis(title, function (err, synopsis) {
      synopsisCb(err, title, synopsis)
      taskDoneCb()
    })
  }
  var queue = createQueue(task, doneCb, 2)

  titles.forEach(function (title) { queue.push(title) })
}

The function is slightly longer now, with a few changes:

  1. The movieApi.fetchSynopsis() call is wrapped in a task() function, with taskDoneCb() called in its callback.
  2. There is no more recursive call to fetchEachSynopsis().
  3. A queue is created to process the tasks, and all titles are pushed onto it immediately afterwards.

Worker Queue

Now, let’s implement the createQueue constructor function:

function createQueue (taskCb, drainCb, concurrency) {
  var processing = 0
  var queue = []
  concurrency = concurrency || 1

  function log (msg) { notify('[queue] ' + msg) }
  function drained () { return !processing && !queue.length }

  function process () {
    if (drained()) {
      log('drained')
      return drainCb()
    }

    if (processing < concurrency) {
      if (!queue.length) return
      var data = queue.shift() // take the oldest item first (FIFO)

      processing++
      log('processing: ' + processing + ', queued: ' + queue.length)
      taskCb(data, function () {
        setTimeout(function () {
          processing--
          process()
        }, 0)
      })
    }
  }

  function push (data) {
    queue.push(data)
    process()
  }

  return { push: push } // plain object: works when called without `new`
}

This is a very basic implementation of a worker queue, to demonstrate simple parallel processing in JavaScript. The internal queue is backed by an array (line 3) that holds the data still waiting to be processed. The worker queue starts working as soon as push() is called (line 30). The drained() check (line 7) is very simple: it is true whenever nothing is running and nothing is waiting, so the queue can reach this state multiple times, and drainCb() is called each time. This happens, for example, when items are pushed into the queue asynchronously, after the earlier ones have already completed.
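To make the "drained multiple times" point concrete, here is a minimal sketch. It uses a condensed variant of the queue (no logging, and the inner function is named next() so it does not shadow Node's global `process`), and the task is a made-up 10 ms timer rather than a real API call, so treat the names and timings as assumptions for illustration only.

```javascript
// Condensed variant of the worker queue, for illustration only.
function createQueue (taskCb, drainCb, concurrency) {
  var processing = 0
  var queue = []
  concurrency = concurrency || 1

  function drained () { return !processing && !queue.length }

  function next () {
    if (drained()) return drainCb()
    if (processing >= concurrency || !queue.length) return
    var data = queue.shift()
    processing++
    taskCb(data, function () {
      setTimeout(function () {
        processing--
        next()
      }, 0)
    })
  }

  return {
    push: function (data) {
      queue.push(data)
      next()
    }
  }
}

var drainCount = 0
var handled = []

var queue = createQueue(
  function task (item, taskDoneCb) {
    handled.push(item)
    setTimeout(taskDoneCb, 10) // hypothetical 10 ms of work
  },
  function onDrain () {
    drainCount++
    console.log('drained ' + drainCount + ' time(s)')
  },
  2
)

// First batch, pushed synchronously: drains once when both are done.
queue.push('first batch A')
queue.push('first batch B')

// A late push, long after the first batch finished: drains a second time.
setTimeout(function () { queue.push('late item') }, 100)
```

If the client must be told "all done" exactly once, it either has to push everything before yielding to the event loop (as fetchEachSynopsis() does) or guard its drain callback itself. The opposite edge case is also worth knowing: if nothing is ever pushed, drainCb() is never called, so an empty titles array would leave doneCb() uncalled.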

Now if you execute the application, the output should be something like:

Fetching latest titles
[queue] processing: 1, queued: 0
[queue] processing: 2, queued: 0
Fetched synopsis VI – Return of the Jedi (1983) jeng jeng jeng jeng
Fetched synopsis V – The Empire Strikes Back (1980) jeng jeng jeng jeng
[queue] processing: 2, queued: 4
[queue] processing: 2, queued: 3
Fetched synopsis VII – The Force Awakens (2015) jeng jeng jeng jeng
[queue] processing: 2, queued: 2
Fetched synopsis III – Revenge of the Sith (2005) jeng jeng jeng jeng
[queue] processing: 2, queued: 1
Fetched synopsis II – Attack of the Clones (2002) jeng jeng jeng jeng
[queue] processing: 2, queued: 0
Fetched synopsis I – The Phantom Menace (1999) jeng jeng jeng jeng
Fetched synopsis IV – A New Hope (1977) jeng jeng jeng jeng
[queue] drained
Done fetching all synopsis, error: none

As you can see, at most two synopses are being fetched concurrently at any time, while the rest wait in the queue. Pay attention to lines 8-9, where the queue immediately takes another input to process once a task is completed.
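You may also notice that the "Fetched synopsis" lines do not follow any obvious title order. Once tasks run in parallel, each one finishes whenever its own request returns, not in the order it was started. The sketch below shows this with a hypothetical stand-in for movieApi, where the titles and the fixed fake latencies are made up for illustration.

```javascript
// Hypothetical stub: each title gets a fixed fake latency in milliseconds.
var latencies = { 'Title A': 30, 'Title B': 10 }

var movieApi = {
  fetchSynopsis: function (title, cb) {
    setTimeout(function () {
      cb(null, 'synopsis of ' + title) // error-first callback, no error
    }, latencies[title])
  }
}

var titles = ['Title A', 'Title B']
var finished = []

titles.forEach(function (title) {
  // Both requests start right away, one after the other.
  movieApi.fetchSynopsis(title, function (err, synopsis) {
    finished.push(title)
    console.log('Fetched', title)
  })
})
// 'Title B' is logged before 'Title A', because its fake request is faster.
```

This is why synopsisCb() receives the title along with the synopsis: the caller cannot rely on position to tell which result it is looking at.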

I’m pretty sure you can imagine using a queue for this sort of task throughout your application, so let me introduce the Async utility library. It has tons of other control flow utilities apart from queue. As an example, let’s change fetchEachSynopsis() to use async.queue instead:

function fetchEachSynopsis (titles, synopsisCb, doneCb) {
  var queue = async.queue(function (title, taskDoneCb) {
    movieApi.fetchSynopsis(title, function (err, synopsis) {
      synopsisCb(err, title, synopsis)
      taskDoneCb()
    })
  }, 2)
  queue.drain = doneCb

  titles.forEach(function (title) { queue.push(title) })
}

Apart from assigning doneCb() to queue.drain instead of passing it to the constructor, there isn’t really much change here. (This assignment style is the Async v2 API; from v3 onwards drain is a method, so you would write queue.drain(doneCb) instead.) You can take a look at the async.queue implementation. It can do more than my naive implementation, such as pausing and stopping the queue. Be sure to check out the other control flows as well, and try changing the code to use them. For example, the async.each*() methods seem like a good fit.
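As a starting point for that exercise, here is a rough sketch of what the each*() approach could look like. To keep it runnable without installing anything, eachLimit() below is a hand-rolled stand-in that only mimics the call shape of async.eachLimit(collection, limit, iteratee, callback); it is not the library's code. The movieApi stub and its in-flight counter are also assumptions, added just to check that no more than two requests run at once.

```javascript
// Hand-rolled stand-in mimicking the call shape of async.eachLimit.
function eachLimit (items, limit, iteratee, doneCb) {
  var index = 0
  var running = 0
  var finished = false

  function finish (err) {
    if (finished) return
    finished = true
    doneCb(err || null)
  }

  function launch () {
    if (finished) return
    if (index >= items.length && running === 0) return finish()
    // Start items until the limit is reached or nothing is left.
    while (running < limit && index < items.length) {
      running++
      iteratee(items[index++], function (err) {
        running--
        if (err) return finish(err)
        launch()
      })
    }
  }

  launch()
}

// Hypothetical stub that tracks how many requests are in flight.
var inFlight = 0
var peak = 0
var movieApi = {
  fetchSynopsis: function (title, cb) {
    inFlight++
    peak = Math.max(peak, inFlight)
    setTimeout(function () {
      inFlight--
      cb(null, 'synopsis of ' + title)
    }, 10)
  }
}

function fetchEachSynopsis (titles, synopsisCb, doneCb) {
  eachLimit(titles, 2, function (title, taskDoneCb) {
    movieApi.fetchSynopsis(title, function (err, synopsis) {
      synopsisCb(err, title, synopsis)
      taskDoneCb() // errors are reported per title, so the loop never aborts
    })
  }, doneCb)
}

var fetched = []
var done = false
fetchEachSynopsis(['A', 'B', 'C', 'D', 'E'],
  function (err, title, synopsis) { fetched.push(title) },
  function (err) {
    done = true
    console.log('done, peak concurrency:', peak)
  })
```

With the real library installed, replacing eachLimit( with async.eachLimit( should be the only change needed. One design difference from the queue version: the list of titles is handed over in one go, so an empty array still reaches doneCb().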

I think in the next part we can take a look at the JavaScript Promise object, which can also be used to manage asynchronous workflows.

Full code & stub