Implement row based multi-threading of encoder Row based multi-threading of encoder has been implemented. When tested for 20 frames of BasketballDrive_1920x1080_50 content, with --cpu-used=1 --tile-columns=2 --tile-rows=2, the following scaling was achieved for 4 threads: Tile-based: 2.566, Row-based: 3.331. ~23% encode time reduction was seen for row based multi-threading of encoder when compared with tile-based multi-threading for the above configuration. Change-Id: I1e3d431ffcd3d0d398fe6c3aa34755e4d8c5332d
diff --git a/av1/encoder/encoder.c b/av1/encoder/encoder.c index e94b832..53c60a7 100644 --- a/av1/encoder/encoder.c +++ b/av1/encoder/encoder.c
@@ -3058,6 +3058,14 @@ aom_free(thread_data->td); } } +#if CONFIG_MULTITHREAD + if (cpi->row_mt == 1) { + if (cpi->row_mt_mutex_ != NULL) { + pthread_mutex_destroy(cpi->row_mt_mutex_); + aom_free(cpi->row_mt_mutex_); + } + } +#endif av1_row_mt_mem_dealloc(cpi); aom_free(cpi->tile_thr_data); aom_free(cpi->workers);
diff --git a/av1/encoder/encoder.h b/av1/encoder/encoder.h index f58a4a1..682b424 100644 --- a/av1/encoder/encoder.h +++ b/av1/encoder/encoder.h
@@ -482,6 +482,11 @@ int rows; } AV1RowMTSync; +typedef struct AV1RowMTInfo { + int current_mi_row; + int num_threads_working; +} AV1RowMTInfo; + // TODO(jingning) All spatially adaptive variables should go to TileDataEnc. typedef struct TileDataEnc { TileInfo tile_info; @@ -497,6 +502,7 @@ InterModeRdModel inter_mode_rd_models[BLOCK_SIZES_ALL]; #endif AV1RowMTSync row_mt_sync; + AV1RowMTInfo row_mt_info; } TileDataEnc; typedef struct { @@ -509,6 +515,7 @@ int allocated_tile_rows; int allocated_tile_cols; int allocated_sb_rows; + int thread_id_to_tile_id[MAX_NUM_THREADS]; // Mapping of threads to tiles } MultiThreadHandle; typedef struct RD_COUNTS { @@ -821,6 +828,9 @@ MultiThreadHandle multi_thread_ctxt; void (*row_mt_sync_read_ptr)(AV1RowMTSync *const, int, int); void (*row_mt_sync_write_ptr)(AV1RowMTSync *const, int, int, const int); +#if CONFIG_MULTITHREAD + pthread_mutex_t *row_mt_mutex_; +#endif } AV1_COMP; // Must not be called more than once.
diff --git a/av1/encoder/ethread.c b/av1/encoder/ethread.c index 89f75b0..ab4687a 100644 --- a/av1/encoder/ethread.c +++ b/av1/encoder/ethread.c
@@ -160,23 +160,118 @@ } } +void assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt, int num_tiles, + int num_workers) { + int tile_id = 0; + int i; + + for (i = 0; i < num_workers; i++) { + multi_thread_ctxt->thread_id_to_tile_id[i] = tile_id++; + if (tile_id == num_tiles) tile_id = 0; + } +} + +static int get_next_job(AV1_COMP *const cpi, int *current_mi_row, + int cur_tile_id) { + AV1_COMMON *const cm = &cpi->common; + TileDataEnc *const this_tile = &cpi->tile_data[cur_tile_id]; + AV1RowMTInfo *row_mt_info = &this_tile->row_mt_info; + + if (row_mt_info->current_mi_row < this_tile->tile_info.mi_row_end) { + *current_mi_row = row_mt_info->current_mi_row; + row_mt_info->num_threads_working++; + row_mt_info->current_mi_row += cm->seq_params.mib_size; + return 1; + } + return 0; +} + +static void switch_tile_and_get_next_job(AV1_COMP *const cpi, int *cur_tile_id, + int *current_mi_row, + int *end_of_frame) { + AV1_COMMON *const cm = &cpi->common; + const int tile_cols = cm->tile_cols; + const int tile_rows = cm->tile_rows; + + int tile_id = -1; // Stores the tile ID with minimum proc done + int max_mis_to_encode = 0; + int min_num_threads_working = INT_MAX; + + for (int tile_row = 0; tile_row < tile_rows; tile_row++) { + for (int tile_col = 0; tile_col < tile_cols; tile_col++) { + int tile_index = tile_row * tile_cols + tile_col; + TileDataEnc *this_tile = &cpi->tile_data[tile_index]; + AV1RowMTInfo *row_mt_info = &this_tile->row_mt_info; + int num_mis_to_encode = + this_tile->tile_info.mi_row_end - row_mt_info->current_mi_row; + + // Tile to be processed by this thread is selected on the basis of + // availability of jobs: + // 1) If jobs are available, tile to be processed is chosen on the + // basis of minimum number of threads working for that tile. If two or + // more tiles have same number of threads working for them, then the tile + // with maximum number of jobs available will be chosen. + // 2) If no jobs are available, then end_of_frame is reached. 
+ if (num_mis_to_encode > 0) { + int num_threads_working = row_mt_info->num_threads_working; + if (num_threads_working < min_num_threads_working) { + min_num_threads_working = num_threads_working; + max_mis_to_encode = 0; + } + if (num_threads_working == min_num_threads_working && + num_mis_to_encode > max_mis_to_encode) { + tile_id = tile_index; + max_mis_to_encode = num_mis_to_encode; + } + } + } + } + if (tile_id == -1) { + *end_of_frame = 1; + } else { + // Update the cur ID to the next tile ID that will be processed, + // which will be the least processed tile + *cur_tile_id = tile_id; + get_next_job(cpi, current_mi_row, *cur_tile_id); + } +} + static int enc_row_mt_worker_hook(void *arg1, void *unused) { EncWorkerData *const thread_data = (EncWorkerData *)arg1; AV1_COMP *const cpi = thread_data->cpi; AV1_COMMON *const cm = &cpi->common; - const int tile_cols = cm->tile_cols; - const int tile_rows = cm->tile_rows; - int t; + MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt; + int thread_id = thread_data->thread_id; + int cur_tile_id = multi_thread_ctxt->thread_id_to_tile_id[thread_id]; (void)unused; - for (t = thread_data->start; t < tile_rows * tile_cols; - t += cpi->num_workers) { - int tile_row = t / tile_cols; - int tile_col = t % tile_cols; + assert(cur_tile_id != -1); - TileDataEnc *const this_tile = - &cpi->tile_data[tile_row * cm->tile_cols + tile_col]; + int end_of_frame = 0; + while (1) { + int current_mi_row = -1; +#if CONFIG_MULTITHREAD + pthread_mutex_lock(cpi->row_mt_mutex_); +#endif + if (!get_next_job(cpi, ¤t_mi_row, cur_tile_id)) { + // No jobs are available for the current tile. 
Query for the status of + // other tiles and get the next job if available + switch_tile_and_get_next_job(cpi, &cur_tile_id, ¤t_mi_row, + &end_of_frame); + } +#if CONFIG_MULTITHREAD + pthread_mutex_unlock(cpi->row_mt_mutex_); +#endif + if (end_of_frame == 1) break; + + TileDataEnc *const this_tile = &cpi->tile_data[cur_tile_id]; + int tile_row = this_tile->tile_info.tile_row; + int tile_col = this_tile->tile_info.tile_col; + + assert(current_mi_row != -1 && + current_mi_row <= this_tile->tile_info.mi_row_end); + ThreadData *td = thread_data->td; td->mb.e_mbd.tile_ctx = td->tctx; @@ -193,11 +288,14 @@ av1_crc32c_calculator_init(&td->mb.mb_rd_record.crc_calculator); td->intrabc_used_this_tile = 0; - for (int mi_row = this_tile->tile_info.mi_row_start; - mi_row < this_tile->tile_info.mi_row_end; - mi_row += cm->seq_params.mib_size) { - av1_encode_sb_row(cpi, td, tile_row, tile_col, mi_row); - } + av1_encode_sb_row(cpi, td, tile_row, tile_col, current_mi_row); +#if CONFIG_MULTITHREAD + pthread_mutex_lock(cpi->row_mt_mutex_); +#endif + this_tile->row_mt_info.num_threads_working--; +#if CONFIG_MULTITHREAD + pthread_mutex_unlock(cpi->row_mt_mutex_); +#endif } return 1; @@ -239,6 +337,16 @@ CHECK_MEM_ERROR(cm, cpi->tile_thr_data, aom_calloc(num_workers, sizeof(*cpi->tile_thr_data))); +#if CONFIG_MULTITHREAD + if (cpi->row_mt == 1) { + if (cpi->row_mt_mutex_ == NULL) { + CHECK_MEM_ERROR(cm, cpi->row_mt_mutex_, + aom_malloc(sizeof(*(cpi->row_mt_mutex_)))); + if (cpi->row_mt_mutex_) pthread_mutex_init(cpi->row_mt_mutex_, NULL); + } + } +#endif + for (int i = 0; i < num_workers; i++) { AVxWorker *const worker = &cpi->workers[i]; EncWorkerData *const thread_data = &cpi->tile_thr_data[i]; @@ -248,6 +356,7 @@ worker->thread_name = "aom enc worker"; thread_data->cpi = cpi; + thread_data->thread_id = i; if (i < num_workers - 1) { // Allocate thread data. 
@@ -460,7 +569,7 @@ const int tile_cols = cm->tile_cols; const int tile_rows = cm->tile_rows; MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt; - int num_workers = AOMMIN(cpi->oxcf.max_threads, tile_cols * tile_rows); + int num_workers = cpi->oxcf.max_threads; int max_sb_rows = 0; if (cpi->tile_data == NULL || cpi->allocated_tiles < tile_cols * tile_rows) { @@ -485,13 +594,20 @@ av1_row_mt_mem_alloc(cpi, max_sb_rows); } + memset(multi_thread_ctxt->thread_id_to_tile_id, -1, + sizeof(*multi_thread_ctxt->thread_id_to_tile_id) * MAX_NUM_THREADS); + for (int tile_row = 0; tile_row < tile_rows; tile_row++) { for (int tile_col = 0; tile_col < tile_cols; tile_col++) { - TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols + tile_col]; + int tile_id = tile_row * tile_cols + tile_col; + TileDataEnc *this_tile = &cpi->tile_data[tile_id]; // Initialize cur_col to -1 for all rows. memset(this_tile->row_mt_sync.cur_col, -1, sizeof(*this_tile->row_mt_sync.cur_col) * max_sb_rows); + this_tile->row_mt_info.current_mi_row = this_tile->tile_info.mi_row_start; + this_tile->row_mt_info.num_threads_working = 0; + #if CONFIG_COLLECT_INTER_MODE_RD_STATS av1_inter_mode_data_init(this_tile); #endif @@ -509,6 +625,7 @@ } else { num_workers = AOMMIN(num_workers, cpi->num_workers); } + assign_tile_to_thread(multi_thread_ctxt, tile_cols * tile_rows, num_workers); prepare_enc_workers(cpi, enc_row_mt_worker_hook, num_workers); launch_enc_workers(cpi, num_workers); sync_enc_workers(cpi, num_workers);
diff --git a/av1/encoder/ethread.h b/av1/encoder/ethread.h index 853acb1..1830759 100644 --- a/av1/encoder/ethread.h +++ b/av1/encoder/ethread.h
@@ -24,6 +24,7 @@ struct AV1_COMP *cpi; struct ThreadData *td; int start; + int thread_id; } EncWorkerData; void av1_row_mt_sync_read(AV1RowMTSync *const row_mt_sync, int r, int c);