Revert "Revert "Propagate error from worker to main thread during lf MT""

This reverts commit 5f8db64abce68a3698fb732697ae50880bc9cac4.

Reason for revert: Re-submit with the fix for oss-fuzz:61802.

Reset 'worker->had_error' inside loop_filter_rows_mt().

Bug: oss-fuzz:61802
Bug: aomedia:3276
Change-Id: Ibbea95e4d8826ca9129c9768d0e09c6628480eb2
diff --git a/av1/common/av1_loopfilter.h b/av1/common/av1_loopfilter.h
index 78443c7..c9880cf 100644
--- a/av1/common/av1_loopfilter.h
+++ b/av1/common/av1_loopfilter.h
@@ -14,6 +14,8 @@
 
 #include "config/aom_config.h"
 
+#include "aom/internal/aom_codec_internal.h"
+
 #include "aom_ports/mem.h"
 #include "av1/common/blockd.h"
 #include "av1/common/seg_common.h"
@@ -87,6 +89,7 @@
 
   AV1_DEBLOCKING_PARAMETERS params_buf[MAX_MIB_SIZE];
   TX_SIZE tx_buf[MAX_MIB_SIZE];
+  struct aom_internal_error_info error_info;
 } LFWorkerData;
 /*!\endcond */
 
diff --git a/av1/common/thread_common.c b/av1/common/thread_common.c
index 06d906c..7a9a0e7 100644
--- a/av1/common/thread_common.c
+++ b/av1/common/thread_common.c
@@ -57,6 +57,7 @@
 void av1_loop_filter_alloc(AV1LfSync *lf_sync, AV1_COMMON *cm, int rows,
                            int width, int num_workers) {
   lf_sync->rows = rows;
+  lf_sync->lf_mt_exit = false;
 #if CONFIG_MULTITHREAD
   {
     int i, j;
@@ -252,8 +253,12 @@
     const YV12_BUFFER_CONFIG *const frame_buffer, AV1_COMMON *const cm,
     struct macroblockd_plane *planes, MACROBLOCKD *xd, int mi_row, int plane,
     int dir, int lpf_opt_level, AV1LfSync *const lf_sync,
+    struct aom_internal_error_info *error_info,
     AV1_DEBLOCKING_PARAMETERS *params_buf, TX_SIZE *tx_buf,
     int num_mis_in_lpf_unit_height_log2) {
+  // TODO(aomedia:3276): Pass error_info to the low-level functions as required
+  // in future to handle error propagation.
+  (void)error_info;
   const int sb_cols =
       CEIL_POWER_OF_TWO(cm->mi_params.mi_cols, MAX_MIB_SIZE_LOG2);
   const int r = mi_row >> num_mis_in_lpf_unit_height_log2;
@@ -300,6 +305,16 @@
         sync_read(lf_sync, r + 1, c, plane);
       }
 
+#if CONFIG_MULTITHREAD
+      if (lf_sync && lf_sync->num_workers > 1) {
+        pthread_mutex_lock(lf_sync->job_mutex);
+        const bool lf_mt_exit = lf_sync->lf_mt_exit;
+        pthread_mutex_unlock(lf_sync->job_mutex);
+        // Exit in case any worker has encountered an error.
+        if (lf_mt_exit) return;
+      }
+#endif
+
       av1_setup_dst_planes(planes, cm->seq_params->sb_size, frame_buffer,
                            mi_row, mi_col, plane, plane + num_planes);
       if (lpf_opt_level) {
@@ -320,27 +335,93 @@
   }
 }
 
