|  | """ | 
|  | Copyright (c) 2024, Alliance for Open Media. All rights reserved | 
|  |  | 
|  | This source code is subject to the terms of the BSD 3-Clause Clear License | 
|  | and the Alliance for Open Media Patent License 1.0. If the BSD 3-Clause Clear | 
|  | License was not distributed with this source code in the LICENSE file, you | 
|  | can obtain it at aomedia.org/license/software-license/bsd-3-c-c/.  If the | 
|  | Alliance for Open Media Patent License 1.0 was not distributed with this | 
|  | source code in the PATENTS file, you can obtain it at | 
|  | aomedia.org/license/patent-license/. | 
|  | """ | 
|  | import json | 
|  | import os | 
|  |  | 
|  | import numpy as np | 
|  | import pandas as pd | 
|  |  | 
|  | import parakit.entropy.model as model | 
|  | from parakit.entropy.codec_cdf_functions import ( | 
|  | cdf2pmf_av1, | 
|  | cdfinv2pmf_av1, | 
|  | cost_symbol_av1, | 
|  | pmf2cdfinv_av1, | 
|  | update_cdfinv_av1, | 
|  | ) | 
|  | from parakit.entropy.data_collector import DataCollector | 
|  |  | 
|  | COST_REGULARIZATION = 4  # modifies maximum cost | 
|  | MIN_SAMPLE_REQUIRED = 10  # minimum number of data points needed to run training | 
|  | MAX_NUMBER_ROWS = 5000000 | 
|  |  | 
|  |  | 
|  | class Trainer: | 
|  | def __init__( | 
|  | self, | 
|  | filename_data, | 
|  | rate_search_list, | 
|  | cost_regularization=COST_REGULARIZATION, | 
|  | min_sample=MIN_SAMPLE_REQUIRED, | 
|  | max_rows=MAX_NUMBER_ROWS, | 
|  | ): | 
|  | # training parameters | 
|  | self._filename_data = filename_data  # data file | 
|  | self._cost_regularization = cost_regularization | 
|  | self._min_sample = min_sample | 
|  | self._max_rows = max_rows | 
|  | self._rate_search_list = rate_search_list | 
|  |  | 
|  | # internal data and models | 
|  | self.dataframe = None | 
|  | self.initial_model = None | 
|  | self.resulting_model = None | 
|  | self._max_cost_regularization = None | 
|  | self._output_filename = None  # resulting json file | 
|  |  | 
|  | self.initialize() | 
|  |  | 
|  | def initialize(self): | 
|  | dc = DataCollector(self._filename_data) | 
|  | self._initial_model = dc.get_context_model() | 
|  | self.dataframe = dc.collect_dataframe(max_rows=self._max_rows) | 
|  | self._output_filename = self._get_output_filename() | 
|  | self._max_cost_regularization = cost_symbol_av1(self._cost_regularization) | 
|  |  | 
|  | def prepare_dataframe(self, df, ctx_idx_list): | 
|  | dims = self._initial_model.num_dims | 
|  | df_filtered = df | 
|  | if dims > 0: | 
|  | mask = df["Dim0"] == ctx_idx_list[0] | 
|  | for i in range(1, dims): | 
|  | mask &= df[f"Dim{i}"] == ctx_idx_list[i] | 
|  | df_filtered = df.loc[mask] | 
|  | # add "CalcCost" column calculating cost | 
|  | new_cols = ["CalcCost"] | 
|  | df_filtered = df_filtered.reindex( | 
|  | columns=[*df_filtered.columns.tolist(), *new_cols], fill_value=0 | 
|  | ) | 
|  | df_filtered.reset_index(inplace=True) | 
|  |  | 
|  | # return empty df if very few (< _min_sample) data points available | 
|  | if len(df_filtered) < self._min_sample: | 
|  | return pd.DataFrame() | 
|  |  | 
|  | # convert NaN values to -1, if any | 
|  | df_filtered = df_filtered.fillna(-1) | 
|  | df_filtered = df_filtered[df_filtered.columns].astype( | 
|  | int | 
|  | )  # all columns are integer | 
|  |  | 
|  | return df_filtered | 
|  |  | 
|  | def map_rate_from_counter(self, rates_tuple, counter): | 
|  | if counter <= 15: | 
|  | return rates_tuple[0] | 
|  | elif counter <= 31: | 
|  | return rates_tuple[1] | 
|  | else: | 
|  | return rates_tuple[2] | 
|  |  | 
|  | def update_pmf_per_rate(self, pmf_list, cost_list, val, counter): | 
|  | rate_offset_list = self._rate_search_list | 
|  | nsymb = self._initial_model.num_symb | 
|  | for r, rates_tuple in enumerate(rate_offset_list): | 
|  | roffset = self.map_rate_from_counter(rates_tuple, counter) | 
|  | # cost | 
|  | cost = cost_symbol_av1(pmf_list[r][val]) | 
|  | if cost > self._max_cost_regularization: | 
|  | cost = self._max_cost_regularization | 
|  | cost_list[r] += cost | 
|  | # update | 
|  | cdf_inv = pmf2cdfinv_av1(pmf_list[r]) | 
|  | cdf_inv_updated = update_cdfinv_av1(cdf_inv, val, counter, nsymb, roffset) | 
|  | pmf_list[r] = cdfinv2pmf_av1(cdf_inv_updated) | 
|  | return (pmf_list, cost_list) | 
|  |  | 
|  | def estimate_cost_multiple(self, df_filtered, pmf_init, rateidx_init=0): | 
|  | nsymb = self._initial_model.num_symb | 
|  | rate_offset_list = self._rate_search_list | 
|  | num_rates = len(rate_offset_list) | 
|  | cost_list = np.zeros(num_rates, dtype=int) | 
|  |  | 
|  | # initialization | 
|  | pmf_init_list = [pmf_init] * num_rates | 
|  | prev_pmf_buffer = [pmf_init_list] | 
|  | cdf_cols = [] | 
|  | for i in range(nsymb): | 
|  | if f"cdf{i}" in df_filtered.keys(): | 
|  | cdf_cols.append(f"cdf{i}") | 
|  |  | 
|  | mask_index = (df_filtered["Counter"] == 0) | (df_filtered["isBeginFrame"] == 1) | 
|  | index_list = df_filtered[mask_index].index.to_list() | 
|  | index_list.append(len(df_filtered)) | 
|  | index_pair_list = [ | 
|  | (index_list[ind], index_list[ind + 1]) for ind in range(len(index_list) - 1) | 
|  | ] | 
|  |  | 
|  | for index_pair in index_pair_list: | 
|  | start_idx, end_idx = index_pair | 
|  | df_sub = df_filtered.iloc[start_idx:end_idx].copy() | 
|  |  | 
|  | num_data_samples = len(df_sub) | 
|  | success = False | 
|  | if len(cdf_cols) == nsymb: | 
|  | cdf_init = np.array(df_sub.iloc[0][cdf_cols], dtype=int) | 
|  | prev_pmf_buffer = [[cdf2pmf_av1(cdf_init)] * num_rates] | 
|  | for _, prev_pmf in enumerate(prev_pmf_buffer): | 
|  | rateidx_init = df_sub.iloc[0]["rate"] | 
|  | pmf_default = prev_pmf[rateidx_init]  # default one | 
|  | # initial values | 
|  | val_init = df_sub.iloc[0]["Value"] | 
|  | cost = df_sub.iloc[0]["Cost"] | 
|  | calculated_cost = cost_symbol_av1(pmf_default[val_init]) | 
|  | if calculated_cost > self._max_cost_regularization: | 
|  | calculated_cost = self._max_cost_regularization | 
|  |  | 
|  | cost_list_persub = np.zeros(num_rates, dtype=int) | 
|  |  | 
|  | if calculated_cost != cost: | 
|  | continue | 
|  |  | 
|  | pmf_list = [pmf_default] * num_rates | 
|  |  | 
|  | # try best cases | 
|  | for i in range(num_data_samples): | 
|  | val = df_sub.iloc[i]["Value"] | 
|  | counter = df_sub.iloc[i]["Counter"] | 
|  | cost = df_sub.iloc[i]["Cost"] | 
|  | calculated_cost = cost_symbol_av1(pmf_list[rateidx_init][val]) | 
|  | if calculated_cost > self._max_cost_regularization: | 
|  | calculated_cost = self._max_cost_regularization | 
|  | df_sub.iloc[i]["CalcCost"] = calculated_cost | 
|  |  | 
|  | if cost != calculated_cost: | 
|  | break | 
|  |  | 
|  | # update variables | 
|  | pmf_list, cost_list_persub = self.update_pmf_per_rate( | 
|  | pmf_list, cost_list_persub, val, counter | 
|  | ) | 
|  |  | 
|  | # success | 
|  | if i == num_data_samples - 1: | 
|  | success = True | 
|  | # update cost | 
|  | cost_list += cost_list_persub | 
|  | # save latest pmf before updating | 
|  | prev_pmf_buffer.insert(1, pmf_list) | 
|  | if success: | 
|  | break | 
|  |  | 
|  | return (cost_list, 0) | 
|  |  | 
|  | def get_cost_per_value(self, df): | 
|  | num_symb = self._initial_model.num_symb | 
|  | cost_pervalue_list = [0] * num_symb | 
|  | if df.empty: | 
|  | return cost_pervalue_list | 
|  | for i in range(num_symb): | 
|  | cost = df[df["Value"] == i]["Cost"].sum() | 
|  | cost_pervalue_list[i] = int(cost) | 
|  | return cost_pervalue_list | 
|  |  | 
|  | def get_value_count(self, df): | 
|  | num_symb = self._initial_model.num_symb | 
|  | value_count_list = [0] * num_symb | 
|  | if df.empty: | 
|  | return value_count_list | 
|  | count_series = df["Value"].value_counts() | 
|  | for i in count_series.index: | 
|  | count = count_series[i] | 
|  | value_count_list[i] = int(count) | 
|  | return value_count_list | 
|  |  | 
|  | def run_rate_training_on_file(self): | 
|  | # parameters | 
|  | rate_parameters = self._rate_search_list | 
|  | size_list = self._initial_model.size_list | 
|  | num_dims = self._initial_model.num_dims | 
|  | num_symb = self._initial_model.num_symb | 
|  | ctx_group_name = self._initial_model.ctx_group_name | 
|  | prob_dict = self._initial_model.model_dict | 
|  |  | 
|  | ctx_name_list = list(prob_dict.keys()) | 
|  | result_dict = prob_dict.copy() | 
|  | for ctx_name in ctx_name_list: | 
|  | ctx_idx_interest = prob_dict[ctx_name]["index"] | 
|  | print(f"Context {ctx_name}:", end=" ") | 
|  | df_filtered = self.prepare_dataframe(self.dataframe, ctx_idx_interest) | 
|  | value_count_list = self.get_value_count(df_filtered) | 
|  | value_cost_list = self.get_cost_per_value(df_filtered) | 
|  | # check empty df | 
|  | if df_filtered.empty: | 
|  | print("Skip training - insufficient data.") | 
|  | result_dict[ctx_name] = { | 
|  | "current_cost": np.zeros(len(rate_parameters), dtype=int), | 
|  | "adapt_rate": rate_parameters, | 
|  | "initial_cost": 0, | 
|  | "codec_cost": 0, | 
|  | "upper_cost": 0, | 
|  | "num_samples": 0, | 
|  | "initializer": prob_dict[ctx_name]["initializer"], | 
|  | "init_rateidx": prob_dict[ctx_name]["init_rateidx"], | 
|  | "value_count": value_count_list, | 
|  | "value_cost": value_cost_list, | 
|  | } | 
|  | continue | 
|  |  | 
|  | # actual cost | 
|  | codec_cost = df_filtered["Cost"].sum() | 
|  |  | 
|  | # estimate cost | 
|  | default_cdf = np.array(prob_dict[ctx_name]["initializer"]) | 
|  | default_pmf = cdf2pmf_av1(default_cdf) | 
|  | default_rateidx = int(prob_dict[ctx_name]["init_rateidx"]) | 
|  | # default_rate = RATE_LIST[default_rateidx] | 
|  |  | 
|  | curr_cost_list, upper_cost = self.estimate_cost_multiple( | 
|  | df_filtered, pmf_init=default_pmf, rateidx_init=default_rateidx | 
|  | ) | 
|  | init_cost = curr_cost_list[default_rateidx] | 
|  | curr_cost_list = curr_cost_list - init_cost | 
|  | result_dict[ctx_name] = { | 
|  | "current_cost": curr_cost_list, | 
|  | "adapt_rate": rate_parameters, | 
|  | "initial_cost": init_cost, | 
|  | "codec_cost": codec_cost, | 
|  | "upper_cost": upper_cost, | 
|  | "num_samples": len(df_filtered), | 
|  | "initializer": prob_dict[ctx_name]["initializer"], | 
|  | "init_rateidx": prob_dict[ctx_name]["init_rateidx"], | 
|  | "value_count": value_count_list, | 
|  | "value_cost": value_cost_list, | 
|  | } | 
|  | print("Training...") | 
|  |  | 
|  | self.resulting_model = model.EntropyContext( | 
|  | ctx_group_name, num_symb, num_dims, size_list, result_dict | 
|  | ) | 
|  | self.write_results() | 
|  |  | 
|  | return self.resulting_model | 
|  |  | 
|  | def _get_output_filename(self): | 
|  | # get path and filename | 
|  | fullpath_data = self._filename_data | 
|  | filename = os.path.basename(fullpath_data) | 
|  | dirname = os.path.dirname(fullpath_data) | 
|  | if len(dirname) == 0: | 
|  | dirname = "." | 
|  | base_filename = filename.split(".")[0] | 
|  | fullpath_result_json = dirname + "/" + "Result_" + base_filename + ".json" | 
|  | return fullpath_result_json | 
|  |  | 
|  | def get_searched_rate_parameter_list(self): | 
|  | return self._rate_search_list | 
|  |  | 
|  | def write_results(self): | 
|  | # information | 
|  | ctx_group_name = self.resulting_model.ctx_group_name | 
|  | num_symb = self.resulting_model.num_symb | 
|  | num_dims = self.resulting_model.num_dims | 
|  | size_list = self.resulting_model.size_list | 
|  | result_dict = self.resulting_model.model_dict | 
|  |  | 
|  | # collect keys | 
|  | key_list = ["information"] | 
|  | key_list.extend(list(result_dict.keys())) | 
|  | json_dict = dict.fromkeys(key_list) | 
|  | for key in key_list: | 
|  | if key == "information": | 
|  | json_dict[key] = { | 
|  | "header": { | 
|  | "ctx_group_name": ctx_group_name, | 
|  | "num_symb": num_symb, | 
|  | "num_dims": num_dims, | 
|  | "size_list": size_list, | 
|  | }, | 
|  | "training": { | 
|  | "rate_search_list": self._rate_search_list, | 
|  | "cost_regularization": self._cost_regularization, | 
|  | "min_sample": self._min_sample, | 
|  | }, | 
|  | } | 
|  | else: | 
|  | cost_list = result_dict[key]["current_cost"].copy() | 
|  | min_idx = np.argmin(cost_list) | 
|  | json_dict[key] = { | 
|  | "best_rate": self._rate_search_list[min_idx], | 
|  | "best_rate_idx": int(min_idx), | 
|  | "current_cost": cost_list.astype(int).tolist(), | 
|  | "initial_cost": int(result_dict[key]["initial_cost"]), | 
|  | "codec_cost": int(result_dict[key]["codec_cost"]), | 
|  | "upper_cost": int(result_dict[key]["upper_cost"]), | 
|  | "num_samples": int(result_dict[key]["num_samples"]), | 
|  | "initializer": result_dict[key]["initializer"], | 
|  | "init_rateidx": result_dict[key]["init_rateidx"], | 
|  | "value_count": result_dict[key]["value_count"], | 
|  | "value_cost": result_dict[key]["value_cost"], | 
|  | } | 
|  | # write to json | 
|  | with open(self._output_filename, "w") as jsonfile: | 
|  | json.dump(json_dict, jsonfile, indent=4) |