"""
Copyright (c) 2024, Alliance for Open Media. All rights reserved
This source code is subject to the terms of the BSD 3-Clause Clear License
and the Alliance for Open Media Patent License 1.0. If the BSD 3-Clause Clear
License was not distributed with this source code in the LICENSE file, you
can obtain it at aomedia.org/license/software-license/bsd-3-c-c/. If the
Alliance for Open Media Patent License 1.0 was not distributed with this
source code in the PATENTS file, you can obtain it at
aomedia.org/license/patent-license/.
"""
import json
import numpy as np
from parakit.entropy.codec_cdf_functions import count2cdf_av1
from parakit.entropy.codec_default_cdf import CDF_INIT_TOP, av1_default_cdf_parameters


class ResultCollector:
    """Collect per-context entropy-coding results from a ParaKit JSON file
    and aggregate them across files."""

    def __init__(self, json_filename):
        self.json_filename = json_filename
        self._checkfile()

    def _checkfile(self):
        if not self.json_filename.endswith(".json"):
            raise ValueError("File should have .json extension")

    def _checkdata(self, data, key, isIgnored, default_rateidx=0):
        """Check that the cost at the default rate index is zero.

        Returns True if the entry should be kept, False if the caller
        should zero it out.
        """
        data_cost = data[key]["current_cost"]
        cost_default = int(data_cost[default_rateidx])
        if cost_default != 0:
            print(
                f"Warning: Default cost is {cost_default} for {key} in file {self.json_filename}",
                end=" -- ",
            )
            if isIgnored:
                print("Ignored!")
                return False
            print("Not ignored!")
        return True

    def parse_json_file(self, isIgnored=True):
        """Load the JSON results file and convert per-entry fields to numpy arrays.

        Entries whose default-rate cost is nonzero are zeroed out when
        isIgnored is True.
        """
        with open(self.json_filename) as json_file:
            data = json.load(json_file)
        keys = list(data.keys())
        keys.remove("information")  # skip information key
        for key in keys:
            init_rateidx = data[key]["init_rateidx"]
            if self._checkdata(data, key, isIgnored, default_rateidx=init_rateidx):
                for field in ("current_cost", "value_count", "value_cost"):
                    data[key][field] = np.array(data[key][field])
            else:
                for field in ("current_cost", "value_count", "value_cost"):
                    data[key][field] = np.zeros(len(data[key][field]), dtype=int)
        return data
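
    # For reference, a minimal sketch of the JSON layout this parser assumes,
    # inferred from the fields accessed above and in combine_data() below
    # (key names and values here are illustrative, not taken from a real file):
    #
    # {
    #   "information": {...},
    #   "some_context": {
    #     "init_rateidx": 0,
    #     "current_cost": [...], "value_count": [...], "value_cost": [...],
    #     "initial_cost": 0, "codec_cost": 0, "upper_cost": 0,
    #     "num_samples": 0,
    #     "header": {"num_symb": 4}
    #   }
    # }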

    def calculate_percent_reduction(self, data_in_key):
        """Return current_cost as a percentage of the initial cost for one entry."""
        actual_cost = data_in_key["initial_cost"]
        if actual_cost == 0:
            percent_reduction = np.zeros(len(data_in_key["current_cost"]))
        else:
            percent_reduction = 100 * (data_in_key["current_cost"] / actual_cost)
            percent_reduction = np.nan_to_num(percent_reduction)
        return percent_reduction
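
    # A worked example of the percentage above, with hypothetical numbers:
    # initial_cost = 200 and current_cost = [-10, 0, 5] give
    # 100 * ([-10, 0, 5] / 200) = [-5.0, 0.0, 2.5] percent.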

    def combine_data(self, combined_data, data):
        """Accumulate the results in `data` into `combined_data` and return it."""
        keys = list(data.keys())
        keys.remove("information")  # skip information key
        if len(combined_data) == 0:
            combined_data = data.copy()
            # one can add new fields below, if needed
            for key in keys:
                # add percent reduction fields
                percent_reduction = self.calculate_percent_reduction(data[key])
                combined_data[key]["percent_cost_total"] = percent_reduction
                combined_data[key]["percent_cost_min"] = percent_reduction
                combined_data[key]["percent_cost_max"] = percent_reduction
        else:
            for key in keys:
                combined_data[key]["current_cost"] += data[key]["current_cost"]
                combined_data[key]["num_samples"] += data[key]["num_samples"]
                combined_data[key]["initial_cost"] += data[key]["initial_cost"]
                combined_data[key]["codec_cost"] += data[key]["codec_cost"]
                combined_data[key]["upper_cost"] += data[key]["upper_cost"]
                combined_data[key]["value_count"] += data[key]["value_count"]
                combined_data[key]["value_cost"] += data[key]["value_cost"]
                # update percent reduction fields
                percent_reduction = self.calculate_percent_reduction(data[key])
                combined_data[key]["percent_cost_total"] += percent_reduction
                combined_data[key]["percent_cost_min"] = np.minimum(
                    percent_reduction, combined_data[key]["percent_cost_min"]
                )
                combined_data[key]["percent_cost_max"] = np.maximum(
                    percent_reduction, combined_data[key]["percent_cost_max"]
                )
        return combined_data

    def update_probability_initializer(self, data):
        """Replace each entry's CDF initializer with one derived from its value counts."""
        keys = list(data.keys())
        keys.remove("information")  # skip information key
        for key in keys:
            value_count = data[key]["value_count"]
            total_count = value_count.sum()
            if total_count > 0:
                pmf = value_count / total_count
                cdf = np.cumsum(pmf)
                scaled_cdf = np.round(CDF_INIT_TOP * cdf).astype(int)
                # guard against rounding pushing the last entry past CDF_INIT_TOP
                if scaled_cdf[-1] > CDF_INIT_TOP:
                    scaled_cdf[-1] = CDF_INIT_TOP
                data[key]["initializer"] = scaled_cdf.tolist()
        return data
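
    # A worked example of the scaling above, assuming CDF_INIT_TOP is 32768 as
    # in AV1 (counts are hypothetical): value_count = [6, 3, 1] gives
    # pmf = [0.6, 0.3, 0.1], cdf = [0.6, 0.9, 1.0], and
    # scaled_cdf = round(32768 * cdf) = [19661, 29491, 32768].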

    def update_probability_initializer_av1_style(self, data):
        """Replace each entry's CDF initializer using the AV1-style count-to-CDF conversion."""
        keys = list(data.keys())
        keys.remove("information")  # skip information key
        for key in keys:
            value_count = data[key]["value_count"]
            scaled_cdf = count2cdf_av1(value_count)
            data[key]["initializer"] = scaled_cdf.tolist()
        return data

    def update_default_probability_initializer(self, data):
        """Reset each entry's CDF initializer to the AV1 default parameters for its symbol count."""
        keys = list(data.keys())
        keys.remove("information")  # skip information key
        for key in keys:
            num_symb = data[key]["header"]["num_symb"]
            cdf_list = av1_default_cdf_parameters(num_symb).tolist()
            cdf_list.append(CDF_INIT_TOP)
            data[key]["initializer"] = cdf_list
        return data
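

if __name__ == "__main__":
    # A minimal usage sketch, not part of the module proper: pool results from
    # several ParaKit JSON outputs and derive AV1-style initializers from the
    # combined counts. The glob pattern below is a hypothetical placeholder.
    import glob

    collector = None
    combined = {}
    for path in sorted(glob.glob("results/*.json")):
        collector = ResultCollector(path)
        data = collector.parse_json_file(isIgnored=True)
        combined = collector.combine_data(combined, data)
    if collector is not None:
        combined = collector.update_probability_initializer_av1_style(combined)
        for name, entry in combined.items():
            if name != "information":
                print(name, entry["initializer"])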