+void av1_set_vert_loop_filter_done(AV1_COMMON *cm, AV1LfSync *lf_sync,
+                                   int num_mis_in_lpf_unit_height_log2) {
+  int plane, sb_row;
+  const int sb_cols =
+      CEIL_POWER_OF_TWO(cm->mi_params.mi_cols, num_mis_in_lpf_unit_height_log2);
+  const int sb_rows =
+      CEIL_POWER_OF_TWO(cm->mi_params.mi_rows, num_mis_in_lpf_unit_height_log2);
+
+  // In case of loopfilter row-multithreading, the worker on an SB row waits for
+  // the vertical edge filtering of the right and top-right SBs. Hence, in case
+  // a thread (main/worker) encounters an error, update that vertical
+  // loopfiltering of every SB row in the frame is complete in order to avoid
+  // dependent workers waiting indefinitely.
+  for (sb_row = 0; sb_row < sb_rows; ++sb_row)
+    for (plane = 0; plane < MAX_MB_PLANE; ++plane)
+      sync_write(lf_sync, sb_row, sb_cols - 1, sb_cols, plane);
+}
+
+static AOM_INLINE void sync_lf_workers(AVxWorker *const workers,
+                                       AV1_COMMON *const cm, int num_workers) {
+  const AVxWorkerInterface *const winterface = aom_get_worker_interface();
+  int had_error = workers[0].had_error;
+  struct aom_internal_error_info error_info;
+
+  // Read the error_info of main thread.
+  if (had_error) {
+    AVxWorker *const worker = &workers[0];
+    error_info = ((LFWorkerData *)worker->data2)->error_info;
+  }
+
+  // Wait till all rows are finished.
+  for (int i = num_workers - 1; i > 0; --i) {
+    AVxWorker *const worker = &workers[i];
+    if (!winterface->sync(worker)) {
+      had_error = 1;
+      error_info = ((LFWorkerData *)worker->data2)->error_info;
+    }
+  }
+  if (had_error)
+    aom_internal_error(cm->error, error_info.error_code, "%s",
+                       error_info.detail);
+}
+
 // Row-based multi-threaded loopfilter hook
 static int loop_filter_row_worker(void *arg1, void *arg2) {
   AV1LfSync *const lf_sync = (AV1LfSync *)arg1;
   LFWorkerData *const lf_data = (LFWorkerData *)arg2;
   AV1LfMTInfo *cur_job_info;
+
+#if CONFIG_MULTITHREAD
+  pthread_mutex_t *job_mutex_ = lf_sync->job_mutex;
+#endif
+
+  struct aom_internal_error_info *const error_info = &lf_data->error_info;
+
+  // The jmp_buf is valid only for the duration of the function that calls
+  // setjmp(). Therefore, this function must reset the 'setjmp' field to 0
+  // before it returns.
+  if (setjmp(error_info->jmp)) {
+    error_info->setjmp = 0;
+#if CONFIG_MULTITHREAD
+    pthread_mutex_lock(job_mutex_);
+    lf_sync->lf_mt_exit = true;
+    pthread_mutex_unlock(job_mutex_);
+#endif
+    av1_set_vert_loop_filter_done(lf_data->cm, lf_sync, MAX_MIB_SIZE_LOG2);
+    return 0;
+  }
+  error_info->setjmp = 1;
+
   while ((cur_job_info = get_lf_job_info(lf_sync)) != NULL) {
     const int lpf_opt_level = cur_job_info->lpf_opt_level;
     av1_thread_loop_filter_rows(
         lf_data->frame_buffer, lf_data->cm, lf_data->planes, lf_data->xd,
         cur_job_info->mi_row, cur_job_info->plane, cur_job_info->dir,
-        lpf_opt_level, lf_sync, lf_data->params_buf, lf_data->tx_buf,
-        MAX_MIB_SIZE_LOG2);
+        lpf_opt_level, lf_sync, error_info, lf_data->params_buf,
+        lf_data->tx_buf, MAX_MIB_SIZE_LOG2);
   }
