Implement row based multithreading of temporal filter

This CL adds support for row based multithreading of temporal
filter module.

cpu-used    Resolution    Tile    Average Encode Time
                                     Reduction (%)
    1        832x480      2x1       0.59 (2 threads)
    2        832x480      2x1       0.82 (2 threads)
    3       1280x720      2x2       8.50 (4 threads)
    4       1920x1080     4x2      11.89 (8 threads)
    5       3840x2160     4x2      24.62 (8 threads)
    6       3840x2160     4x2      25.88 (8 threads)

Change-Id: I9ef91774d71d1a811c5ad0cc447ab5f72a135352
diff --git a/av1/encoder/encoder.c b/av1/encoder/encoder.c
index 7b3b6ae..41507e7 100644
--- a/av1/encoder/encoder.c
+++ b/av1/encoder/encoder.c
@@ -1544,6 +1544,7 @@
 #if !CONFIG_REALTIME_ONLY
     av1_loop_restoration_dealloc(&mt_info->lr_row_sync, mt_info->num_workers);
     av1_gm_dealloc(&mt_info->gm_sync);
+    av1_tf_mt_dealloc(&mt_info->tf_sync);
 #endif
   }
 
diff --git a/av1/encoder/encoder.h b/av1/encoder/encoder.h
index d9dbc30..8cfee7c 100644
--- a/av1/encoder/encoder.h
+++ b/av1/encoder/encoder.h
@@ -1394,6 +1394,11 @@
    * Global Motion multi-threading object.
    */
   AV1GlobalMotionSync gm_sync;
+
+  /*!
+   * Temporal Filter multi-threading object.
+   */
+  AV1TemporalFilterSync tf_sync;
 } MultiThreadInfo;
 
 /*!\cond */
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c
index bd7943d..0617c51 100644
--- a/av1/encoder/ethread.c
+++ b/av1/encoder/ethread.c
@@ -22,6 +22,7 @@
 #include "av1/encoder/global_motion_facade.h"
 #include "av1/encoder/rdopt.h"
 #include "aom_dsp/aom_dsp_common.h"
+#include "av1/encoder/temporal_filter.h"
 #include "av1/encoder/tpl_model.h"
 
 static AOM_INLINE void accumulate_rd_opt(ThreadData *td, ThreadData *td_t) {
@@ -539,6 +540,11 @@
                     aom_malloc(sizeof(*(gm_sync->mutex_))));
     if (gm_sync->mutex_) pthread_mutex_init(gm_sync->mutex_, NULL);
   }
+  AV1TemporalFilterSync *tf_sync = &mt_info->tf_sync;
+  if (tf_sync->mutex_ == NULL) {
+    CHECK_MEM_ERROR(cm, tf_sync->mutex_, aom_malloc(sizeof(*tf_sync->mutex_)));
+    if (tf_sync->mutex_) pthread_mutex_init(tf_sync->mutex_, NULL);
+  }
 #endif
 
   for (int i = num_workers - 1; i >= 0; i--) {
@@ -1305,6 +1311,139 @@
   sync_enc_workers(&cpi->mt_info, cm, num_workers);
 }
 
