Revert "Revert "Propagate error from worker to main thread during lf MT""
This reverts commit 5f8db64abce68a3698fb732697ae50880bc9cac4.
Reason for revert: Re-submit with the fix for oss-fuzz:61802.
Reset 'worker->had_error' inside loop_filter_rows_mt().
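
The exact mechanism behind oss-fuzz:61802 is not spelled out here, but the
reset guards against sync_lf_workers() acting on a 'had_error' value left
over from earlier use of the same AVxWorker object. A minimal, stand-alone
illustration of that hazard (purely illustrative names, not libaom code):

    /* FakeWorker stands in for AVxWorker: the object is reused across
     * frames, so an error flag latched for one frame leaks into the next
     * unless it is cleared before the worker is (re)started. */
    #include <stdio.h>

    typedef struct {
      int had_error;        /* stands in for AVxWorker::had_error */
      const char *detail;   /* stands in for LFWorkerData::error_info.detail */
    } FakeWorker;

    static void filter_frame(FakeWorker *w, int fail, int reset_first) {
      if (reset_first) w->had_error = 0;  /* the fix: clear before relaunch */
      if (fail) {
        w->had_error = 1;
        w->detail = "corrupt frame";
      }
      /* "sync" step: report whatever the latched flag says. */
      printf("%s frame -> %s\n", fail ? "bad" : "good",
             w->had_error ? w->detail : "ok");
    }

    int main(void) {
      FakeWorker w = { 0, NULL };
      filter_frame(&w, /*fail=*/1, /*reset_first=*/0);  /* error reported   */
      filter_frame(&w, /*fail=*/0, /*reset_first=*/0);  /* stale error leaks */
      filter_frame(&w, /*fail=*/0, /*reset_first=*/1);  /* reset: reports ok */
      return 0;
    }
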
Bug: oss-fuzz:61802
Bug: aomedia:3276
Change-Id: Ibbea95e4d8826ca9129c9768d0e09c6628480eb2
diff --git a/av1/common/av1_loopfilter.h b/av1/common/av1_loopfilter.h
index 78443c7..c9880cf 100644
--- a/av1/common/av1_loopfilter.h
+++ b/av1/common/av1_loopfilter.h
@@ -14,6 +14,8 @@
#include "config/aom_config.h"
+#include "aom/internal/aom_codec_internal.h"
+
#include "aom_ports/mem.h"
#include "av1/common/blockd.h"
#include "av1/common/seg_common.h"
@@ -87,6 +89,7 @@
AV1_DEBLOCKING_PARAMETERS params_buf[MAX_MIB_SIZE];
TX_SIZE tx_buf[MAX_MIB_SIZE];
+ struct aom_internal_error_info error_info;
} LFWorkerData;
/*!\endcond */
diff --git a/av1/common/thread_common.c b/av1/common/thread_common.c
index 06d906c..7a9a0e7 100644
--- a/av1/common/thread_common.c
+++ b/av1/common/thread_common.c
@@ -57,6 +57,7 @@
void av1_loop_filter_alloc(AV1LfSync *lf_sync, AV1_COMMON *cm, int rows,
int width, int num_workers) {
lf_sync->rows = rows;
+ lf_sync->lf_mt_exit = false;
#if CONFIG_MULTITHREAD
{
int i, j;
@@ -252,8 +253,12 @@
const YV12_BUFFER_CONFIG *const frame_buffer, AV1_COMMON *const cm,
struct macroblockd_plane *planes, MACROBLOCKD *xd, int mi_row, int plane,
int dir, int lpf_opt_level, AV1LfSync *const lf_sync,
+ struct aom_internal_error_info *error_info,
AV1_DEBLOCKING_PARAMETERS *params_buf, TX_SIZE *tx_buf,
int num_mis_in_lpf_unit_height_log2) {
+ // TODO(aomedia:3276): Pass error_info to the low-level functions as required
+ // in future to handle error propagation.
+ (void)error_info;
const int sb_cols =
CEIL_POWER_OF_TWO(cm->mi_params.mi_cols, MAX_MIB_SIZE_LOG2);
const int r = mi_row >> num_mis_in_lpf_unit_height_log2;
@@ -300,6 +305,16 @@
sync_read(lf_sync, r + 1, c, plane);
}
+#if CONFIG_MULTITHREAD
+ if (lf_sync && lf_sync->num_workers > 1) {
+ pthread_mutex_lock(lf_sync->job_mutex);
+ const bool lf_mt_exit = lf_sync->lf_mt_exit;
+ pthread_mutex_unlock(lf_sync->job_mutex);
+ // Exit in case any worker has encountered an error.
+ if (lf_mt_exit) return;
+ }
+#endif
+
av1_setup_dst_planes(planes, cm->seq_params->sb_size, frame_buffer,
mi_row, mi_col, plane, plane + num_planes);
if (lpf_opt_level) {
@@ -320,27 +335,93 @@
}
}
+void av1_set_vert_loop_filter_done(AV1_COMMON *cm, AV1LfSync *lf_sync,
+ int num_mis_in_lpf_unit_height_log2) {
+ int plane, sb_row;
+ const int sb_cols =
+ CEIL_POWER_OF_TWO(cm->mi_params.mi_cols, num_mis_in_lpf_unit_height_log2);
+ const int sb_rows =
+ CEIL_POWER_OF_TWO(cm->mi_params.mi_rows, num_mis_in_lpf_unit_height_log2);
+
+ // In case of loopfilter row-multithreading, the worker on an SB row waits
+ // for the vertical edge filtering of the right and top-right SBs. Hence, if
+ // any thread (main or worker) encounters an error, mark the vertical loop
+ // filtering of every SB row in the frame as complete so that dependent
+ // workers do not wait indefinitely.
+ for (sb_row = 0; sb_row < sb_rows; ++sb_row)
+ for (plane = 0; plane < MAX_MB_PLANE; ++plane)
+ sync_write(lf_sync, sb_row, sb_cols - 1, sb_cols, plane);
+}
+
+static AOM_INLINE void sync_lf_workers(AVxWorker *const workers,
+ AV1_COMMON *const cm, int num_workers) {
+ const AVxWorkerInterface *const winterface = aom_get_worker_interface();
+ int had_error = workers[0].had_error;
+ struct aom_internal_error_info error_info;
+
+ // Read the error_info of the main thread.
+ if (had_error) {
+ AVxWorker *const worker = &workers[0];
+ error_info = ((LFWorkerData *)worker->data2)->error_info;
+ }
+
+ // Wait till all rows are finished.
+ for (int i = num_workers - 1; i > 0; --i) {
+ AVxWorker *const worker = &workers[i];
+ if (!winterface->sync(worker)) {
+ had_error = 1;
+ error_info = ((LFWorkerData *)worker->data2)->error_info;
+ }
+ }
+ if (had_error)
+ aom_internal_error(cm->error, error_info.error_code, "%s",
+ error_info.detail);
+}
+
// Row-based multi-threaded loopfilter hook
static int loop_filter_row_worker(void *arg1, void *arg2) {
AV1LfSync *const lf_sync = (AV1LfSync *)arg1;
LFWorkerData *const lf_data = (LFWorkerData *)arg2;
AV1LfMTInfo *cur_job_info;
+
+#if CONFIG_MULTITHREAD
+ pthread_mutex_t *job_mutex_ = lf_sync->job_mutex;
+#endif
+
+ struct aom_internal_error_info *const error_info = &lf_data->error_info;
+
+ // The jmp_buf is valid only for the duration of the function that calls
+ // setjmp(). Therefore, this function must reset the 'setjmp' field to 0
+ // before it returns.
+ if (setjmp(error_info->jmp)) {
+ error_info->setjmp = 0;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(job_mutex_);
+ lf_sync->lf_mt_exit = true;
+ pthread_mutex_unlock(job_mutex_);
+#endif
+ av1_set_vert_loop_filter_done(lf_data->cm, lf_sync, MAX_MIB_SIZE_LOG2);
+ return 0;
+ }
+ error_info->setjmp = 1;
+
while ((cur_job_info = get_lf_job_info(lf_sync)) != NULL) {
const int lpf_opt_level = cur_job_info->lpf_opt_level;
av1_thread_loop_filter_rows(
lf_data->frame_buffer, lf_data->cm, lf_data->planes, lf_data->xd,
cur_job_info->mi_row, cur_job_info->plane, cur_job_info->dir,
- lpf_opt_level, lf_sync, lf_data->params_buf, lf_data->tx_buf,
- MAX_MIB_SIZE_LOG2);
+ lpf_opt_level, lf_sync, error_info, lf_data->params_buf,
+ lf_data->tx_buf, MAX_MIB_SIZE_LOG2);
}
+ error_info->setjmp = 0;
return 1;
}
static void loop_filter_rows_mt(YV12_BUFFER_CONFIG *frame, AV1_COMMON *cm,
MACROBLOCKD *xd, int start, int stop,
- const int planes_to_lf[3], AVxWorker *workers,
- int num_workers, AV1LfSync *lf_sync,
- int lpf_opt_level) {
+ const int planes_to_lf[MAX_MB_PLANE],
+ AVxWorker *workers, int num_workers,
+ AV1LfSync *lf_sync, int lpf_opt_level) {
const AVxWorkerInterface *const winterface = aom_get_worker_interface();
int i;
loop_filter_frame_mt_init(cm, start, stop, planes_to_lf, num_workers, lf_sync,
@@ -359,6 +440,7 @@
loop_filter_data_reset(lf_data, frame, cm, xd);
// Start loopfiltering
+ worker->had_error = 0;
if (i == 0) {
winterface->execute(worker);
} else {
@@ -366,15 +448,13 @@
}
}
- // Wait till all rows are finished
- for (i = 1; i < num_workers; ++i) {
- winterface->sync(&workers[i]);
- }
+ sync_lf_workers(workers, cm, num_workers);
}
static void loop_filter_rows(YV12_BUFFER_CONFIG *frame, AV1_COMMON *cm,
MACROBLOCKD *xd, int start, int stop,
- const int planes_to_lf[3], int lpf_opt_level) {
+ const int planes_to_lf[MAX_MB_PLANE],
+ int lpf_opt_level) {
// Filter top rows of all planes first, in case the output can be partially
// reconstructed row by row.
int mi_row, plane, dir;
@@ -382,7 +462,7 @@
AV1_DEBLOCKING_PARAMETERS params_buf[MAX_MIB_SIZE];
TX_SIZE tx_buf[MAX_MIB_SIZE];
for (mi_row = start; mi_row < stop; mi_row += MAX_MIB_SIZE) {
- for (plane = 0; plane < 3; ++plane) {
+ for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
if (skip_loop_filter_plane(planes_to_lf, plane, lpf_opt_level)) {
continue;
}
@@ -390,7 +470,8 @@
for (dir = 0; dir < 2; ++dir) {
av1_thread_loop_filter_rows(frame, cm, xd->plane, xd, mi_row, plane,
dir, lpf_opt_level, /*lf_sync=*/NULL,
- params_buf, tx_buf, MAX_MIB_SIZE_LOG2);
+ xd->error_info, params_buf, tx_buf,
+ MAX_MIB_SIZE_LOG2);
}
}
}
@@ -402,7 +483,7 @@
int num_workers, AV1LfSync *lf_sync,
int lpf_opt_level) {
int start_mi_row, end_mi_row, mi_rows_to_filter;
- int planes_to_lf[3];
+ int planes_to_lf[MAX_MB_PLANE];
if (!check_planes_to_loop_filter(&cm->lf, planes_to_lf, plane_start,
plane_end))
@@ -703,9 +784,10 @@
typedef void (*copy_fun)(const YV12_BUFFER_CONFIG *src_ybc,
YV12_BUFFER_CONFIG *dst_ybc, int hstart, int hend,
int vstart, int vend);
- static const copy_fun copy_funs[3] = { aom_yv12_partial_coloc_copy_y,
- aom_yv12_partial_coloc_copy_u,
- aom_yv12_partial_coloc_copy_v };
+ static const copy_fun copy_funs[MAX_MB_PLANE] = {
+ aom_yv12_partial_coloc_copy_y, aom_yv12_partial_coloc_copy_u,
+ aom_yv12_partial_coloc_copy_v
+ };
while (1) {
AV1LrMTInfo *cur_job_info = get_lr_job_info(lr_sync);
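
Taken together, the thread_common.c hunks above follow one pattern: a failing
worker longjmp()s back to its hook through the per-worker error_info, latches
the error, raises lf_mt_exit under the job mutex so the other workers stop
taking jobs, and the main thread then syncs every worker and re-raises a
latched error via aom_internal_error(). A stand-alone pthread sketch of that
pattern, using none of the libaom types (all names below are illustrative):

    /* Not libaom code: each worker wraps its job loop in setjmp(); a failing
     * job longjmp()s back (as aom_internal_error() does), the worker latches
     * the error and raises a shared exit flag under a mutex, and the main
     * thread reports a latched error after joining all workers. */
    #include <pthread.h>
    #include <setjmp.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    typedef struct {
      jmp_buf jmp;       /* like aom_internal_error_info::jmp */
      int had_error;     /* like AVxWorker::had_error */
      char detail[80];   /* like aom_internal_error_info::detail */
    } WorkerError;

    typedef struct {
      pthread_mutex_t mutex;  /* like AV1LfSync::job_mutex */
      int next_job, num_jobs;
      bool mt_exit;           /* like AV1LfSync::lf_mt_exit */
    } JobQueue;

    typedef struct {
      JobQueue *queue;
      WorkerError err;
    } WorkerData;

    static int get_job(JobQueue *q) {
      int job = -1;
      pthread_mutex_lock(&q->mutex);
      /* Stop handing out work once any worker has failed. */
      if (!q->mt_exit && q->next_job < q->num_jobs) job = q->next_job++;
      pthread_mutex_unlock(&q->mutex);
      return job;
    }

    static void do_job(WorkerError *err, int job) {
      if (job == 3) {  /* simulate a failure, e.g. a corrupt bitstream */
        snprintf(err->detail, sizeof(err->detail), "job %d failed", job);
        longjmp(err->jmp, 1);
      }
    }

    static void *worker_hook(void *arg) {
      WorkerData *wd = (WorkerData *)arg;
      WorkerError *err = &wd->err;
      /* The jmp_buf is only valid while this invocation is live, so the
       * error path must return from this block and nowhere else. */
      if (setjmp(err->jmp)) {
        err->had_error = 1;
        pthread_mutex_lock(&wd->queue->mutex);
        wd->queue->mt_exit = true;  /* tell the other workers to wind down */
        pthread_mutex_unlock(&wd->queue->mutex);
        return NULL;
      }
      int job;
      while ((job = get_job(wd->queue)) >= 0) do_job(err, job);
      return NULL;
    }

    int main(void) {
      enum { kNumWorkers = 4 };
      JobQueue queue = { PTHREAD_MUTEX_INITIALIZER, 0, 16, false };
      WorkerData wd[kNumWorkers];
      pthread_t tid[kNumWorkers];
      memset(wd, 0, sizeof(wd));
      for (int i = 0; i < kNumWorkers; ++i) {
        wd[i].queue = &queue;
        wd[i].err.had_error = 0;  /* reset before launch, as in this change */
        pthread_create(&tid[i], NULL, worker_hook, &wd[i]);
      }
      /* Like sync_lf_workers(): wait for all, then report a latched error. */
      const char *detail = NULL;
      for (int i = 0; i < kNumWorkers; ++i) {
        pthread_join(tid[i], NULL);
        if (wd[i].err.had_error && detail == NULL) detail = wd[i].err.detail;
      }
      if (detail != NULL) fprintf(stderr, "loop filter aborted: %s\n", detail);
      return detail != NULL;
    }

In the real code the failure path additionally calls
av1_set_vert_loop_filter_done(), since workers can also block in sync_read()
on a neighbouring superblock row; the toy queue above has no such row
dependency, so raising the exit flag alone is enough.
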
diff --git a/av1/common/thread_common.h b/av1/common/thread_common.h
index 4cf23f2..c37f484 100644
--- a/av1/common/thread_common.h
+++ b/av1/common/thread_common.h
@@ -54,6 +54,10 @@
AV1LfMTInfo *job_queue;
int jobs_enqueued;
int jobs_dequeued;
+
+ // Initialized to false, set to true by the worker thread that encounters an
+ // error in order to abort the processing of other worker threads.
+ bool lf_mt_exit;
} AV1LfSync;
typedef struct AV1LrMTInfo {
@@ -164,6 +168,9 @@
void av1_loop_filter_alloc(AV1LfSync *lf_sync, AV1_COMMON *cm, int rows,
int width, int num_workers);
+void av1_set_vert_loop_filter_done(AV1_COMMON *cm, AV1LfSync *lf_sync,
+ int num_mis_in_lpf_unit_height_log2);
+
void av1_loop_filter_frame_mt(YV12_BUFFER_CONFIG *frame, struct AV1Common *cm,
struct macroblockd *xd, int plane_start,
int plane_end, int partial_frame,
@@ -185,11 +192,11 @@
const YV12_BUFFER_CONFIG *const frame_buffer, AV1_COMMON *const cm,
struct macroblockd_plane *planes, MACROBLOCKD *xd, int mi_row, int plane,
int dir, int lpf_opt_level, AV1LfSync *const lf_sync,
+ struct aom_internal_error_info *error_info,
AV1_DEBLOCKING_PARAMETERS *params_buf, TX_SIZE *tx_buf, int mib_size_log2);
-static AOM_FORCE_INLINE bool skip_loop_filter_plane(const int planes_to_lf[3],
- int plane,
- int lpf_opt_level) {
+static AOM_FORCE_INLINE bool skip_loop_filter_plane(
+ const int planes_to_lf[MAX_MB_PLANE], int plane, int lpf_opt_level) {
// If LPF_PICK_METHOD is LPF_PICK_FROM_Q, we have the option to filter both
// chroma planes together
if (lpf_opt_level == 2) {
@@ -212,7 +219,7 @@
}
static AOM_INLINE void enqueue_lf_jobs(AV1LfSync *lf_sync, int start, int stop,
- const int planes_to_lf[3],
+ const int planes_to_lf[MAX_MB_PLANE],
int lpf_opt_level,
int num_mis_in_lpf_unit_height) {
int mi_row, plane, dir;
@@ -225,7 +232,7 @@
// partially reconstructed row by row.
for (dir = 0; dir < 2; ++dir) {
for (mi_row = start; mi_row < stop; mi_row += num_mis_in_lpf_unit_height) {
- for (plane = 0; plane < 3; ++plane) {
+ for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
if (skip_loop_filter_plane(planes_to_lf, plane, lpf_opt_level)) {
continue;
}
@@ -242,9 +249,9 @@
}
static AOM_INLINE void loop_filter_frame_mt_init(
- AV1_COMMON *cm, int start_mi_row, int end_mi_row, const int planes_to_lf[3],
- int num_workers, AV1LfSync *lf_sync, int lpf_opt_level,
- int num_mis_in_lpf_unit_height_log2) {
+ AV1_COMMON *cm, int start_mi_row, int end_mi_row,
+ const int planes_to_lf[MAX_MB_PLANE], int num_workers, AV1LfSync *lf_sync,
+ int lpf_opt_level, int num_mis_in_lpf_unit_height_log2) {
// Number of superblock rows
const int sb_rows =
CEIL_POWER_OF_TWO(cm->mi_params.mi_rows, num_mis_in_lpf_unit_height_log2);
@@ -271,7 +278,7 @@
#if CONFIG_MULTITHREAD
pthread_mutex_lock(lf_sync->job_mutex);
- if (lf_sync->jobs_dequeued < lf_sync->jobs_enqueued) {
+ if (!lf_sync->lf_mt_exit && lf_sync->jobs_dequeued < lf_sync->jobs_enqueued) {
cur_job_info = lf_sync->job_queue + lf_sync->jobs_dequeued;
lf_sync->jobs_dequeued++;
}
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c
index 3b601d0..045a1a0 100644
--- a/av1/encoder/ethread.c
+++ b/av1/encoder/ethread.c
@@ -538,8 +538,8 @@
av1_thread_loop_filter_rows(
lf_data->frame_buffer, lf_data->cm, lf_data->planes, lf_data->xd,
cur_job_info->mi_row, cur_job_info->plane, cur_job_info->dir,
- lpf_opt_level, lf_sync, lf_data->params_buf, lf_data->tx_buf,
- mib_size_log2);
+ lpf_opt_level, lf_sync, &thread_data->error_info, lf_data->params_buf,
+ lf_data->tx_buf, mib_size_log2);
}
}
@@ -583,6 +583,7 @@
(void)unused;
struct aom_internal_error_info *const error_info = &thread_data->error_info;
+ AV1LfSync *const lf_sync = thread_data->lf_sync;
MACROBLOCKD *const xd = &thread_data->td->mb.e_mbd;
xd->error_info = error_info;
@@ -600,6 +601,16 @@
pthread_mutex_unlock(enc_row_mt_mutex_);
#endif
set_encoding_done(cpi);
+
+ if (cpi->mt_info.pipeline_lpf_mt_with_enc) {
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(lf_sync->job_mutex);
+ lf_sync->lf_mt_exit = true;
+ pthread_mutex_unlock(lf_sync->job_mutex);
+#endif
+ av1_set_vert_loop_filter_done(&cpi->common, lf_sync,
+ cpi->common.seq_params->mib_size_log2);
+ }
return 0;
}
error_info->setjmp = 1;