+  error_info->setjmp = 0;
   return 1;
 }
 
 static void loop_filter_rows_mt(YV12_BUFFER_CONFIG *frame, AV1_COMMON *cm,
                                 MACROBLOCKD *xd, int start, int stop,
-                                const int planes_to_lf[3], AVxWorker *workers,
-                                int num_workers, AV1LfSync *lf_sync,
-                                int lpf_opt_level) {
+                                const int planes_to_lf[MAX_MB_PLANE],
+                                AVxWorker *workers, int num_workers,
+                                AV1LfSync *lf_sync, int lpf_opt_level) {
   const AVxWorkerInterface *const winterface = aom_get_worker_interface();
   int i;
   loop_filter_frame_mt_init(cm, start, stop, planes_to_lf, num_workers, lf_sync,
@@ -359,6 +440,7 @@
     loop_filter_data_reset(lf_data, frame, cm, xd);
 
     // Start loopfiltering
+    worker->had_error = 0;
     if (i == 0) {
       winterface->execute(worker);
     } else {
@@ -366,15 +448,13 @@
     }
   }
 
-  // Wait till all rows are finished
-  for (i = 1; i < num_workers; ++i) {
-    winterface->sync(&workers[i]);
-  }
+  sync_lf_workers(workers, cm, num_workers);
 }
 
 static void loop_filter_rows(YV12_BUFFER_CONFIG *frame, AV1_COMMON *cm,
                              MACROBLOCKD *xd, int start, int stop,
-                             const int planes_to_lf[3], int lpf_opt_level) {
+                             const int planes_to_lf[MAX_MB_PLANE],
+                             int lpf_opt_level) {
   // Filter top rows of all planes first, in case the output can be partially
   // reconstructed row by row.
   int mi_row, plane, dir;
@@ -382,7 +462,7 @@
   AV1_DEBLOCKING_PARAMETERS params_buf[MAX_MIB_SIZE];
   TX_SIZE tx_buf[MAX_MIB_SIZE];
   for (mi_row = start; mi_row < stop; mi_row += MAX_MIB_SIZE) {
-    for (plane = 0; plane < 3; ++plane) {
+    for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
       if (skip_loop_filter_plane(planes_to_lf, plane, lpf_opt_level)) {
         continue;
       }
@@ -390,7 +470,8 @@
       for (dir = 0; dir < 2; ++dir) {
         av1_thread_loop_filter_rows(frame, cm, xd->plane, xd, mi_row, plane,
                                     dir, lpf_opt_level, /*lf_sync=*/NULL,
-                                    params_buf, tx_buf, MAX_MIB_SIZE_LOG2);
+                                    xd->error_info, params_buf, tx_buf,
+                                    MAX_MIB_SIZE_LOG2);
       }
     }
   }