+// Deallocate memory for temporal filter multi-thread synchronization.
+void av1_tf_mt_dealloc(AV1TemporalFilterSync *tf_sync) {
+  assert(tf_sync != NULL);
+#if CONFIG_MULTITHREAD
+  if (tf_sync->mutex_ != NULL) {
+    pthread_mutex_destroy(tf_sync->mutex_);
+    aom_free(tf_sync->mutex_);
+  }
+#endif  // CONFIG_MULTITHREAD
+  tf_sync->next_tf_row = 0;
+}
+
+// Checks if a job is available. If job is available,
+// populates next_tf_row and returns 1, else returns 0.
+static AOM_INLINE int tf_get_next_job(AV1TemporalFilterSync *tf_mt_sync,
+                                      int *current_mb_row, int mb_rows) {
+  int do_next_row = 0;
+#if CONFIG_MULTITHREAD
+  pthread_mutex_t *tf_mutex_ = tf_mt_sync->mutex_;
+  pthread_mutex_lock(tf_mutex_);
+#endif
+  if (tf_mt_sync->next_tf_row < mb_rows) {
+    *current_mb_row = tf_mt_sync->next_tf_row;
+    tf_mt_sync->next_tf_row++;
+    do_next_row = 1;
+  }
+#if CONFIG_MULTITHREAD
+  pthread_mutex_unlock(tf_mutex_);
+#endif
+  return do_next_row;
+}
+
+// Hook function for each thread in temporal filter multi-threading.
+static int tf_worker_hook(void *arg1, void *unused) {
+  (void)unused;
+  EncWorkerData *thread_data = (EncWorkerData *)arg1;
+  AV1_COMP *cpi = thread_data->cpi;
+  ThreadData *td = thread_data->td;
+  TemporalFilterCtx *tf_ctx = &cpi->tf_ctx;
+  AV1TemporalFilterSync *tf_sync = &cpi->mt_info.tf_sync;
+  const struct scale_factors *scale = &cpi->tf_ctx.sf;
+  const int num_planes = av1_num_planes(&cpi->common);
+  assert(num_planes >= 1 && num_planes <= MAX_MB_PLANE);
+
+  MACROBLOCKD *mbd = &td->mb.e_mbd;
+  uint8_t *input_buffer[MAX_MB_PLANE];
+  MB_MODE_INFO **input_mb_mode_info;
+  tf_save_state(mbd, &input_mb_mode_info, input_buffer, num_planes);
+  tf_setup_macroblockd(mbd, &td->tf_data, scale);
+
+  int current_mb_row = -1;
+
+  while (tf_get_next_job(tf_sync, &current_mb_row, tf_ctx->mb_rows))
+    av1_tf_do_filtering_row(cpi, td, current_mb_row);
+
+  tf_restore_state(mbd, input_mb_mode_info, input_buffer, num_planes);
+
+  return 1;
+}
+
+// Assigns temporal filter hook function and thread data to each worker.
+static void prepare_tf_workers(AV1_COMP *cpi, AVxWorkerHook hook,
+                               int num_workers, int is_highbitdepth) {
+  MultiThreadInfo *mt_info = &cpi->mt_info;
+  mt_info->tf_sync.next_tf_row = 0;
+  for (int i = num_workers - 1; i >= 0; i--) {
+    AVxWorker *worker = &mt_info->workers[i];
+    EncWorkerData *thread_data = &mt_info->tile_thr_data[i];
+
+    worker->hook = hook;
+    worker->data1 = thread_data;
+    worker->data2 = NULL;
+
+    thread_data->cpi = cpi;
+    if (i == 0) {
+      thread_data->td = &cpi->td;
+    }
+
+    // Before encoding a frame, copy the thread data from cpi.
+    if (thread_data->td != &cpi->td) {
+      thread_data->td->mb = cpi->td.mb;
+      thread_data->td->mb.obmc_buffer = thread_data->td->obmc_buffer;
+      tf_alloc_and_reset_data(&thread_data->td->tf_data, cpi->tf_ctx.num_pels,
+                              is_highbitdepth);
+    }
+  }
+}
+
+// Deallocate thread specific data for temporal filter.
+static void tf_dealloc_thread_data(AV1_COMP *cpi, int num_workers,
+                                   int is_highbitdepth) {
+  MultiThreadInfo *mt_info = &cpi->mt_info;
+  for (int i = num_workers - 1; i >= 0; i--) {
+    EncWorkerData *thread_data = &mt_info->tile_thr_data[i];
+    ThreadData *td = thread_data->td;
+    if (td != &cpi->td) tf_dealloc_data(&td->tf_data, is_highbitdepth);
+  }
+}
+
+// Accumulate sse and sum after temporal filtering.
+static void tf_accumulate_frame_diff(AV1_COMP *cpi, int num_workers) {
+  FRAME_DIFF *total_diff = &cpi->td.tf_data.diff;
+  for (int i = num_workers - 1; i >= 0; i--) {
+    AVxWorker *const worker = &cpi->mt_info.workers[i];
+    EncWorkerData *const thread_data = (EncWorkerData *)worker->data1;
+    ThreadData *td = thread_data->td;
+    FRAME_DIFF *diff = &td->tf_data.diff;
+    if (td != &cpi->td) {
+      total_diff->sse += diff->sse;
+      total_diff->sum += diff->sum;
+    }
+  }
+}
+
+// Implements multi-threading for temporal filter.
+void av1_tf_do_filtering_mt(AV1_COMP *cpi) {
+  AV1_COMMON *cm = &cpi->common;
+  MultiThreadInfo *mt_info = &cpi->mt_info;
+  const int is_highbitdepth = cpi->tf_ctx.is_highbitdepth;
+
+  int num_workers = mt_info->num_workers;
+  if (mt_info->num_enc_workers == 0)
+    create_enc_workers(cpi, num_workers);
+  else
+    num_workers = AOMMIN(num_workers, mt_info->num_enc_workers);
+
+  prepare_tf_workers(cpi, tf_worker_hook, num_workers, is_highbitdepth);
+  launch_enc_workers(mt_info, num_workers);
+  sync_enc_workers(mt_info, cm, num_workers);
+  tf_accumulate_frame_diff(cpi, num_workers);
+  tf_dealloc_thread_data(cpi, num_workers, is_highbitdepth);
+}
+
 // Checks if a job is available in the current direction. If a job is available,
 // frame_idx will be populated and returns 1, else returns 0.
 static AOM_INLINE int get_next_gm_job(AV1_COMP *cpi, int *frame_idx,
diff --git a/av1/encoder/ethread.h b/av1/encoder/ethread.h
index 21bfc01..ab8e1bb 100644
--- a/av1/encoder/ethread.h
+++ b/av1/encoder/ethread.h
@@ -70,6 +70,10 @@
 
 #endif  // !CONFIG_REALTIME_ONLY
 
+void av1_tf_do_filtering_mt(AV1_COMP *cpi);
+
+void av1_tf_mt_dealloc(AV1TemporalFilterSync *tf_sync);
+
 int av1_compute_num_enc_workers(AV1_COMP *cpi, int max_workers);
 
 void av1_create_workers(AV1_COMP *cpi, int num_workers);
diff --git a/av1/encoder/temporal_filter.c b/av1/encoder/temporal_filter.c
index 737dac0..9b3fc40 100644
--- a/av1/encoder/temporal_filter.c
+++ b/av1/encoder/temporal_filter.c
@@ -22,6 +22,7 @@
 #include "av1/encoder/av1_quantize.h"
 #include "av1/encoder/encodeframe.h"
 #include "av1/encoder/encoder.h"
+#include "av1/encoder/ethread.h"
 #include "av1/encoder/extend.h"
 #include "av1/encoder/firstpass.h"
 #include "av1/encoder/mcomp.h"
@@ -751,18 +752,7 @@
   return q;
 }
 
