Add error propagation framework for av1_calc_mb_wiener_var_mt
This change sets a longjmp target in cal_mb_wiener_var_hook()
to propagate error from worker threads to the main thread
during multithreading of wiener variance computation. It also
ensures that other threads stop processing further if any of
the threads encounter an error.
Bug: aomedia:3276
Change-Id: I2590bdb690fae8d950ac52f5bc0f7c241d166cc6
diff --git a/av1/encoder/allintra_vis.c b/av1/encoder/allintra_vis.c
index a59d0d7..8dcef5f 100644
--- a/av1/encoder/allintra_vis.c
+++ b/av1/encoder/allintra_vis.c
@@ -270,13 +270,14 @@
const int coeff_count = block_size * block_size;
const int mb_step = mi_size_wide[bsize];
const BitDepthInfo bd_info = get_bit_depth_info(xd);
- const AV1EncAllIntraMultiThreadInfo *const intra_mt = &cpi->mt_info.intra_mt;
+ const MultiThreadInfo *const mt_info = &cpi->mt_info;
+ const AV1EncAllIntraMultiThreadInfo *const intra_mt = &mt_info->intra_mt;
AV1EncRowMultiThreadSync *const intra_row_mt_sync =
&cpi->ppi->intra_row_mt_sync;
const int mi_cols = cm->mi_params.mi_cols;
const int mt_thread_id = mi_row / mb_step;
// TODO(chengchen): test different unit step size
- const int mt_unit_step = mi_size_wide[BLOCK_64X64];
+ const int mt_unit_step = mi_size_wide[MB_WIENER_MT_UNIT_SIZE];
const int mt_unit_cols = (mi_cols + (mt_unit_step >> 1)) / mt_unit_step;
int mt_unit_col = 0;
const int is_high_bitdepth = is_cur_buf_hbd(xd);
@@ -293,6 +294,18 @@
if (mi_col % mt_unit_step == 0) {
intra_mt->intra_sync_read_ptr(intra_row_mt_sync, mt_thread_id,
mt_unit_col);
+#if CONFIG_MULTITHREAD
+ const int num_workers =
+ AOMMIN(mt_info->num_mod_workers[MOD_AI], mt_info->num_workers);
+ if (num_workers > 1) {
+ const AV1EncRowMultiThreadInfo *const enc_row_mt = &mt_info->enc_row_mt;
+ pthread_mutex_lock(enc_row_mt->mutex_);
+ const bool exit = enc_row_mt->mb_wiener_mt_exit;
+ pthread_mutex_unlock(enc_row_mt->mutex_);
+ // Stop further processing in case any worker has encountered an error.
+ if (exit) break;
+ }
+#endif
}
PREDICTION_MODE best_mode = DC_PRED;
diff --git a/av1/encoder/allintra_vis.h b/av1/encoder/allintra_vis.h
index ab39968..0d34ce0 100644
--- a/av1/encoder/allintra_vis.h
+++ b/av1/encoder/allintra_vis.h
@@ -20,6 +20,8 @@
#include "av1/encoder/block.h"
#include "av1/encoder/encoder.h"
+#define MB_WIENER_MT_UNIT_SIZE BLOCK_64X64
+
void av1_init_mb_wiener_var_buffer(AV1_COMP *cpi);
void av1_calc_mb_wiener_var_row(AV1_COMP *const cpi, MACROBLOCK *x,
diff --git a/av1/encoder/encoder.h b/av1/encoder/encoder.h
index 9930d8f..9d02993 100644
--- a/av1/encoder/encoder.h
+++ b/av1/encoder/encoder.h
@@ -1544,6 +1544,13 @@
*/
bool firstpass_mt_exit;
+ /*!
+ * Initialized to false, set to true in cal_mb_wiener_var_hook() by the worker
+ * thread that encounters an error in order to abort the processing of other
+ * worker threads.
+ */
+ bool mb_wiener_mt_exit;
+
#if CONFIG_MULTITHREAD
/*!
* Mutex lock used while dispatching jobs.
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c
index b878797..5493fc4 100644
--- a/av1/encoder/ethread.c
+++ b/av1/encoder/ethread.c
@@ -151,7 +151,13 @@
if (sig) {
pthread_mutex_lock(&row_mt_sync->mutex_[r]);
- row_mt_sync->num_finished_cols[r] = cur;
+ // When a thread encounters an error, num_finished_cols[r] is set to maximum
+ // column number. In this case, the AOMMAX operation here ensures that
+ // num_finished_cols[r] is not overwritten with a smaller value thus
+ // preventing the infinite waiting of threads in the relevant sync_read()
+ // function.
+ row_mt_sync->num_finished_cols[r] =
+ AOMMAX(row_mt_sync->num_finished_cols[r], cur);
pthread_cond_signal(&row_mt_sync->cond_[r]);
pthread_mutex_unlock(&row_mt_sync->mutex_[r]);
@@ -268,6 +274,7 @@
enc_row_mt->allocated_sb_rows = sb_rows;
enc_row_mt->row_mt_exit = false;
enc_row_mt->firstpass_mt_exit = false;
+ enc_row_mt->mb_wiener_mt_exit = false;
}
void av1_row_mt_mem_dealloc(AV1_COMP *cpi) {
@@ -2701,6 +2708,28 @@
}
}
+static void set_mb_wiener_var_calc_done(AV1_COMP *const cpi) {
+ const CommonModeInfoParams *const mi_params = &cpi->common.mi_params;
+ const BLOCK_SIZE bsize = cpi->weber_bsize;
+ const int mb_step = mi_size_wide[bsize];
+ assert(MB_WIENER_MT_UNIT_SIZE < BLOCK_SIZES_ALL);
+ const int mt_unit_step = mi_size_wide[MB_WIENER_MT_UNIT_SIZE];
+ const int mt_unit_cols =
+ (mi_params->mi_cols + (mt_unit_step >> 1)) / mt_unit_step;
+ const AV1EncAllIntraMultiThreadInfo *const intra_mt = &cpi->mt_info.intra_mt;
+ AV1EncRowMultiThreadSync *const intra_row_mt_sync =
+ &cpi->ppi->intra_row_mt_sync;
+
+ // Update the wiener variance computation of every row in the frame to
+ // indicate that it is complete in order to avoid dependent workers waiting
+ // indefinitely.
+ for (int mi_row = 0, mt_thread_id = 0; mi_row < mi_params->mi_rows;
+ mi_row += mb_step, ++mt_thread_id) {
+ intra_mt->intra_sync_write_ptr(intra_row_mt_sync, mt_thread_id,
+ mt_unit_cols - 1, mt_unit_cols);
+ }
+}
+
static int cal_mb_wiener_var_hook(void *arg1, void *unused) {
(void)unused;
EncWorkerData *const thread_data = (EncWorkerData *)arg1;
@@ -2714,25 +2743,44 @@
AV1EncRowMultiThreadInfo *const enc_row_mt = &cpi->mt_info.enc_row_mt;
(void)enc_row_mt;
#if CONFIG_MULTITHREAD
- pthread_mutex_t *enc_row_mt_mutex_ = enc_row_mt->mutex_;
+ pthread_mutex_t *enc_row_mt_mutex = enc_row_mt->mutex_;
#endif
+
+ struct aom_internal_error_info *const error_info = &thread_data->error_info;
+ xd->error_info = error_info;
+
+ // The jmp_buf is valid only for the duration of the function that calls
+ // setjmp(). Therefore, this function must reset the 'setjmp' field to 0
+ // before it returns.
+ if (setjmp(error_info->jmp)) {
+ error_info->setjmp = 0;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(enc_row_mt_mutex);
+ enc_row_mt->mb_wiener_mt_exit = true;
+ pthread_mutex_unlock(enc_row_mt_mutex);
+#endif
+ set_mb_wiener_var_calc_done(cpi);
+ return 0;
+ }
+ error_info->setjmp = 1;
DECLARE_ALIGNED(32, int16_t, src_diff[32 * 32]);
DECLARE_ALIGNED(32, tran_low_t, coeff[32 * 32]);
DECLARE_ALIGNED(32, tran_low_t, qcoeff[32 * 32]);
DECLARE_ALIGNED(32, tran_low_t, dqcoeff[32 * 32]);
double sum_rec_distortion = 0;
double sum_est_rate = 0;
- int has_jobs = 1;
- while (has_jobs) {
+ while (1) {
int current_mi_row = -1;
#if CONFIG_MULTITHREAD
- pthread_mutex_lock(enc_row_mt_mutex_);
+ pthread_mutex_lock(enc_row_mt_mutex);
#endif
- has_jobs =
- get_next_job_allintra(intra_row_mt_sync, cpi->common.mi_params.mi_rows,
                             &current_mi_row, mb_step);
+ int has_jobs = enc_row_mt->mb_wiener_mt_exit
+ ? 0
+ : get_next_job_allintra(intra_row_mt_sync,
+ cpi->common.mi_params.mi_rows,
                                                     &current_mi_row, mb_step);
#if CONFIG_MULTITHREAD
- pthread_mutex_unlock(enc_row_mt_mutex_);
+ pthread_mutex_unlock(enc_row_mt_mutex);
#endif
if (!has_jobs) break;
// TODO(chengchen): properly accumulate the distortion and rate.
@@ -2741,13 +2789,14 @@
&sum_est_rate,
thread_data->td->wiener_tmp_pred_buf);
#if CONFIG_MULTITHREAD
- pthread_mutex_lock(enc_row_mt_mutex_);
+ pthread_mutex_lock(enc_row_mt_mutex);
#endif
intra_row_mt_sync->num_threads_working--;
#if CONFIG_MULTITHREAD
- pthread_mutex_unlock(enc_row_mt_mutex_);
+ pthread_mutex_unlock(enc_row_mt_mutex);
#endif
}
+ error_info->setjmp = 0;
return 1;
}