@@ -402,7 +483,7 @@
                               int num_workers, AV1LfSync *lf_sync,
                               int lpf_opt_level) {
   int start_mi_row, end_mi_row, mi_rows_to_filter;
-  int planes_to_lf[3];
+  int planes_to_lf[MAX_MB_PLANE];
 
   if (!check_planes_to_loop_filter(&cm->lf, planes_to_lf, plane_start,
                                    plane_end))
@@ -703,9 +784,10 @@
   typedef void (*copy_fun)(const YV12_BUFFER_CONFIG *src_ybc,
                            YV12_BUFFER_CONFIG *dst_ybc, int hstart, int hend,
                            int vstart, int vend);
-  static const copy_fun copy_funs[3] = { aom_yv12_partial_coloc_copy_y,
-                                         aom_yv12_partial_coloc_copy_u,
-                                         aom_yv12_partial_coloc_copy_v };
+  static const copy_fun copy_funs[MAX_MB_PLANE] = {
+    aom_yv12_partial_coloc_copy_y, aom_yv12_partial_coloc_copy_u,
+    aom_yv12_partial_coloc_copy_v
+  };
 
   while (1) {
     AV1LrMTInfo *cur_job_info = get_lr_job_info(lr_sync);
diff --git a/av1/common/thread_common.h b/av1/common/thread_common.h
index 4cf23f2..c37f484 100644
--- a/av1/common/thread_common.h
+++ b/av1/common/thread_common.h
@@ -54,6 +54,10 @@
   AV1LfMTInfo *job_queue;
   int jobs_enqueued;
   int jobs_dequeued;
+
+  // Initialized to false, set to true by the worker thread that encounters an
+  // error in order to abort the processing of other worker threads.
+  bool lf_mt_exit;
 } AV1LfSync;
 
 typedef struct AV1LrMTInfo {
@@ -164,6 +168,9 @@
 void av1_loop_filter_alloc(AV1LfSync *lf_sync, AV1_COMMON *cm, int rows,
                            int width, int num_workers);
 
+void av1_set_vert_loop_filter_done(AV1_COMMON *cm, AV1LfSync *lf_sync,
+                                   int num_mis_in_lpf_unit_height_log2);
+
 void av1_loop_filter_frame_mt(YV12_BUFFER_CONFIG *frame, struct AV1Common *cm,
                               struct macroblockd *xd, int plane_start,
                               int plane_end, int partial_frame,
@@ -185,11 +192,11 @@
     const YV12_BUFFER_CONFIG *const frame_buffer, AV1_COMMON *const cm,
     struct macroblockd_plane *planes, MACROBLOCKD *xd, int mi_row, int plane,
     int dir, int lpf_opt_level, AV1LfSync *const lf_sync,
+    struct aom_internal_error_info *error_info,
     AV1_DEBLOCKING_PARAMETERS *params_buf, TX_SIZE *tx_buf, int mib_size_log2);
 
-static AOM_FORCE_INLINE bool skip_loop_filter_plane(const int planes_to_lf[3],
-                                                    int plane,
-                                                    int lpf_opt_level) {
+static AOM_FORCE_INLINE bool skip_loop_filter_plane(
+    const int planes_to_lf[MAX_MB_PLANE], int plane, int lpf_opt_level) {
   // If LPF_PICK_METHOD is LPF_PICK_FROM_Q, we have the option to filter both
   // chroma planes together
   if (lpf_opt_level == 2) {
@@ -212,7 +219,7 @@
 }
 
 static AOM_INLINE void enqueue_lf_jobs(AV1LfSync *lf_sync, int start, int stop,
-                                       const int planes_to_lf[3],
+                                       const int planes_to_lf[MAX_MB_PLANE],
                                        int lpf_opt_level,
                                        int num_mis_in_lpf_unit_height) {
   int mi_row, plane, dir;
@@ -225,7 +232,7 @@
   // partially reconstructed row by row.
   for (dir = 0; dir < 2; ++dir) {
     for (mi_row = start; mi_row < stop; mi_row += num_mis_in_lpf_unit_height) {
-      for (plane = 0; plane < 3; ++plane) {
+      for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
         if (skip_loop_filter_plane(planes_to_lf, plane, lpf_opt_level)) {
           continue;
         }
@@ -242,9 +249,9 @@
 }
 
 static AOM_INLINE void loop_filter_frame_mt_init(
-    AV1_COMMON *cm, int start_mi_row, int end_mi_row, const int planes_to_lf[3],
-    int num_workers, AV1LfSync *lf_sync, int lpf_opt_level,
-    int num_mis_in_lpf_unit_height_log2) {
+    AV1_COMMON *cm, int start_mi_row, int end_mi_row,
+    const int planes_to_lf[MAX_MB_PLANE], int num_workers, AV1LfSync *lf_sync,
+    int lpf_opt_level, int num_mis_in_lpf_unit_height_log2) {
   // Number of superblock rows
   const int sb_rows =
       CEIL_POWER_OF_TWO(cm->mi_params.mi_rows, num_mis_in_lpf_unit_height_log2);
@@ -271,7 +278,7 @@
 #if CONFIG_MULTITHREAD
   pthread_mutex_lock(lf_sync->job_mutex);
 
-  if (lf_sync->jobs_dequeued < lf_sync->jobs_enqueued) {
+  if (!lf_sync->lf_mt_exit && lf_sync->jobs_dequeued < lf_sync->jobs_enqueued) {
     cur_job_info = lf_sync->job_queue + lf_sync->jobs_dequeued;
     lf_sync->jobs_dequeued++;
   }
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c
index 3b601d0..045a1a0 100644
--- a/av1/encoder/ethread.c
+++ b/av1/encoder/ethread.c
@@ -538,8 +538,8 @@
     av1_thread_loop_filter_rows(
         lf_data->frame_buffer, lf_data->cm, lf_data->planes, lf_data->xd,
         cur_job_info->mi_row, cur_job_info->plane, cur_job_info->dir,
-        lpf_opt_level, lf_sync, lf_data->params_buf, lf_data->tx_buf,
-        mib_size_log2);
+        lpf_opt_level, lf_sync, &thread_data->error_info, lf_data->params_buf,
+        lf_data->tx_buf, mib_size_log2);
   }
 }
 
@@ -583,6 +583,7 @@
   (void)unused;
 
   struct aom_internal_error_info *const error_info = &thread_data->error_info;
+  AV1LfSync *const lf_sync = thread_data->lf_sync;
   MACROBLOCKD *const xd = &thread_data->td->mb.e_mbd;
   xd->error_info = error_info;
 
@@ -600,6 +601,16 @@
     pthread_mutex_unlock(enc_row_mt_mutex_);
 #endif
     set_encoding_done(cpi);
+
+    if (cpi->mt_info.pipeline_lpf_mt_with_enc) {
+#if CONFIG_MULTITHREAD
+      pthread_mutex_lock(lf_sync->job_mutex);
+      lf_sync->lf_mt_exit = true;
+      pthread_mutex_unlock(lf_sync->job_mutex);
+#endif
+      av1_set_vert_loop_filter_done(&cpi->common, lf_sync,
+                                    cpi->common.seq_params->mib_size_log2);
+    }
     return 0;
   }
   error_info->setjmp = 1;