-/*!\brief Does temporal filter for a given macroblock row.
-*
-* \ingroup src_frame_proc
-* \param[in]   cpi                   Top level encoder instance structure
-* \param[in]   td                    Pointer to thread data
-* \param[in]   mb_row                Macroblock row to be filtered
-filtering
-*
-* \return Nothing will be returned, but the contents of td->diff will be
-modified.
-*/
-static void tf_do_filtering_row(AV1_COMP *cpi, ThreadData *td, int mb_row) {
+void av1_tf_do_filtering_row(AV1_COMP *cpi, ThreadData *td, int mb_row) {
   TemporalFilterCtx *tf_ctx = &cpi->tf_ctx;
   YV12_BUFFER_CONFIG **frames = tf_ctx->frames;
   const int num_frames = tf_ctx->num_frames;
@@ -906,7 +896,7 @@
 
   // Perform temporal filtering for each row.
   for (int mb_row = 0; mb_row < tf_ctx->mb_rows; mb_row++)
-    tf_do_filtering_row(cpi, td, mb_row);
+    av1_tf_do_filtering_row(cpi, td, mb_row);
 
   tf_restore_state(mbd, input_mb_mode_info, input_buffer, num_planes);
 }
@@ -1176,6 +1166,7 @@
 
 int av1_temporal_filter(AV1_COMP *cpi, const int filter_frame_lookahead_idx,
                         int *show_existing_arf) {
+  MultiThreadInfo *const mt_info = &cpi->mt_info;
   // Basic informaton of the current frame.
   const GF_GROUP *const gf_group = &cpi->gf_group;
   const uint8_t group_idx = gf_group->index;
@@ -1219,7 +1210,10 @@
   tf_alloc_and_reset_data(tf_data, tf_ctx->num_pels, is_highbitdepth);
 
   // Perform temporal filtering process.
-  tf_do_filtering(cpi);
+  if (mt_info->num_workers > 1)
+    av1_tf_do_filtering_mt(cpi);
+  else
+    tf_do_filtering(cpi);
 
   // Deallocate temporal filter buffers.
   tf_dealloc_data(tf_data, is_highbitdepth);
diff --git a/av1/encoder/temporal_filter.h b/av1/encoder/temporal_filter.h
index 407efa6..461e445 100644
--- a/av1/encoder/temporal_filter.h
+++ b/av1/encoder/temporal_filter.h
@@ -17,6 +17,7 @@
 #endif
 /*!\cond */
 struct AV1_COMP;
+struct ThreadData;
 // TODO(any): These two variables are only used in avx2, sse2, sse4
 // implementations, where the block size is still hard coded. This should be
 // fixed to align with the c implementation.
@@ -142,6 +143,16 @@
   uint8_t *pred;
 } TemporalFilterData;
 
+// Data related to temporal filter multi-thread synchronization.
+typedef struct {
+#if CONFIG_MULTITHREAD
+  // Mutex lock used for dispatching jobs.
+  pthread_mutex_t *mutex_;
+#endif  // CONFIG_MULTITHREAD
+  // Next temporal filter block row to be filtered.
+  int next_tf_row;
+} AV1TemporalFilterSync;
+
 // Estimates noise level from a given frame using a single plane (Y, U, or V).
 // This is an adaptation of the mehtod in the following paper:
 // Shen-Chuan Tai, Shih-Ming Yang, "A fast method for image noise
@@ -162,6 +173,21 @@
 #define TF_QINDEX 128  // Q-index used in temporal filtering.
 
 /*!\endcond */
+
+/*!\brief Does temporal filter for a given macroblock row.
+*
+* \ingroup src_frame_proc
+* \param[in]   cpi                   Top level encoder instance structure
+* \param[in]   td                    Pointer to thread data
+* \param[in]   mb_row                Macroblock row to be filtered
+filtering
+*
+* \return Nothing will be returned, but the contents of td->diff will be
+modified.
+*/
+void av1_tf_do_filtering_row(struct AV1_COMP *cpi, struct ThreadData *td,
+                             int mb_row);
+
 /*!\brief Performs temporal filtering if needed on a source frame.
  * For example to create a filtered alternate reference frame (ARF)
  *