Implement row-based multi-threading of encoder

Row-based multi-threading of the encoder has been implemented.

When tested for 20 frames of BasketballDrive_1920x1080_50 content,
with --cpu-used=1 --tile-columns=2 --tile-rows=2, the following
scaling was achieved for 4 threads:

Tile-based:             Row-based:
 2.566                   3.331

A ~23% encode time reduction was seen for row-based multi-threading
of the encoder when compared with tile-based multi-threading for the
above configuration.

Change-Id: I1e3d431ffcd3d0d398fe6c3aa34755e4d8c5332d
diff --git a/av1/encoder/encoder.c b/av1/encoder/encoder.c
index e94b832..53c60a7 100644
--- a/av1/encoder/encoder.c
+++ b/av1/encoder/encoder.c
@@ -3058,6 +3058,14 @@
       aom_free(thread_data->td);
     }
   }
+#if CONFIG_MULTITHREAD
+  if (cpi->row_mt == 1) {
+    if (cpi->row_mt_mutex_ != NULL) {
+      pthread_mutex_destroy(cpi->row_mt_mutex_);
+      aom_free(cpi->row_mt_mutex_);
+    }
+  }
+#endif
   av1_row_mt_mem_dealloc(cpi);
   aom_free(cpi->tile_thr_data);
   aom_free(cpi->workers);
diff --git a/av1/encoder/encoder.h b/av1/encoder/encoder.h
index f58a4a1..682b424 100644
--- a/av1/encoder/encoder.h
+++ b/av1/encoder/encoder.h
@@ -482,6 +482,11 @@
   int rows;
 } AV1RowMTSync;
 
+typedef struct AV1RowMTInfo {
+  int current_mi_row;
+  int num_threads_working;
+} AV1RowMTInfo;
+
 // TODO(jingning) All spatially adaptive variables should go to TileDataEnc.
 typedef struct TileDataEnc {
   TileInfo tile_info;
@@ -497,6 +502,7 @@
   InterModeRdModel inter_mode_rd_models[BLOCK_SIZES_ALL];
 #endif
   AV1RowMTSync row_mt_sync;
+  AV1RowMTInfo row_mt_info;
 } TileDataEnc;
 
 typedef struct {
@@ -509,6 +515,7 @@
   int allocated_tile_rows;
   int allocated_tile_cols;
   int allocated_sb_rows;
+  int thread_id_to_tile_id[MAX_NUM_THREADS];  // Mapping of threads to tiles
 } MultiThreadHandle;
 
 typedef struct RD_COUNTS {
@@ -821,6 +828,9 @@
   MultiThreadHandle multi_thread_ctxt;
   void (*row_mt_sync_read_ptr)(AV1RowMTSync *const, int, int);
   void (*row_mt_sync_write_ptr)(AV1RowMTSync *const, int, int, const int);
+#if CONFIG_MULTITHREAD
+  pthread_mutex_t *row_mt_mutex_;
+#endif
 } AV1_COMP;
 
 // Must not be called more than once.
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c
index 89f75b0..ab4687a 100644
--- a/av1/encoder/ethread.c
+++ b/av1/encoder/ethread.c
@@ -160,23 +160,118 @@
   }
 }
 
