Implement row based multi-threading of encoder
Row based multi-threading of encoder has been implemented.
When tested for 20 frames of BasketballDrive_1920x1080_50 content,
with --cpu-used=1 --tile-columns=2 --tile-rows=2, the following
scaling was achieved for 4 threads:
Tile-based: Row based:
2.566 3.331
~23% encode time reduction was seen for row based multi-threading
of encoder when compared with tile-based multi-threading for the
above configuration.
Change-Id: I1e3d431ffcd3d0d398fe6c3aa34755e4d8c5332d
diff --git a/av1/encoder/encoder.c b/av1/encoder/encoder.c
index e94b832..53c60a7 100644
--- a/av1/encoder/encoder.c
+++ b/av1/encoder/encoder.c
@@ -3058,6 +3058,14 @@
aom_free(thread_data->td);
}
}
+#if CONFIG_MULTITHREAD
+ if (cpi->row_mt == 1) {
+ if (cpi->row_mt_mutex_ != NULL) {
+ pthread_mutex_destroy(cpi->row_mt_mutex_);
+ aom_free(cpi->row_mt_mutex_);
+ }
+ }
+#endif
av1_row_mt_mem_dealloc(cpi);
aom_free(cpi->tile_thr_data);
aom_free(cpi->workers);
diff --git a/av1/encoder/encoder.h b/av1/encoder/encoder.h
index f58a4a1..682b424 100644
--- a/av1/encoder/encoder.h
+++ b/av1/encoder/encoder.h
@@ -482,6 +482,11 @@
int rows;
} AV1RowMTSync;
+typedef struct AV1RowMTInfo {
+ int current_mi_row;
+ int num_threads_working;
+} AV1RowMTInfo;
+
// TODO(jingning) All spatially adaptive variables should go to TileDataEnc.
typedef struct TileDataEnc {
TileInfo tile_info;
@@ -497,6 +502,7 @@
InterModeRdModel inter_mode_rd_models[BLOCK_SIZES_ALL];
#endif
AV1RowMTSync row_mt_sync;
+ AV1RowMTInfo row_mt_info;
} TileDataEnc;
typedef struct {
@@ -509,6 +515,7 @@
int allocated_tile_rows;
int allocated_tile_cols;
int allocated_sb_rows;
+ int thread_id_to_tile_id[MAX_NUM_THREADS]; // Mapping of threads to tiles
} MultiThreadHandle;
typedef struct RD_COUNTS {
@@ -821,6 +828,9 @@
MultiThreadHandle multi_thread_ctxt;
void (*row_mt_sync_read_ptr)(AV1RowMTSync *const, int, int);
void (*row_mt_sync_write_ptr)(AV1RowMTSync *const, int, int, const int);
+#if CONFIG_MULTITHREAD
+ pthread_mutex_t *row_mt_mutex_;
+#endif
} AV1_COMP;
// Must not be called more than once.
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c
index 89f75b0..ab4687a 100644
--- a/av1/encoder/ethread.c
+++ b/av1/encoder/ethread.c
@@ -160,23 +160,118 @@
}
}
+void assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt, int num_tiles,
+ int num_workers) {
+ int tile_id = 0;
+ int i;
+
+ for (i = 0; i < num_workers; i++) {
+ multi_thread_ctxt->thread_id_to_tile_id[i] = tile_id++;
+ if (tile_id == num_tiles) tile_id = 0;
+ }
+}
+
+static int get_next_job(AV1_COMP *const cpi, int *current_mi_row,
+ int cur_tile_id) {
+ AV1_COMMON *const cm = &cpi->common;
+ TileDataEnc *const this_tile = &cpi->tile_data[cur_tile_id];
+ AV1RowMTInfo *row_mt_info = &this_tile->row_mt_info;
+
+ if (row_mt_info->current_mi_row < this_tile->tile_info.mi_row_end) {
+ *current_mi_row = row_mt_info->current_mi_row;
+ row_mt_info->num_threads_working++;
+ row_mt_info->current_mi_row += cm->seq_params.mib_size;
+ return 1;
+ }
+ return 0;
+}
+
+static void switch_tile_and_get_next_job(AV1_COMP *const cpi, int *cur_tile_id,
+ int *current_mi_row,
+ int *end_of_frame) {
+ AV1_COMMON *const cm = &cpi->common;
+ const int tile_cols = cm->tile_cols;
+ const int tile_rows = cm->tile_rows;
+
+ int tile_id = -1; // Stores the tile ID with minimum proc done
+ int max_mis_to_encode = 0;
+ int min_num_threads_working = INT_MAX;
+
+ for (int tile_row = 0; tile_row < tile_rows; tile_row++) {
+ for (int tile_col = 0; tile_col < tile_cols; tile_col++) {
+ int tile_index = tile_row * tile_cols + tile_col;
+ TileDataEnc *this_tile = &cpi->tile_data[tile_index];
+ AV1RowMTInfo *row_mt_info = &this_tile->row_mt_info;
+ int num_mis_to_encode =
+ this_tile->tile_info.mi_row_end - row_mt_info->current_mi_row;
+
+ // Tile to be processed by this thread is selected on the basis of
+ // availability of jobs:
+ // 1) If jobs are available, tile to be processed is chosen on the
+ // basis of minimum number of threads working for that tile. If two or
+ // more tiles have same number of threads working for them, then the tile
+ // with maximum number of jobs available will be chosen.
+ // 2) If no jobs are available, then end_of_frame is reached.
+ if (num_mis_to_encode > 0) {
+ int num_threads_working = row_mt_info->num_threads_working;
+ if (num_threads_working < min_num_threads_working) {
+ min_num_threads_working = num_threads_working;
+ max_mis_to_encode = 0;
+ }
+ if (num_threads_working == min_num_threads_working &&
+ num_mis_to_encode > max_mis_to_encode) {
+ tile_id = tile_index;
+ max_mis_to_encode = num_mis_to_encode;
+ }
+ }
+ }
+ }
+ if (tile_id == -1) {
+ *end_of_frame = 1;
+ } else {
+ // Update the cur ID to the next tile ID that will be processed,
+ // which will be the least processed tile
+ *cur_tile_id = tile_id;
+ get_next_job(cpi, current_mi_row, *cur_tile_id);
+ }
+}
+
static int enc_row_mt_worker_hook(void *arg1, void *unused) {
EncWorkerData *const thread_data = (EncWorkerData *)arg1;
AV1_COMP *const cpi = thread_data->cpi;
AV1_COMMON *const cm = &cpi->common;
- const int tile_cols = cm->tile_cols;
- const int tile_rows = cm->tile_rows;
- int t;
+ MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
+ int thread_id = thread_data->thread_id;
+ int cur_tile_id = multi_thread_ctxt->thread_id_to_tile_id[thread_id];
(void)unused;
- for (t = thread_data->start; t < tile_rows * tile_cols;
- t += cpi->num_workers) {
- int tile_row = t / tile_cols;
- int tile_col = t % tile_cols;
+ assert(cur_tile_id != -1);
- TileDataEnc *const this_tile =
- &cpi->tile_data[tile_row * cm->tile_cols + tile_col];
+ int end_of_frame = 0;
+ while (1) {
+ int current_mi_row = -1;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(cpi->row_mt_mutex_);
+#endif
+    if (!get_next_job(cpi, &current_mi_row, cur_tile_id)) {
+ // No jobs are available for the current tile. Query for the status of
+ // other tiles and get the next job if available
+      switch_tile_and_get_next_job(cpi, &cur_tile_id, &current_mi_row,
+ &end_of_frame);
+ }
+#if CONFIG_MULTITHREAD
+ pthread_mutex_unlock(cpi->row_mt_mutex_);
+#endif
+ if (end_of_frame == 1) break;
+
+ TileDataEnc *const this_tile = &cpi->tile_data[cur_tile_id];
+ int tile_row = this_tile->tile_info.tile_row;
+ int tile_col = this_tile->tile_info.tile_col;
+
+ assert(current_mi_row != -1 &&
+ current_mi_row <= this_tile->tile_info.mi_row_end);
+
ThreadData *td = thread_data->td;
td->mb.e_mbd.tile_ctx = td->tctx;
@@ -193,11 +288,14 @@
av1_crc32c_calculator_init(&td->mb.mb_rd_record.crc_calculator);
td->intrabc_used_this_tile = 0;
- for (int mi_row = this_tile->tile_info.mi_row_start;
- mi_row < this_tile->tile_info.mi_row_end;
- mi_row += cm->seq_params.mib_size) {
- av1_encode_sb_row(cpi, td, tile_row, tile_col, mi_row);
- }
+ av1_encode_sb_row(cpi, td, tile_row, tile_col, current_mi_row);
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(cpi->row_mt_mutex_);
+#endif
+ this_tile->row_mt_info.num_threads_working--;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_unlock(cpi->row_mt_mutex_);
+#endif
}
return 1;
@@ -239,6 +337,16 @@
CHECK_MEM_ERROR(cm, cpi->tile_thr_data,
aom_calloc(num_workers, sizeof(*cpi->tile_thr_data)));
+#if CONFIG_MULTITHREAD
+ if (cpi->row_mt == 1) {
+ if (cpi->row_mt_mutex_ == NULL) {
+ CHECK_MEM_ERROR(cm, cpi->row_mt_mutex_,
+ aom_malloc(sizeof(*(cpi->row_mt_mutex_))));
+ if (cpi->row_mt_mutex_) pthread_mutex_init(cpi->row_mt_mutex_, NULL);
+ }
+ }
+#endif
+
for (int i = 0; i < num_workers; i++) {
AVxWorker *const worker = &cpi->workers[i];
EncWorkerData *const thread_data = &cpi->tile_thr_data[i];
@@ -248,6 +356,7 @@
worker->thread_name = "aom enc worker";
thread_data->cpi = cpi;
+ thread_data->thread_id = i;
if (i < num_workers - 1) {
// Allocate thread data.
@@ -460,7 +569,7 @@
const int tile_cols = cm->tile_cols;
const int tile_rows = cm->tile_rows;
MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
- int num_workers = AOMMIN(cpi->oxcf.max_threads, tile_cols * tile_rows);
+ int num_workers = cpi->oxcf.max_threads;
int max_sb_rows = 0;
if (cpi->tile_data == NULL || cpi->allocated_tiles < tile_cols * tile_rows) {
@@ -485,13 +594,20 @@
av1_row_mt_mem_alloc(cpi, max_sb_rows);
}
+ memset(multi_thread_ctxt->thread_id_to_tile_id, -1,
+ sizeof(*multi_thread_ctxt->thread_id_to_tile_id) * MAX_NUM_THREADS);
+
for (int tile_row = 0; tile_row < tile_rows; tile_row++) {
for (int tile_col = 0; tile_col < tile_cols; tile_col++) {
- TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols + tile_col];
+ int tile_id = tile_row * tile_cols + tile_col;
+ TileDataEnc *this_tile = &cpi->tile_data[tile_id];
// Initialize cur_col to -1 for all rows.
memset(this_tile->row_mt_sync.cur_col, -1,
sizeof(*this_tile->row_mt_sync.cur_col) * max_sb_rows);
+ this_tile->row_mt_info.current_mi_row = this_tile->tile_info.mi_row_start;
+ this_tile->row_mt_info.num_threads_working = 0;
+
#if CONFIG_COLLECT_INTER_MODE_RD_STATS
av1_inter_mode_data_init(this_tile);
#endif
@@ -509,6 +625,7 @@
} else {
num_workers = AOMMIN(num_workers, cpi->num_workers);
}
+ assign_tile_to_thread(multi_thread_ctxt, tile_cols * tile_rows, num_workers);
prepare_enc_workers(cpi, enc_row_mt_worker_hook, num_workers);
launch_enc_workers(cpi, num_workers);
sync_enc_workers(cpi, num_workers);
diff --git a/av1/encoder/ethread.h b/av1/encoder/ethread.h
index 853acb1..1830759 100644
--- a/av1/encoder/ethread.h
+++ b/av1/encoder/ethread.h
@@ -24,6 +24,7 @@
struct AV1_COMP *cpi;
struct ThreadData *td;
int start;
+ int thread_id;
} EncWorkerData;
void av1_row_mt_sync_read(AV1RowMTSync *const row_mt_sync, int r, int c);