Experiment¶
Evaluate one experiment described as a list of metrics and checks.
See Statistics for details about the statistical methods used and Evaluation for a description of the output.
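As a quick orientation, here is a minimal end-to-end sketch adapted from the usage examples further down this page (the ids and goal names are the testing fixtures used throughout; a real setup would use its own Dao and goal expressions):
```python
from epstats.toolkit import Experiment, Metric, SrmCheck
from epstats.toolkit.testing import TestData

# One CTR metric plus a sample-ratio-mismatch (SRM) check; 'a' is the control variant.
experiment = Experiment(
    'test-conversion',
    'a',
    [Metric(
        1,
        'Click-through Rate',
        'count(test_unit_type.unit.click)',
        'count(test_unit_type.global.exposure)')],
    [SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
    unit_type='test_unit_type')

# Evaluate from pre-aggregated testing data and inspect the results.
ev = experiment.evaluate_agg(TestData.load_goals_agg(experiment.id))
print(ev.metrics)
```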
Source code in epstats/toolkit/experiment.py
class Experiment:
"""
Evaluate one experiment described as a list of metrics and checks.
See [Statistics](../stats/basics.md) for details about the statistical methods used
and [`Evaluation`][epstats.toolkit.experiment.Evaluation] for a description of the output.
"""
def __init__(
self,
id: str,
control_variant: str,
metrics: List[Metric],
checks: List[Check],
unit_type: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
date_for: Optional[str] = None,
confidence_level: float = DEFAULT_CONFIDENCE_LEVEL,
variants: Optional[List[str]] = None,
filters: Optional[List[Filter]] = None,
query_parameters: dict = {},
):
self._logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self.id = id
self.control_variant = control_variant
self.unit_type = unit_type
self.metrics = metrics
self._check_metric_ids_unique()
self.checks = checks
self.date_from = datetime.strptime(date_from, "%Y-%m-%d").date() if date_from is not None else None
self.date_to = datetime.strptime(date_to, "%Y-%m-%d").date() if date_to is not None else None
self.date_for = (
datetime.strptime(date_for, "%Y-%m-%d").date() if date_for is not None else datetime.today().date()
)
self.confidence_level = confidence_level
self.variants = variants
self._exposure_goals = [
EpGoal(
[
"count",
"(",
UnitType([unit_type]),
".",
AggType(["global"]),
".",
Goal(["exposure"]),
")",
]
)
]
self._update_dimension_to_value()
self.filters = filters if filters is not None else []
self.query_parameters = query_parameters
def _check_metric_ids_unique(self):
"""
Raises an exception if `metrics` contain duplicated ids.
"""
id_counts = Counter(metric.id for metric in self.metrics)
for id_, count in id_counts.items():
if count > 1:
raise ValueError(f"Metric ids must be unique. Id={id_} found more than once.")
def _update_dimension_to_value(self):
"""
To every `EpGoal` across all metrics, we need to add any missing dimensions
that are present in other `EpGoal` instances so that the row masking
can work properly.
"""
all_goals = []
for metric_or_check in self.metrics + self.checks:
for attr in metric_or_check.__dict__.values():
if isinstance(attr, Parser):
all_goals.append(attr._nominator_expr.get_goals())
all_goals.append(attr._denominator_expr.get_goals())
all_goals = chain(*all_goals, self._exposure_goals)
for goal in all_goals:
for dimension in self.get_dimension_columns():
if dimension not in goal.dimension_to_value:
goal.dimension_to_value[dimension] = ""
def evaluate_agg(self, goals: pd.DataFrame) -> Evaluation:
"""
Evaluate all metrics and checks in the experiment from already pre-aggregated goals.
This method is useful when there are too many units in the experiment to evaluate it
using [`evaluate_by_unit`][epstats.toolkit.experiment.Experiment.evaluate_by_unit].
Makes a best effort to fill in missing goals and variants with zeros.
Arguments:
goals: dataframe with one row per goal and aggregated data in columns
`goals` dataframe columns:
1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `unit_type` - randomization unit type
1. `agg_type` - level of aggregation
1. `goal` - goal name
1. any number of dimensional columns, e.g. column `product` containing values `p_1`
1. `count` - number of observed goals (e.g. conversions)
1. `sum_sqr_count` - summed squared number of observed goals per unit, it is similar
to `sum_sqr_value`
1. `sum_value` - value of observed goals
1. `sum_sqr_value` - summed squared value per unit. This is used to calculate
sample standard deviation from pre-aggregated data (it is a term $\sum x^2$
in $\hat{\sigma}^2 = \\frac{\sum x^2 - \\frac{(\sum x)^2}{n}}{n-1}$).
1. `count_unique` - number of units with at least 1 observed goal
Returns:
set of dataframes with evaluation
Usage:
```python
from epstats.toolkit import Experiment, Metric, SrmCheck
experiment = Experiment(
'test-conversion',
'a',
[Metric(
1,
'Click-through Rate',
'count(test_unit_type.unit.click)',
'count(test_unit_type.global.exposure)'),
],
[SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
unit_type='test_unit_type')
# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_agg(experiment.id)
# evaluate experiment
ev = experiment.evaluate_agg(goals)
# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])
# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao
assert_experiment(experiment, ev, TestDao(TestData()))
```
Input data frame example:
```
exp_id exp_variant_id unit_type agg_type goal product count sum_sqr_count sum_value sum_sqr_value count_unique
test-srm a test_unit_type global exposure 100000 100000 100000 100000 100000
test-srm b test_unit_type global exposure 100100 100100 100100 100100 100100
test-srm a test_unit_type unit conversion 1200 1800 32000 66528 900
test-srm a test_unit_type_2 global conversion product_1 1000 1700 31000 55000 850
```
"""
g = self._fix_missing_agg(goals)
return self._evaluate(
g,
Experiment._metrics_column_fce_agg,
Experiment._checks_fce_agg,
Experiment._exposures_fce_agg,
)
def evaluate_wide_agg(self, goals: pd.DataFrame) -> Evaluation:
"""
This is a simplified version of the method [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].
It consumes a simple input `goals` dataframe, transforms it into a suitable dataframe format and evaluates it using the general method [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].
It assumes that the first two columns are the experiment name and the variant; data columns follow.
See usage of the method in the notebook [Ad-hoc A/B test evaluation using Ep-Stats](../user_guide/ab_test_simple_evaluation.html).
Arguments:
goals: dataframe with one row per variant and aggregated data in columns
Possible `goals` dataframe columns (check the input dataframe example):
1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `clicks` - sum of clicks
1. `views` - sum of views
1. `bookings` - sum of bookings
1. `bookings_squared` - sum of bookings squared
Returns:
set of dataframes with evaluation
Usage:
```python
from epstats.toolkit import Experiment, SimpleMetric, SimpleSrmCheck
from epstats.toolkit.results import results_long_to_wide, format_results
from epstats.toolkit.testing import TestData
# Load Test Data
goals = TestData.load_goals_simple_agg()
# Define the experiment
unit_type = 'test_unit_type'
experiment = Experiment(
'test-simple-metric',
'a',
[
SimpleMetric(1, 'Click-through Rate (CTR)', 'clicks', 'views', unit_type),
SimpleMetric(2, 'Conversion Rate', 'conversions', 'views', unit_type),
SimpleMetric(3, 'Revenue per Mille (RPM)', 'bookings', 'views', unit_type, metric_format='${:,.2f}', metric_value_multiplier=1000),
],
[SimpleSrmCheck(1, 'SRM', 'views')],
unit_type=unit_type)
# Evaluate the experiment
ev = experiment.evaluate_wide_agg(goals)
# Work with results
print(ev.exposures)
print(ev.metrics)
print(ev.checks)
# Possible formatting of metrics
ev.metrics.pipe(results_long_to_wide).pipe(format_results, experiment, format_pct='{:.1%}', format_pval='{:.3f}')
```
Input dataframe example:
```
experiment_id variant_id views clicks conversions bookings bookings_squared
my-exp a 473661 48194 413 17152 803105
my-exp b 471485 47184 360 14503 677178
my-exp c 477159 48841 406 15892 711661
my-exp d 474934 49090 289 11995 566700
```
"""
g = goals_wide_to_long(goals, self.unit_type)
return self.evaluate_agg(g)
def evaluate_by_unit(self, goals: pd.DataFrame) -> Evaluation:
"""
Evaluate all metrics and checks in the experiment from goals grouped by `unit_id`.
This method is useful when there are not many (<1M) units in the experiment.
If there are many units exposed to the experiment, pre-aggregate the data and use [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].
Makes a best effort to fill in missing goals and variants with zeros.
Arguments:
goals: dataframe with one row per goal and aggregated data in columns
`goals` dataframe columns:
1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `unit_type` - randomization unit type
1. `unit_id` - (randomization) unit id
1. `agg_type` - level of aggregation
1. `goal` - goal name
1. any number of dimensional columns, e.g. column `product` containing values `p_1`
1. `count` - number of observed goals
1. `sum_value` - value of observed goals
Returns:
set of dataframes with evaluation
Usage:
```python
from epstats.toolkit import Experiment, Metric, SrmCheck
experiment = Experiment(
'test-real-valued',
'a',
[Metric(
2,
'Average Bookings',
'value(test_unit_type.unit.conversion)',
'count(test_unit_type.unit.exposure)')
],
[],
unit_type='test_unit_type')
# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_by_unit(experiment.id)
# evaluate experiment
ev = experiment.evaluate_by_unit(goals)
# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])
# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao
assert_experiment(experiment, ev, TestDao(TestData()))
```
Input data frame example:
```
exp_id exp_variant_id unit_type unit_id agg_type goal product count sum_value
test-srm a test_unit_type test_unit_type_1 unit exposure 1 1
test-srm a test_unit_type test_unit_type_1 unit conversion product_1 2 75
test-srm b test_unit_type test_unit_type_2 unit exposure 1 1
test-srm b test_unit_type test_unit_type_3 unit exposure 1 1
test-srm b test_unit_type test_unit_type_3 unit conversion product_2 1 1
```
"""
g = self._fix_missing_by_unit(goals)
# We need to pivot the table to get all goals per `unit_id` on the same row in the data frame.
# This is needed to be able to vector-evaluate compound metrics
# eg. `value(test_unit_type.unit.conversion) - value(test_unit_type.unit.refund)`
g = (
pd.pivot_table(
g,
values=["count", "sum_value"],
index=[
"exp_id",
"exp_variant_id",
"unit_type",
"agg_type",
"unit_id",
]
+ self.get_dimension_columns(),
columns="goal",
aggfunc=np.sum,
fill_value=0,
)
.swaplevel(axis=1)
.reset_index()
)
return self._evaluate(
g,
Experiment._metrics_column_fce_by_unit,
Experiment._checks_fce_by_unit,
Experiment._exposures_fce_by_unit,
)
def get_goals(self) -> List[EpGoal]:
"""
List of all goals needed to evaluate all metrics and checks in the experiment.
Returns:
list of parsed structured goals
"""
res = set()
for m in self.metrics:
res = res.union(m.get_goals())
for c in self.checks:
res = res.union(c.get_goals())
res = res.union(self._exposure_goals)
return list(res)
@staticmethod
def _metrics_column_fce_agg(m: Metric, goals: pd.DataFrame):
"""
Gets count, sum_value, sum_sqr_value columns by expression from already aggregated goals.
"""
return m.get_evaluate_columns_agg(goals)
@staticmethod
def _metrics_column_fce_by_unit(m: Metric, goals: pd.DataFrame):
"""
Gets count, sum_value, sum_sqr_value columns by expression from goals grouped by `unit_id`.
"""
return m.get_evaluate_columns_by_unit(goals)
@staticmethod
def _checks_fce_agg(c: Check, goals: pd.DataFrame, control_variant: str):
"""
Evaluates checks from already aggregated goals.
"""
return c.evaluate_agg(goals, control_variant)
@staticmethod
def _checks_fce_by_unit(c: Check, goals: pd.DataFrame, control_variant: str):
"""
Evaluates checks from goals grouped by `unit_id`.
"""
return c.evaluate_by_unit(goals, control_variant)
@staticmethod
def _exposures_fce_agg(goals: pd.DataFrame, exp_id: str, unit_type: str):
"""
Evaluates exposures from already aggregated goals.
"""
df = (
goals[(goals["unit_type"] == unit_type) & (goals["agg_type"] == "global") & (goals["goal"] == "exposure")]
.groupby("exp_variant_id")
.agg(exposures=("count", "sum"))
.reset_index()
)
df["exp_id"] = exp_id
return df
@staticmethod
def _exposures_fce_by_unit(goals: pd.DataFrame, exp_id: str, unit_type: str):
"""
Evaluates exposures from goals grouped by `unit_id`.
"""
df = goals[(goals["unit_type"] == unit_type) & (goals["agg_type"] == "unit")][
[("exp_variant_id", ""), ("exposure", "count")]
]
df = df.droplevel(0, axis=1)
df.columns = ["exp_variant_id", "exposures"]
d = df.groupby("exp_variant_id").agg(exposures=("exposures", "sum")).reset_index()
d["exp_id"] = exp_id
return d
def _evaluate(self, goals: pd.DataFrame, metrics_column_fce, checks_fce, exposures_fce):
metrics = self._evaluate_metrics(goals, metrics_column_fce)
checks = self._evaluate_checks(goals, checks_fce)
exposures = self._evaluate_exposures(goals, exposures_fce)
return Evaluation(metrics, checks, exposures)
def _evaluate_exposures(self, goals: pd.DataFrame, exposures_fce) -> pd.DataFrame:
return exposures_fce(goals, self.id, self.unit_type)
def _evaluate_checks(self, goals: pd.DataFrame, check_fce) -> pd.DataFrame:
res = []
for c in self.checks:
try:
r = check_fce(c, goals, self.control_variant)
r["exp_id"] = self.id
res.append(r)
except Exception as e:
self._logger.warning(f"Cannot evaluate check [{c.id}] in experiment [{self.id}] because of {e}")
check_evaluation_errors_metric.inc()
c = pd.concat(res, axis=0) if res != [] else pd.DataFrame([], columns=Evaluation.check_columns())
c["timestamp"] = round(get_utc_timestamp(datetime.now()).timestamp())
return c[Evaluation.check_columns()]
def get_dimension_columns(self) -> List[str]:
"""
Returns a list of all dimensions used in all metrics in the experiment.
"""
return list({d for g in self.get_goals() for d in g.dimension_to_value.keys()})
def _set_variants(self, goals):
# what variants and goals there should be from all the goals needed to evaluate all metrics
self.variants = (
self.variants
if self.variants is not None
else np.unique(np.append(goals["exp_variant_id"], self.control_variant))
)
def _fix_missing_agg(self, goals: pd.DataFrame) -> pd.DataFrame:
"""
Adds zero values for missing goals and variants that are needed for metric evaluation.
Does that on a best-effort basis - fills in `count`, `sum_sqr_count`, `sum_value`, `sum_sqr_value` and `count_unique` with zeros.
"""
# what variants and goals there should be from all the goals needed to evaluate all metrics
self._set_variants(goals)
g = goals[goals.exp_variant_id.isin(self.variants)]
nvs = self.variants
ngs = self.get_goals()
# variants * goals is the number of variant x goals combinations we expect in the data
lnvs = len(nvs)
lngs = len(ngs)
# create zero data frame for all variants and goals
empty_df = pd.DataFrame(
{
"exp_id": self.id,
"exp_variant_id": np.tile(nvs, lngs),
"unit_type": np.repeat([g.unit_type for g in ngs], lnvs),
"agg_type": np.repeat([g.agg_type for g in ngs], lnvs),
"goal": np.repeat([g.goal for g in ngs], lnvs),
"count": 0,
"sum_sqr_count": 0,
"sum_value": 0,
"sum_sqr_value": 0,
"count_unique": 0,
}
)
for dimension in self.get_dimension_columns():
empty_df[dimension] = np.repeat([g.dimension_to_value.get(dimension, "") for g in ngs], lnvs)
# join to existing data and use zeros for only missing variants and goals
m = (
pd.concat([g, empty_df], axis=0)
.fillna({d: "" for d in self.get_dimension_columns()})
.groupby(
[
"exp_id",
"exp_variant_id",
"unit_type",
"agg_type",
"goal",
]
+ self.get_dimension_columns(),
# dropna=False,
)
.sum()
.reset_index()
)
return m
def _fix_missing_by_unit(self, goals: pd.DataFrame) -> pd.DataFrame:
"""
Adds zero values for missing goals and variants that are needed for metric evaluation.
Does that on a best-effort basis - fills in `count` and `sum_value` with zeros.
"""
# what variants and goals there should be from all the goals needed to evaluate all metrics
self._set_variants(goals)
g = goals[goals.exp_variant_id.isin(self.variants)]
nvs = self.variants
ngs = self.get_goals()
# variants * goals is the number of variant x goals combinations we expect in the data
lnvs = len(nvs)
lngs = len(ngs)
# create zero data frame for all variants and goals
empty_df = pd.DataFrame(
{
"exp_id": self.id,
"exp_variant_id": np.tile(nvs, lngs),
"unit_type": np.repeat([g.unit_type for g in ngs], lnvs),
"agg_type": np.repeat([g.agg_type for g in ngs], lnvs),
"goal": np.repeat([g.goal for g in ngs], lnvs),
"unit_id": np.nan,
"count": 0,
"sum_value": 0,
}
)
for dimension in self.get_dimension_columns():
empty_df[dimension] = np.repeat([g.dimension_to_value.get(dimension, "") for g in ngs], lnvs)
# join to existing data and use zeros for only missing variants and goals
m = pd.concat([g, empty_df], axis=0).fillna({d: "" for d in self.get_dimension_columns()})
return m[
[
"exp_id",
"exp_variant_id",
"unit_type",
"agg_type",
"goal",
"unit_id",
"count",
"sum_value",
]
+ self.get_dimension_columns()
]
def _get_required_sample_size(
self,
metric_row: pd.Series,
controls: dict,
minimum_effects: dict,
metrics_with_value_denominator: set,
n_variants: int,
) -> pd.Series:
metric_id = metric_row["metric_id"]
minimum_effect = minimum_effects[metric_id]
index = ["minimum_effect", "sample_size", "required_sample_size"]
# Right now, a metric with a value() denominator would return a count that is not equal
# to the sample size. In such a case we do not evaluate the required sample size.
# TODO: add support for value() denominator metrics,
# parser will return an additional column equal to count or count_unique.
sample_size = metric_row["count"] if metric_id not in metrics_with_value_denominator else np.nan
if metric_row["exp_variant_id"] == self.control_variant or pd.isnull(minimum_effect):
return pd.Series([np.nan, sample_size, np.nan], index)
metric_id = metric_row["metric_id"]
return pd.Series(
[
minimum_effect,
sample_size,
Statistics.required_sample_size_per_variant(
n_variants=n_variants,
minimum_effect=minimum_effect,
mean=controls[metric_id]["mean"],
std=controls[metric_id]["std"],
std_2=metric_row["std"],
confidence_level=metric_row["confidence_level"],
power=DEFAULT_POWER,
),
],
index,
)
def _get_required_sample_sizes(self, metrics: pd.DataFrame, n_variants: int) -> pd.DataFrame:
controls = {
r["metric_id"]: {"mean": r["mean"], "std": r["std"]}
for _, r in metrics.iterrows()
if r["exp_variant_id"] == self.control_variant
}
minimum_effects = {m.id: m.minimum_effect for m in self.metrics}
metrics_with_value_denominator = {
m.id for m in self.metrics if m.denominator.startswith("value(") and not isinstance(m, SimpleMetric)
}
return metrics.apply(
lambda metric_row: self._get_required_sample_size(
metric_row=metric_row,
controls=controls,
minimum_effects=minimum_effects,
metrics_with_value_denominator=metrics_with_value_denominator,
n_variants=n_variants,
),
axis=1,
)
def _get_power_from_required_sample_sizes(self, metrics: pd.DataFrame, n_variants: int) -> pd.Series:
return metrics.apply(
lambda metric_row: Statistics.power_from_required_sample_size_per_variant(
n_variants=n_variants,
sample_size_per_variant=metric_row["sample_size"],
required_sample_size_per_variant=metric_row["required_sample_size"],
required_confidence_level=metric_row["confidence_level"],
required_power=DEFAULT_POWER,
),
axis=1,
)
def _evaluate_metrics(self, goals: pd.DataFrame, column_fce) -> pd.DataFrame:
if not self.metrics:
return pd.DataFrame([], columns=Evaluation.metric_columns())
sts = []
for m in self.metrics:
count, sum_value, sum_sqr_value = column_fce(m, goals)
sts.append([count, sum_value, sum_sqr_value])
stats = np.array(sts).transpose(0, 2, 1)
metrics = stats.shape[0]
n_variants = stats.shape[1]
count = stats[:, :, 0]
sum_value = stats[:, :, 1]
sum_sqr_value = stats[:, :, 2]
with np.errstate(divide="ignore", invalid="ignore"):
# We fill in zeros, when goal data are missing for some variant.
# There could be division by zero here which is expected as we return
# nan or inf values to the caller.
mean = sum_value / count
std = np.sqrt((sum_sqr_value - sum_value * sum_value / count) / (count - 1))
# sequential testing correction
if self.date_from is not None and self.date_to is not None:
# Parameters
test_length = (self.date_to - self.date_from).days + 1 # test length in days
actual_day = (self.date_for - self.date_from).days + 1 # day(s) since beginning of the test
actual_day = min(actual_day, test_length) # actual day of evaluation must be in interval [1, test_length]
# confidence level adjustment - applied when actual_day < test_length (test is still running)
confidence_level = Statistics.obf_alpha_spending_function(self.confidence_level, test_length, actual_day)
else:
confidence_level = self.confidence_level # no change
stats = np.dstack((count, mean, std, sum_value, np.ones(count.shape) * confidence_level))
stats = np.dstack(
(
np.repeat([m.id for m in self.metrics], n_variants).reshape(metrics, n_variants, -1),
np.repeat([m.name for m in self.metrics], n_variants).reshape(metrics, n_variants, -1),
np.tile(goals["exp_variant_id"].unique(), metrics).reshape(metrics, n_variants, -1),
stats,
)
)
# dimensions of `stats` array: (metrics, variants, stats)
# elements of `stats` array: metric_id, metric_name, exp_variant_id, count, mean, std, sum_value, confidence_level
# hypothesis evaluation (standard way using t-test)
c = Statistics.ttest_evaluation(stats, self.control_variant)
# multiple variants (comparisons) correction - applied when we have multiple treatment variants
if n_variants > 2:
c = Statistics.multiple_comparisons_correction(c, n_variants, metrics, confidence_level)
c["exp_id"] = self.id
c["timestamp"] = round(get_utc_timestamp(datetime.now()).timestamp())
c[["minimum_effect", "sample_size", "required_sample_size"]] = self._get_required_sample_sizes(c, n_variants)
c["power"] = self._get_power_from_required_sample_sizes(c, n_variants)
return c[Evaluation.metric_columns()]
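One detail worth illustrating from `_evaluate_metrics` above: when `date_from` and `date_to` are set, evaluations made before the planned end of the test use a confidence level adjusted by `Statistics.obf_alpha_spending_function`. A hedged sketch of wiring that up (the experiment id and dates are illustrative):
```python
from epstats.toolkit import Experiment, Metric

# With `date_from` and `date_to` set, `_evaluate_metrics` computes
# test_length = 14 and actual_day = 7 for this evaluation, and adjusts
# the confidence level because the test is still running.
experiment = Experiment(
    'test-sequential',
    'a',
    [Metric(
        1,
        'Click-through Rate',
        'count(test_unit_type.unit.click)',
        'count(test_unit_type.global.exposure)')],
    [],
    unit_type='test_unit_type',
    date_from='2024-01-01',  # planned start of the test
    date_to='2024-01-14',    # planned end of the test
    date_for='2024-01-07',   # date of this evaluation, mid-test
)
```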
evaluate_agg(self, goals)¶
Evaluate all metrics and checks in the experiment from already pre-aggregated goals.
This method is useful when there are too many units in the experiment to evaluate it using evaluate_by_unit.
Makes a best effort to fill in missing goals and variants with zeros.
Parameters:

Name | Type | Description | Default
---|---|---|---
`goals` | `DataFrame` | dataframe with one row per goal and aggregated data in columns | *required*

`goals` dataframe columns:

1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `unit_type` - randomization unit type
1. `agg_type` - level of aggregation
1. `goal` - goal name
1. any number of dimensional columns, e.g. column `product` containing values `p_1`
1. `count` - number of observed goals (e.g. conversions)
1. `sum_sqr_count` - summed squared number of observed goals per unit, it is similar to `sum_sqr_value`
1. `sum_value` - value of observed goals
1. `sum_sqr_value` - summed squared value per unit. This is used to calculate sample standard deviation from pre-aggregated data (it is the term $\sum x^2$ in $\hat{\sigma}^2 = \frac{\sum x^2 - \frac{(\sum x)^2}{n}}{n-1}$).
1. `count_unique` - number of units with at least 1 observed goal

Returns:

Type | Description
---|---
`Evaluation` | set of dataframes with evaluation
Usage:

```python
from epstats.toolkit import Experiment, Metric, SrmCheck

experiment = Experiment(
    'test-conversion',
    'a',
    [Metric(
        1,
        'Click-through Rate',
        'count(test_unit_type.unit.click)',
        'count(test_unit_type.global.exposure)'),
    ],
    [SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
    unit_type='test_unit_type')

# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_agg(experiment.id)

# evaluate experiment
ev = experiment.evaluate_agg(goals)

# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])

# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao, assert_experiment
assert_experiment(experiment, ev, TestDao(TestData()))
```

Input data frame example:

```
exp_id    exp_variant_id  unit_type         agg_type  goal        product    count   sum_sqr_count  sum_value  sum_sqr_value  count_unique
test-srm  a               test_unit_type    global    exposure               100000  100000         100000     100000         100000
test-srm  b               test_unit_type    global    exposure               100100  100100         100100     100100         100100
test-srm  a               test_unit_type    unit      conversion             1200    1800           32000      66528          900
test-srm  a               test_unit_type_2  global    conversion  product_1  1000    1700           31000      55000          850
```
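The `sum_value`/`sum_sqr_value` bookkeeping above is exactly what lets ep-stats recover the sample standard deviation without per-unit data. A self-contained sketch with made-up per-unit values (not part of the library) verifying the identity:
```python
import numpy as np

# Made-up per-unit goal values; in production these never need to be
# materialized, only their aggregates are stored.
rng = np.random.default_rng(42)
x = rng.poisson(1.2, size=1000)

n = len(x)
sum_value = x.sum()             # the `sum_value` column
sum_sqr_value = (x ** 2).sum()  # the `sum_sqr_value` column

# sigma^2 = (sum x^2 - (sum x)^2 / n) / (n - 1), as quoted above
std_from_aggregates = np.sqrt((sum_sqr_value - sum_value ** 2 / n) / (n - 1))
assert np.isclose(std_from_aggregates, x.std(ddof=1))
```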
Source code in epstats/toolkit/experiment.py
def evaluate_agg(self, goals: pd.DataFrame) -> Evaluation:
"""
Evaluate all metrics and checks in the experiment from already pre-aggregated goals.
This method is useful when there are too many units in the experiment to evaluate it
using [`evaluate_by_unit`][epstats.toolkit.experiment.Experiment.evaluate_by_unit].
Makes a best effort to fill in missing goals and variants with zeros.
Arguments:
goals: dataframe with one row per goal and aggregated data in columns
`goals` dataframe columns:
1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `unit_type` - randomization unit type
1. `agg_type` - level of aggregation
1. `goal` - goal name
1. any number of dimensional columns, e.g. column `product` containing values `p_1`
1. `count` - number of observed goals (e.g. conversions)
1. `sum_sqr_count` - summed squared number of observed goals per unit, it is similar
to `sum_sqr_value`
1. `sum_value` - value of observed goals
1. `sum_sqr_value` - summed squared value per unit. This is used to calculate
sample standard deviation from pre-aggregated data (it is a term $\sum x^2$
in $\hat{\sigma}^2 = \\frac{\sum x^2 - \\frac{(\sum x)^2}{n}}{n-1}$).
1. `count_unique` - number of units with at least 1 observed goal
Returns:
set of dataframes with evaluation
Usage:
```python
from epstats.toolkit import Experiment, Metric, SrmCheck
experiment = Experiment(
'test-conversion',
'a',
[Metric(
1,
'Click-through Rate',
'count(test_unit_type.unit.click)',
'count(test_unit_type.global.exposure)'),
],
[SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
unit_type='test_unit_type')
# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_agg(experiment.id)
# evaluate experiment
ev = experiment.evaluate_agg(goals)
# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])
# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao
assert_experiment(experiment, ev, TestDao(TestData()))
```
Input data frame example:
```
exp_id exp_variant_id unit_type agg_type goal product count sum_sqr_count sum_value sum_sqr_value count_unique
test-srm a test_unit_type global exposure 100000 100000 100000 100000 100000
test-srm b test_unit_type global exposure 100100 100100 100100 100100 100100
test-srm a test_unit_type unit conversion 1200 1800 32000 66528 900
test-srm a test_unit_type_2 global conversion product_1 1000 1700 31000 55000 850
```
"""
g = self._fix_missing_agg(goals)
return self._evaluate(
g,
Experiment._metrics_column_fce_agg,
Experiment._checks_fce_agg,
Experiment._exposures_fce_agg,
)
evaluate_by_unit(self, goals)¶
Evaluate all metrics and checks in the experiment from goals grouped by unit_id.
This method is useful when there are not many (<1M) units in the experiment.
If there are many units exposed to the experiment, pre-aggregate the data and use evaluate_agg.
Makes a best effort to fill in missing goals and variants with zeros.
Parameters:

Name | Type | Description | Default
---|---|---|---
`goals` | `DataFrame` | dataframe with one row per goal and aggregated data in columns | *required*

`goals` dataframe columns:

1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `unit_type` - randomization unit type
1. `unit_id` - (randomization) unit id
1. `agg_type` - level of aggregation
1. `goal` - goal name
1. any number of dimensional columns, e.g. column `product` containing values `p_1`
1. `count` - number of observed goals
1. `sum_value` - value of observed goals

Returns:

Type | Description
---|---
`Evaluation` | set of dataframes with evaluation
Usage:

```python
from epstats.toolkit import Experiment, Metric, SrmCheck

experiment = Experiment(
    'test-real-valued',
    'a',
    [Metric(
        2,
        'Average Bookings',
        'value(test_unit_type.unit.conversion)',
        'count(test_unit_type.unit.exposure)')
    ],
    [],
    unit_type='test_unit_type')

# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_by_unit(experiment.id)

# evaluate experiment
ev = experiment.evaluate_by_unit(goals)

# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])

# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao, assert_experiment
assert_experiment(experiment, ev, TestDao(TestData()))
```

Input data frame example:

```
exp_id    exp_variant_id  unit_type       unit_id           agg_type  goal        product    count  sum_value
test-srm  a               test_unit_type  test_unit_type_1  unit      exposure               1      1
test-srm  a               test_unit_type  test_unit_type_1  unit      conversion  product_1  2      75
test-srm  b               test_unit_type  test_unit_type_2  unit      exposure               1      1
test-srm  b               test_unit_type  test_unit_type_3  unit      exposure               1      1
test-srm  b               test_unit_type  test_unit_type_3  unit      conversion  product_2  1      1
```
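For illustration, the same input can be assembled by hand rather than loaded through a Dao. A sketch reusing `experiment` from the usage example (unit ids and values are made up):
```python
import pandas as pd

# Hand-built input with the columns listed above; no dimensional
# columns are included because the metric uses none.
goals = pd.DataFrame(
    [
        ('test-real-valued', 'a', 'test_unit_type', 'u_1', 'unit', 'exposure', 1, 1),
        ('test-real-valued', 'a', 'test_unit_type', 'u_1', 'unit', 'conversion', 2, 75),
        ('test-real-valued', 'b', 'test_unit_type', 'u_2', 'unit', 'exposure', 1, 1),
    ],
    columns=['exp_id', 'exp_variant_id', 'unit_type', 'unit_id',
             'agg_type', 'goal', 'count', 'sum_value'],
)

# `experiment` as defined in the usage example above
ev = experiment.evaluate_by_unit(goals)
print(ev.metrics)
```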
Source code in epstats/toolkit/experiment.py
def evaluate_by_unit(self, goals: pd.DataFrame) -> Evaluation:
"""
Evaluate all metrics and checks in the experiment from goals grouped by `unit_id`.
This method is useful when there are not many (<1M) units in the experiment.
If there are many units exposed to the experiment, pre-aggregate the data and use [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].
Makes a best effort to fill in missing goals and variants with zeros.
Arguments:
goals: dataframe with one row per goal and aggregated data in columns
`goals` dataframe columns:
1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `unit_type` - randomization unit type
1. `unit_id` - (randomization) unit id
1. `agg_type` - level of aggregation
1. `goal` - goal name
1. any number of dimensional columns, e.g. column `product` containing values `p_1`
1. `count` - number of observed goals
1. `sum_value` - value of observed goals
Returns:
set of dataframes with evaluation
Usage:
```python
from epstats.toolkit import Experiment, Metric, SrmCheck
experiment = Experiment(
'test-real-valued',
'a',
[Metric(
2,
'Average Bookings',
'value(test_unit_type.unit.conversion)',
'count(test_unit_type.unit.exposure)')
],
[],
unit_type='test_unit_type')
# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_by_unit(experiment.id)
# evaluate experiment
ev = experiment.evaluate_by_unit(goals)
# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])
# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao
assert_experiment(experiment, ev, TestDao(TestData()))
```
Input data frame example:
```
exp_id exp_variant_id unit_type unit_id agg_type goal product count sum_value
test-srm a test_unit_type test_unit_type_1 unit exposure 1 1
test-srm a test_unit_type test_unit_type_1 unit conversion product_1 2 75
test-srm b test_unit_type test_unit_type_2 unit exposure 1 1
test-srm b test_unit_type test_unit_type_3 unit exposure 1 1
test-srm b test_unit_type test_unit_type_3 unit conversion product_2 1 1
```
"""
g = self._fix_missing_by_unit(goals)
# We need to pivot the table to get all goals per `unit_id` on the same row in the data frame.
# This is needed to be able to vector-evaluate compound metrics
# eg. `value(test_unit_type.unit.conversion) - value(test_unit_type.unit.refund)`
g = (
pd.pivot_table(
g,
values=["count", "sum_value"],
index=[
"exp_id",
"exp_variant_id",
"unit_type",
"agg_type",
"unit_id",
]
+ self.get_dimension_columns(),
columns="goal",
aggfunc=np.sum,
fill_value=0,
)
.swaplevel(axis=1)
.reset_index()
)
return self._evaluate(
g,
Experiment._metrics_column_fce_by_unit,
Experiment._checks_fce_by_unit,
Experiment._exposures_fce_by_unit,
)
evaluate_wide_agg(self, goals)¶
This is a simplified version of the method evaluate_agg.
It consumes a simple input goals dataframe, transforms it into a suitable dataframe format and evaluates it using the general method evaluate_agg.
It assumes that the first two columns are the experiment name and the variant; data columns follow.
See usage of the method in the notebook Ad-hoc A/B test evaluation using Ep-Stats.
Parameters:

Name | Type | Description | Default
---|---|---|---
`goals` | `DataFrame` | dataframe with one row per variant and aggregated data in columns | *required*

Possible `goals` dataframe columns (check the input dataframe example):

1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `clicks` - sum of clicks
1. `views` - sum of views
1. `bookings` - sum of bookings
1. `bookings_squared` - sum of bookings squared

Returns:

Type | Description
---|---
`Evaluation` | set of dataframes with evaluation
Usage:

```python
from epstats.toolkit import Experiment, SimpleMetric, SimpleSrmCheck
from epstats.toolkit.results import results_long_to_wide, format_results
from epstats.toolkit.testing import TestData

# Load Test Data
goals = TestData.load_goals_simple_agg()

# Define the experiment
unit_type = 'test_unit_type'
experiment = Experiment(
    'test-simple-metric',
    'a',
    [
        SimpleMetric(1, 'Click-through Rate (CTR)', 'clicks', 'views', unit_type),
        SimpleMetric(2, 'Conversion Rate', 'conversions', 'views', unit_type),
        SimpleMetric(3, 'Revenue per Mille (RPM)', 'bookings', 'views', unit_type, metric_format='${:,.2f}', metric_value_multiplier=1000),
    ],
    [SimpleSrmCheck(1, 'SRM', 'views')],
    unit_type=unit_type)

# Evaluate the experiment
ev = experiment.evaluate_wide_agg(goals)

# Work with results
print(ev.exposures)
print(ev.metrics)
print(ev.checks)

# Possible formatting of metrics
ev.metrics.pipe(results_long_to_wide).pipe(format_results, experiment, format_pct='{:.1%}', format_pval='{:.3f}')
```

Input dataframe example:

```
experiment_id  variant_id  views   clicks  conversions  bookings  bookings_squared
my-exp         a           473661  48194   413          17152     803105
my-exp         b           471485  47184   360          14503     677178
my-exp         c           477159  48841   406          15892     711661
my-exp         d           474934  49090   289          11995     566700
```
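Equivalently, the wide input can be built inline instead of loading test data. A sketch using the example rows above with `experiment` from the usage example:
```python
import pandas as pd

# The wide input from the example above, built by hand. The first two
# columns identify the experiment and variant; data columns follow and
# must match the SimpleMetric / SimpleSrmCheck column names.
goals = pd.DataFrame({
    'experiment_id': ['my-exp'] * 4,
    'variant_id': ['a', 'b', 'c', 'd'],
    'views': [473661, 471485, 477159, 474934],
    'clicks': [48194, 47184, 48841, 49090],
    'conversions': [413, 360, 406, 289],
    'bookings': [17152, 14503, 15892, 11995],
    'bookings_squared': [803105, 677178, 711661, 566700],
})

# `experiment` as defined in the usage example above
ev = experiment.evaluate_wide_agg(goals)
```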
Source code in epstats/toolkit/experiment.py
def evaluate_wide_agg(self, goals: pd.DataFrame) -> Evaluation:
"""
This is a simplified version of the method [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].
It consumes a simple input `goals` dataframe, transforms it into a suitable dataframe format and evaluates it using the general method [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].
It assumes that the first two columns are the experiment name and the variant; data columns follow.
See usage of the method in the notebook [Ad-hoc A/B test evaluation using Ep-Stats](../user_guide/ab_test_simple_evaluation.html).
Arguments:
goals: dataframe with one row per variant and aggregated data in columns
Possible `goals` dataframe columns (check the input dataframe example):
1. `exp_id` - experiment id
1. `exp_variant_id` - variant
1. `clicks` - sum of clicks
1. `views` - sum of views
1. `bookings` - sum of bookings
1. `bookings_squared` - sum of bookings squared
Returns:
set of dataframes with evaluation
Usage:
```python
from epstats.toolkit import Experiment, SimpleMetric, SimpleSrmCheck
from epstats.toolkit.results import results_long_to_wide, format_results
from epstats.toolkit.testing import TestData
# Load Test Data
goals = TestData.load_goals_simple_agg()
# Define the experiment
unit_type = 'test_unit_type'
experiment = Experiment(
'test-simple-metric',
'a',
[
SimpleMetric(1, 'Click-through Rate (CTR)', 'clicks', 'views', unit_type),
SimpleMetric(2, 'Conversion Rate', 'conversions', 'views', unit_type),
SimpleMetric(3, 'Revenue per Mille (RPM)', 'bookings', 'views', unit_type, metric_format='${:,.2f}', metric_value_multiplier=1000),
],
[SimpleSrmCheck(1, 'SRM', 'views')],
unit_type=unit_type)
# Evaluate the experiment
ev = experiment.evaluate_wide_agg(goals)
# Work with results
print(ev.exposures)
print(ev.metrics)
print(ev.checks)
# Possible formatting of metrics
ev.metrics.pipe(results_long_to_wide).pipe(format_results, experiment, format_pct='{:.1%}', format_pval='{:.3f}')
```
Input dataframe example:
```
experiment_id variant_id views clicks conversions bookings bookings_squared
my-exp a 473661 48194 413 17152 803105
my-exp b 471485 47184 360 14503 677178
my-exp c 477159 48841 406 15892 711661
my-exp d 474934 49090 289 11995 566700
```
"""
g = goals_wide_to_long(goals, self.unit_type)
return self.evaluate_agg(g)
get_dimension_columns(self)¶
Returns a list of all dimensions used in all metrics in the experiment.
Source code in epstats/toolkit/experiment.py
def get_dimension_columns(self) -> List[str]:
"""
Returns a list of all dimensions used in all metrics in the experiment.
"""
return list({d for g in self.get_goals() for d in g.dimension_to_value.keys()})
get_goals(self)¶
List of all goals needed to evaluate all metrics and checks in the experiment.
Returns:

Type | Description
---|---
`List[epstats.toolkit.parser.EpGoal]` | list of parsed structured goals
Source code in epstats/toolkit/experiment.py
def get_goals(self) -> List[EpGoal]:
"""
List of all goals needed to evaluate all metrics and checks in the experiment.
Returns:
list of parsed structured goals
"""
res = set()
for m in self.metrics:
res = res.union(m.get_goals())
for c in self.checks:
res = res.union(c.get_goals())
res = res.union(self._exposure_goals)
return list(res)