+void assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt, int num_tiles,
+                           int num_workers) {
+  int tile_id = 0;
+  int i;
+
+  for (i = 0; i < num_workers; i++) {
+    multi_thread_ctxt->thread_id_to_tile_id[i] = tile_id++;
+    if (tile_id == num_tiles) tile_id = 0;
+  }
+}
+
+static int get_next_job(AV1_COMP *const cpi, int *current_mi_row,
+                        int cur_tile_id) {
+  AV1_COMMON *const cm = &cpi->common;
+  TileDataEnc *const this_tile = &cpi->tile_data[cur_tile_id];
+  AV1RowMTInfo *row_mt_info = &this_tile->row_mt_info;
+
+  if (row_mt_info->current_mi_row < this_tile->tile_info.mi_row_end) {
+    *current_mi_row = row_mt_info->current_mi_row;
+    row_mt_info->num_threads_working++;
+    row_mt_info->current_mi_row += cm->seq_params.mib_size;
+    return 1;
+  }
+  return 0;
+}
+
+static void switch_tile_and_get_next_job(AV1_COMP *const cpi, int *cur_tile_id,
+                                         int *current_mi_row,
+                                         int *end_of_frame) {
+  AV1_COMMON *const cm = &cpi->common;
+  const int tile_cols = cm->tile_cols;
+  const int tile_rows = cm->tile_rows;
+
+  int tile_id = -1;  // Stores the tile ID with minimum proc done
+  int max_mis_to_encode = 0;
+  int min_num_threads_working = INT_MAX;
+
+  for (int tile_row = 0; tile_row < tile_rows; tile_row++) {
+    for (int tile_col = 0; tile_col < tile_cols; tile_col++) {
+      int tile_index = tile_row * tile_cols + tile_col;
+      TileDataEnc *this_tile = &cpi->tile_data[tile_index];
+      AV1RowMTInfo *row_mt_info = &this_tile->row_mt_info;
+      int num_mis_to_encode =
+          this_tile->tile_info.mi_row_end - row_mt_info->current_mi_row;
+
+      // Tile to be processed by this thread is selected on the basis of
+      // availability of jobs:
+      // 1) If jobs are available, tile to be processed is chosen on the
+      // basis of minimum number of threads working for that tile. If two or
+      // more tiles have same number of threads working for them, then the tile
+      // with maximum number of jobs available will be chosen.
+      // 2) If no jobs are available, then end_of_frame is reached.
+      if (num_mis_to_encode > 0) {
+        int num_threads_working = row_mt_info->num_threads_working;
+        if (num_threads_working < min_num_threads_working) {
+          min_num_threads_working = num_threads_working;
+          max_mis_to_encode = 0;
+        }
+        if (num_threads_working == min_num_threads_working &&
+            num_mis_to_encode > max_mis_to_encode) {
+          tile_id = tile_index;
+          max_mis_to_encode = num_mis_to_encode;
+        }
+      }
+    }
+  }
+  if (tile_id == -1) {
+    *end_of_frame = 1;
+  } else {
+    // Update the cur ID to the next tile ID that will be processed,
+    // which will be the least processed tile
+    *cur_tile_id = tile_id;
+    get_next_job(cpi, current_mi_row, *cur_tile_id);
+  }
+}
+
 static int enc_row_mt_worker_hook(void *arg1, void *unused) {
   EncWorkerData *const thread_data = (EncWorkerData *)arg1;
   AV1_COMP *const cpi = thread_data->cpi;
   AV1_COMMON *const cm = &cpi->common;
-  const int tile_cols = cm->tile_cols;
-  const int tile_rows = cm->tile_rows;
-  int t;
 
+  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
+  int thread_id = thread_data->thread_id;
+  int cur_tile_id = multi_thread_ctxt->thread_id_to_tile_id[thread_id];
   (void)unused;
 
-  for (t = thread_data->start; t < tile_rows * tile_cols;
-       t += cpi->num_workers) {
-    int tile_row = t / tile_cols;
-    int tile_col = t % tile_cols;
+  assert(cur_tile_id != -1);
 
-    TileDataEnc *const this_tile =
-        &cpi->tile_data[tile_row * cm->tile_cols + tile_col];
+  int end_of_frame = 0;
+  while (1) {
+    int current_mi_row = -1;
+#if CONFIG_MULTITHREAD
+    pthread_mutex_lock(cpi->row_mt_mutex_);
+#endif
+    if (!get_next_job(cpi, &current_mi_row, cur_tile_id)) {
+      // No jobs are available for the current tile. Query for the status of
+      // other tiles and get the next job if available
+      switch_tile_and_get_next_job(cpi, &cur_tile_id, &current_mi_row,
+                                   &end_of_frame);
+    }
+#if CONFIG_MULTITHREAD
+    pthread_mutex_unlock(cpi->row_mt_mutex_);
+#endif
+    if (end_of_frame == 1) break;
+
+    TileDataEnc *const this_tile = &cpi->tile_data[cur_tile_id];
+    int tile_row = this_tile->tile_info.tile_row;
+    int tile_col = this_tile->tile_info.tile_col;
+
+    assert(current_mi_row != -1 &&
+           current_mi_row <= this_tile->tile_info.mi_row_end);
+
     ThreadData *td = thread_data->td;
 
     td->mb.e_mbd.tile_ctx = td->tctx;
@@ -193,11 +288,14 @@
     av1_crc32c_calculator_init(&td->mb.mb_rd_record.crc_calculator);
     td->intrabc_used_this_tile = 0;
 
-    for (int mi_row = this_tile->tile_info.mi_row_start;
-         mi_row < this_tile->tile_info.mi_row_end;
-         mi_row += cm->seq_params.mib_size) {
-      av1_encode_sb_row(cpi, td, tile_row, tile_col, mi_row);
-    }
+    av1_encode_sb_row(cpi, td, tile_row, tile_col, current_mi_row);
+#if CONFIG_MULTITHREAD
+    pthread_mutex_lock(cpi->row_mt_mutex_);
+#endif
+    this_tile->row_mt_info.num_threads_working--;
+#if CONFIG_MULTITHREAD
+    pthread_mutex_unlock(cpi->row_mt_mutex_);
+#endif
   }
 
   return 1;
@@ -239,6 +337,16 @@
   CHECK_MEM_ERROR(cm, cpi->tile_thr_data,
                   aom_calloc(num_workers, sizeof(*cpi->tile_thr_data)));
 
+#if CONFIG_MULTITHREAD
+  if (cpi->row_mt == 1) {
+    if (cpi->row_mt_mutex_ == NULL) {
+      CHECK_MEM_ERROR(cm, cpi->row_mt_mutex_,
+                      aom_malloc(sizeof(*(cpi->row_mt_mutex_))));
+      if (cpi->row_mt_mutex_) pthread_mutex_init(cpi->row_mt_mutex_, NULL);
+    }
+  }
+#endif
+
   for (int i = 0; i < num_workers; i++) {
     AVxWorker *const worker = &cpi->workers[i];
     EncWorkerData *const thread_data = &cpi->tile_thr_data[i];
@@ -248,6 +356,7 @@
     worker->thread_name = "aom enc worker";
 
     thread_data->cpi = cpi;
+    thread_data->thread_id = i;
 
     if (i < num_workers - 1) {
       // Allocate thread data.
@@ -460,7 +569,7 @@
   const int tile_cols = cm->tile_cols;
   const int tile_rows = cm->tile_rows;
   MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
-  int num_workers = AOMMIN(cpi->oxcf.max_threads, tile_cols * tile_rows);
+  int num_workers = cpi->oxcf.max_threads;
   int max_sb_rows = 0;
 
   if (cpi->tile_data == NULL || cpi->allocated_tiles < tile_cols * tile_rows) {
@@ -485,13 +594,20 @@
     av1_row_mt_mem_alloc(cpi, max_sb_rows);
   }
 
+  memset(multi_thread_ctxt->thread_id_to_tile_id, -1,
+         sizeof(*multi_thread_ctxt->thread_id_to_tile_id) * MAX_NUM_THREADS);
+
   for (int tile_row = 0; tile_row < tile_rows; tile_row++) {
     for (int tile_col = 0; tile_col < tile_cols; tile_col++) {
-      TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols + tile_col];
+      int tile_id = tile_row * tile_cols + tile_col;
+      TileDataEnc *this_tile = &cpi->tile_data[tile_id];
 
       // Initialize cur_col to -1 for all rows.
       memset(this_tile->row_mt_sync.cur_col, -1,
              sizeof(*this_tile->row_mt_sync.cur_col) * max_sb_rows);
+      this_tile->row_mt_info.current_mi_row = this_tile->tile_info.mi_row_start;
+      this_tile->row_mt_info.num_threads_working = 0;
+
 #if CONFIG_COLLECT_INTER_MODE_RD_STATS
       av1_inter_mode_data_init(this_tile);
 #endif
@@ -509,6 +625,7 @@
   } else {
     num_workers = AOMMIN(num_workers, cpi->num_workers);
   }
+  assign_tile_to_thread(multi_thread_ctxt, tile_cols * tile_rows, num_workers);
   prepare_enc_workers(cpi, enc_row_mt_worker_hook, num_workers);
   launch_enc_workers(cpi, num_workers);
   sync_enc_workers(cpi, num_workers);
diff --git a/av1/encoder/ethread.h b/av1/encoder/ethread.h
index 853acb1..1830759 100644
--- a/av1/encoder/ethread.h
+++ b/av1/encoder/ethread.h
@@ -24,6 +24,7 @@
   struct AV1_COMP *cpi;
   struct ThreadData *td;
   int start;
+  int thread_id;
 } EncWorkerData;
 
 void av1_row_mt_sync_read(AV1RowMTSync *const row_mt_sync, int r, int c);