
Experiment

Evaluate one experiment described as a list of metrics and checks.

See Statistics for details about the statistical methods used and Evaluation for a description of the output.
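
A minimal construction sketch, mirroring the usage examples below (the unit type `test_unit_type` and the goal expressions are illustrative placeholders, not a prescribed schema):

```python
from epstats.toolkit import Experiment, Metric, SrmCheck

# Variant 'a' is the control; one metric plus one sample-ratio-mismatch check.
experiment = Experiment(
    'my-experiment',
    'a',
    [
        Metric(
            1,
            'Click-through Rate',
            'count(test_unit_type.unit.click)',
            'count(test_unit_type.global.exposure)',
        ),
    ],
    [SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
    unit_type='test_unit_type',
)
```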

Source code in src/epstats/toolkit/experiment.py
class Experiment:
    """
    Evaluate one experiment described as a list of metrics and checks.

    See [Statistics](../stats/basics.md) for details about statistical method used
    and [`Evaluation`][epstats.toolkit.experiment.Evaluation] for description of output.
    """

    def __init__(
        self,
        id: str,
        control_variant: str,
        metrics: List[Metric],
        checks: List[Check],
        unit_type: str,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
        date_for: Optional[str] = None,
        confidence_level: float = DEFAULT_CONFIDENCE_LEVEL,
        variants: Optional[List[str]] = None,
        filters: Optional[List[Filter]] = None,
        null_hypothesis_rate: Optional[float] = None,
        query_parameters: dict = {},
    ):
        self._logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.id = id
        self.control_variant = control_variant
        self.unit_type = unit_type
        self.metrics = metrics
        self._check_metric_ids_unique()
        self.checks = checks
        self.date_from = datetime.strptime(date_from, "%Y-%m-%d").date() if date_from is not None else None
        self.date_to = datetime.strptime(date_to, "%Y-%m-%d").date() if date_to is not None else None
        self.date_for = (
            datetime.strptime(date_for, "%Y-%m-%d").date() if date_for is not None else datetime.today().date()
        )
        self.confidence_level = confidence_level
        self.variants = variants
        self._exposure_goals = [
            EpGoal(
                [
                    "count",
                    "(",
                    UnitType([unit_type]),
                    ".",
                    AggType(["global"]),
                    ".",
                    Goal(["exposure"]),
                    ")",
                ]
            )
        ]
        self._update_dimension_to_value()
        self.filters = filters if filters is not None else []
        self.query_parameters = query_parameters
        self.null_hypothesis_rate = null_hypothesis_rate

    def _check_metric_ids_unique(self):
        """
        Raises an exception if `metrics` contain duplicated ids.
        """
        id_counts = Counter(metric.id for metric in self.metrics)
        for id_, count in id_counts.items():
            if count > 1:
                raise ValueError(f"Metric ids must be unique. Id={id_} found more than once.")

    def _update_dimension_to_value(self):
        """
        To every `EpGoal` across all metrics, we need to add the dimensions that
        are missing from it but present in other `EpGoal` instances so that the
        row masking can work properly.
        """

        all_goals = []
        for metric_or_check in self.metrics + self.checks:
            for attr in metric_or_check.__dict__.values():
                if isinstance(attr, Parser):
                    all_goals.append(attr._nominator_expr.get_goals())
                    all_goals.append(attr._denominator_expr.get_goals())

        all_goals = chain(*all_goals, self._exposure_goals)

        for goal in all_goals:
            for dimension in self.get_dimension_columns():
                if dimension not in goal.dimension_to_value:
                    goal.dimension_to_value[dimension] = ""

    def evaluate_agg(self, goals: pd.DataFrame) -> Evaluation:
        """
        Evaluate all metrics and checks in the experiment from already pre-aggregated goals.

        This method is useful when there are too many units in the experiment to evaluate it
        using [`evaluate_by_unit`][epstats.toolkit.experiment.Experiment.evaluate_by_unit].

        Makes a best effort to fill in missing goals and variants with zeros.

        Arguments:
            goals: dataframe with one row per goal and aggregated data in columns

        `goals` dataframe columns:

        1. `exp_id` - experiment id
        1. `exp_variant_id` - variant
        1. `unit_type` - randomization unit type
        1. `agg_type` - level of aggregation
        1. `goal` - goal name
        1. any number of dimensional columns, e.g. column `product` containing values `p_1`
        1. `count` - number of observed goals (e.g. conversions)
        1. `sum_sqr_count` - summed squared number of observed goals per unit; it is
        analogous to `sum_sqr_value`
        1. `sum_value` - value of observed goals
        1. `sum_sqr_value` - summed squared value per unit.  This is used to calculate
        sample standard deviation from pre-aggregated data (it is a term $\\sum x^2$
        in $\\hat{\\sigma}^2 = \\frac{\\sum x^2 - \\frac{(\\sum x)^2}{n}}{n-1}$).
        1. `count_unique` - number of units with at least 1 observed goal

        Returns:
            set of dataframes with evaluation

        Usage:

        ```python
        from epstats.toolkit import Experiment, Metric, SrmCheck
        experiment = Experiment(
            'test-conversion',
            'a',
            [Metric(
                1,
                'Click-through Rate',
                'count(test_unit_type.unit.click)',
                'count(test_unit_type.global.exposure)'),
            ],
            [SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
            unit_type='test_unit_type')

        # This gets testing data, use other Dao or get aggregated goals in some other way.
        from epstats.toolkit.testing import TestData
        goals = TestData.load_goals_agg(experiment.id)

        # evaluate experiment
        ev = experiment.evaluate_agg(goals)

        # work with results
        print(ev.exposures)
        print(ev.metrics[ev.metrics == 1])
        print(ev.checks[ev.checks == 1])

        # this is to assert that this code sample works correctly
        from epstats.toolkit.testing import TestDao, assert_experiment
        assert_experiment(experiment, ev, TestDao(TestData()))
        ```

        Input data frame example:

        ```
        exp_id      exp_variant_id  unit_type           agg_type    goal            product             count   sum_sqr_count   sum_value   sum_sqr_value   count_unique
        test-srm    a               test_unit_type      global      exposure                            100000  100000          100000      100000          100000
        test-srm    b               test_unit_type      global      exposure                            100100  100100          100100      100100          100100
        test-srm    a               test_unit_type      unit        conversion                          1200    1800            32000       66528           900
        test-srm    a               test_unit_type_2    global      conversion      product_1           1000    1700            31000       55000           850
        ```
        """
        g = self._fix_missing_agg(goals)
        return self._evaluate(
            g,
            Experiment._metrics_column_fce_agg,
            Experiment._checks_fce_agg,
            Experiment._exposures_fce_agg,
        )

    def evaluate_wide_agg(self, goals: pd.DataFrame) -> Evaluation:
        """
        This is a simplified version of the method [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].

        It consumes a simple input `goals` dataframe, transforms it into a suitable dataframe format and evaluates it using the general method [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].

        It assumes that the first two columns are the experiment id and the variant id. Then follow the columns with data.

        See usage of the method in the notebook [Ad-hoc A/B test evaluation using Ep-Stats][ad-hoc-ab-test-evaluation-using-ep-stats].

        Arguments:
            goals: dataframe with one row per variant and aggregated data in columns

        Possible `goals` dataframe columns (check the input dataframe example):

        1. `exp_id` - experiment id
        1. `exp_variant_id` - variant
        1. `clicks` - sum of clicks
        1. `views` - sum of views
        1. `bookings` - sum of bookings
        1. `bookings_squared` - sum of bookings squared

        Returns:
            set of dataframes with evaluation

        Usage:

        ```python
        from epstats.toolkit import Experiment, SimpleMetric, SimpleSrmCheck
        from epstats.toolkit.results import results_long_to_wide, format_results
        from epstats.toolkit.testing import TestData

        # Load Test Data
        goals = TestData.load_goals_simple_agg()

        # Define the experiment
        unit_type = 'test_unit_type'
        experiment = Experiment(
            'test-simple-metric',
            'a',
            [
                SimpleMetric(1, 'Click-through Rate (CTR)', 'clicks', 'views', unit_type),
                SimpleMetric(2, 'Conversion Rate', 'conversions', 'views', unit_type),
                SimpleMetric(3, 'Revenue per Mille (RPM)', 'bookings', 'views', unit_type, metric_format='${:,.2f}', metric_value_multiplier=1000),
            ],
            [SimpleSrmCheck(1, 'SRM', 'views')],
            unit_type=unit_type)

        # Evaluate the experiment
        ev = experiment.evaluate_wide_agg(goals)

        # Work with results
        print(ev.exposures)
        print(ev.metrics)
        print(ev.checks)

        # Possible formatting of metrics
        ev.metrics.pipe(results_long_to_wide).pipe(format_results, experiment, format_pct='{:.1%}', format_pval='{:.3f}')
        ```

        Input dataframe example:
        ```
        experiment_id   variant_id  views   clicks  conversions     bookings    bookings_squared
        my-exp          a           473661  48194   413             17152       803105
        my-exp          b           471485  47184   360             14503       677178
        my-exp          c           477159  48841   406             15892       711661
        my-exp          d           474934  49090   289             11995       566700
        ```
        """
        g = goals_wide_to_long(goals, self.unit_type)
        return self.evaluate_agg(g)

    def evaluate_by_unit(self, goals: pd.DataFrame) -> Evaluation:
        """
        Evaluate all metrics and checks in the experiment from goals grouped by `unit_id`.

        This method is useful when there are not too many (<1M) units in the experiment to evaluate.
        If many units were exposed to the experiment, pre-aggregate the data and use [`evaluate_agg`][epstats.toolkit.experiment.Experiment.evaluate_agg].

        Makes a best effort to fill in missing goals and variants with zeros.

        Arguments:
            goals: dataframe with one row per goal and aggregated data in columns

        `goals` dataframe columns:

        1. `exp_id` - experiment id
        1. `exp_variant_id` - variant
        1. `unit_type` - randomization unit type
        1. `unit_id` - (randomization) unit id
        1. `agg_type` - level of aggregation
        1. `goal` - goal name
        1. any number of dimensional columns, e.g. column `product` containing values `p_1`
        1. `count` - number of observed goals
        1. `sum_value` - value of observed goals

        Returns:
            set of dataframes with evaluation

        Usage:

        ```python
        from epstats.toolkit import Experiment, Metric, SrmCheck
        experiment = Experiment(
            'test-real-valued',
            'a',
            [Metric(
                2,
                'Average Bookings',
                'value(test_unit_type.unit.conversion)',
                'count(test_unit_type.unit.exposure)')
            ],
            [],
            unit_type='test_unit_type')

        # This gets testing data, use other Dao or get aggregated goals in some other way.
        from epstats.toolkit.testing import TestData
        goals = TestData.load_goals_by_unit(experiment.id)

        # evaluate experiment
        ev = experiment.evaluate_by_unit(goals)

        # work with results
        print(ev.exposures)
        print(ev.metrics[ev.metrics == 1])
        print(ev.checks[ev.checks == 1])

        # this is to assert that this code sample works correctly
        from epstats.toolkit.testing import TestDao, assert_experiment
        assert_experiment(experiment, ev, TestDao(TestData()))
        ```

        Input data frame example:

        ```
        exp_id      exp_variant_id  unit_type       unit_id             agg_type    goal              product             count   sum_value
        test-srm    a               test_unit_type  test_unit_type_1    unit        exposure                              1       1
        test-srm    a               test_unit_type  test_unit_type_1    unit        conversion        product_1           2       75
        test-srm    b               test_unit_type  test_unit_type_2    unit        exposure                              1       1
        test-srm    b               test_unit_type  test_unit_type_3    unit        exposure                              1       1
        test-srm    b               test_unit_type  test_unit_type_3    unit        conversion        product_2           1       1
        ```
        """
        g = self._fix_missing_by_unit(goals)

        # We need to pivot table to get all goals per `unit_id` on the same row in the data frame.
        # This is needed to be able to vector-evaluate compound metrics
        # eg. `value(test_unit_type.unit.conversion) - value(test_unit_type.unit.refund)`
        g = (
            pd.pivot_table(
                g,
                values=["count", "sum_value"],
                index=[
                    "exp_id",
                    "exp_variant_id",
                    "unit_type",
                    "agg_type",
                    "unit_id",
                ]
                + self.get_dimension_columns(),
                columns="goal",
                aggfunc="sum",
                fill_value=0,
            )
            .swaplevel(axis=1)
            .reset_index()
        )

        return self._evaluate(
            g,
            Experiment._metrics_column_fce_by_unit,
            Experiment._checks_fce_by_unit,
            Experiment._exposures_fce_by_unit,
        )

    def get_goals(self) -> List[EpGoal]:
        """
        List of all goals needed to evaluate all metrics and checks in the experiment.

        Returns:
            list of parsed structured goals
        """
        res = set()
        for m in self.metrics:
            res = res.union(m.get_goals())
        for c in self.checks:
            res = res.union(c.get_goals())
        res = res.union(self._exposure_goals)
        return list(res)

    @staticmethod
    def _metrics_column_fce_agg(m: Metric, goals: pd.DataFrame):
        """
        Gets count, sum_value, sum_sqr_value columns by expression from already aggregated goals.
        """
        return m.get_evaluate_columns_agg(goals)

    @staticmethod
    def _metrics_column_fce_by_unit(m: Metric, goals: pd.DataFrame):
        """
        Gets count, sum_value, sum_sqr_value columns by expression from goals grouped by `unit_id`.
        """
        return m.get_evaluate_columns_by_unit(goals)

    @staticmethod
    def _checks_fce_agg(c: Check, goals: pd.DataFrame, control_variant: str):
        """
        Evaluates checks from already aggregated goals.
        """
        return c.evaluate_agg(goals, control_variant)

    @staticmethod
    def _checks_fce_by_unit(c: Check, goals: pd.DataFrame, control_variant: str):
        """
        Evaluates checks from goals grouped by `unit_id`.
        """
        return c.evaluate_by_unit(goals, control_variant)

    @staticmethod
    def _exposures_fce_agg(goals: pd.DataFrame, exp_id: str, unit_type: str):
        """
        Evaluates exposures from already aggregated goals.
        """
        checks_df = (
            goals[(goals["unit_type"] == unit_type) & (goals["agg_type"] == "global") & (goals["goal"] == "exposure")]
            .groupby("exp_variant_id")
            .agg(exposures=("count", "sum"))
            .reset_index()
        )
        checks_df["exp_id"] = exp_id
        return checks_df

    @staticmethod
    def _exposures_fce_by_unit(goals: pd.DataFrame, exp_id: str, unit_type: str):
        """
        Evaluates exposures from goals grouped by `unit_id`.
        """
        checks_df = goals[(goals["unit_type"] == unit_type) & (goals["agg_type"] == "unit")][
            [("exp_variant_id", ""), ("exposure", "count")]
        ]
        checks_df = checks_df.droplevel(0, axis=1)
        checks_df.columns = ["exp_variant_id", "exposures"]
        d = checks_df.groupby("exp_variant_id").agg(exposures=("exposures", "sum")).reset_index()
        d["exp_id"] = exp_id
        return d

    def _evaluate(self, goals: pd.DataFrame, metrics_column_fce, checks_fce, exposures_fce):
        metrics = self._evaluate_metrics(goals, metrics_column_fce)
        checks = self._evaluate_checks(goals, checks_fce)
        exposures = self._evaluate_exposures(goals, exposures_fce)
        return Evaluation(metrics, checks, exposures)

    def _evaluate_exposures(self, goals: pd.DataFrame, exposures_fce) -> pd.DataFrame:
        return exposures_fce(goals, self.id, self.unit_type)

    def _evaluate_checks(self, goals: pd.DataFrame, check_fce) -> pd.DataFrame:
        res = []
        for c in self.checks:
            try:
                r = check_fce(c, goals, self.control_variant)
                r["exp_id"] = self.id
                res.append(r)
            except Exception as e:
                self._logger.warning(f"Cannot evaluate check [{c.id} in experiment [{self.id}] because of {e}")
                check_evaluation_errors_metric.inc()

        c = pd.concat(res, axis=0) if res != [] else pd.DataFrame([], columns=Evaluation.check_columns())
        c["timestamp"] = round(get_utc_timestamp(datetime.now()).timestamp())
        return c[Evaluation.check_columns()]

    def get_dimension_columns(self) -> List[str]:
        """
        Returns a list of all dimensions used in all metrics in the experiment.
        """
        return list({d for g in self.get_goals() for d in g.dimension_to_value.keys()})

    def _set_variants(self, goals):
        # what variants and goals there should be from all the goals needed to evaluate all metrics
        self.variants = (
            self.variants
            if self.variants is not None
            else np.unique(np.append(goals["exp_variant_id"], self.control_variant))
        )

    def _fix_missing_agg(self, goals: pd.DataFrame) -> pd.DataFrame:
        """
        Adds zero values for missing goals and variants that are needed for metric evaluation.

        Does that on a best-effort basis: fills in `count`, `sum_sqr_count`, `sum_value`, `sum_sqr_value` and `count_unique` with zeros.
        """
        # what variants and goals there should be from all the goals needed to evaluate all metrics
        self._set_variants(goals)
        g = goals[goals.exp_variant_id.isin(self.variants)]
        nvs = self.variants
        ngs = self.get_goals()

        # variants * goals is the number of variant x goals combinations we expect in the data
        lnvs = len(nvs)
        lngs = len(ngs)

        # create zero data frame for all variants and goals
        empty_df = pd.DataFrame(
            {
                "exp_id": self.id,
                "exp_variant_id": np.tile(nvs, lngs),
                "unit_type": np.repeat([g.unit_type for g in ngs], lnvs),
                "agg_type": np.repeat([g.agg_type for g in ngs], lnvs),
                "goal": np.repeat([g.goal for g in ngs], lnvs),
                "count": 0,
                "sum_sqr_count": 0,
                "sum_value": 0,
                "sum_sqr_value": 0,
                "count_unique": 0,
            }
        )

        for dimension in self.get_dimension_columns():
            empty_df[dimension] = np.repeat([g.dimension_to_value.get(dimension, "") for g in ngs], lnvs)

        # join to existing data and use zeros for only missing variants and goals
        m = (
            pd.concat([g, empty_df], axis=0)
            .fillna({d: "" for d in self.get_dimension_columns()})
            .groupby(
                [
                    "exp_id",
                    "exp_variant_id",
                    "unit_type",
                    "agg_type",
                    "goal",
                ]
                + self.get_dimension_columns(),
                # dropna=False,
            )
            .sum()
            .reset_index()
        )
        return m

    def _fix_missing_by_unit(self, goals: pd.DataFrame) -> pd.DataFrame:
        """
        Adds zero values for missing goals and variants that are needed for metric evaluation.

        Does that on a best-effort basis: fills in `count` and `sum_value` with zeros.
        """
        # what variants and goals there should be from all the goals needed to evaluate all metrics
        self._set_variants(goals)
        g = goals[goals.exp_variant_id.isin(self.variants)]
        nvs = self.variants
        ngs = self.get_goals()

        # variants * goals is the number of variant x goals combinations we expect in the data
        lnvs = len(nvs)
        lngs = len(ngs)

        # create zero data frame for all variants and goals
        empty_df = pd.DataFrame(
            {
                "exp_id": self.id,
                "exp_variant_id": np.tile(nvs, lngs),
                "unit_type": np.repeat([g.unit_type for g in ngs], lnvs),
                "agg_type": np.repeat([g.agg_type for g in ngs], lnvs),
                "goal": np.repeat([g.goal for g in ngs], lnvs),
                "unit_id": np.nan,
                "count": 0,
                "sum_value": 0,
            }
        )

        for dimension in self.get_dimension_columns():
            empty_df[dimension] = np.repeat([g.dimension_to_value.get(dimension, "") for g in ngs], lnvs)

        # join to existing data and use zeros for only missing variants and goals
        m = pd.concat([g, empty_df], axis=0).fillna({d: "" for d in self.get_dimension_columns()})
        return m[
            [
                "exp_id",
                "exp_variant_id",
                "unit_type",
                "agg_type",
                "goal",
                "unit_id",
                "count",
                "sum_value",
            ]
            + self.get_dimension_columns()
        ]

    def _get_required_sample_size(
        self,
        metric_row: pd.Series,
        controls: dict,
        minimum_effects: dict,
        metrics_with_value_denominator: set,
        n_variants: int,
    ) -> pd.Series:
        metric_id = metric_row["metric_id"]
        minimum_effect = minimum_effects[metric_id]
        index = ["minimum_effect", "sample_size", "required_sample_size"]

        # Right now, metric with value() denominator would return count that is not equal
        # to the sample size. In such case we do not evaluate the required sample size.
        # TODO: add support for value() denominator metrics;
        # parser will return an additional column equal to count or count_unique.
        sample_size = metric_row["count"] if metric_id not in metrics_with_value_denominator else np.nan

        if metric_row["exp_variant_id"] == self.control_variant or pd.isna(minimum_effect):
            return pd.Series([np.nan, sample_size, np.nan], index)

        return pd.Series(
            [
                minimum_effect,
                sample_size,
                Statistics.required_sample_size_per_variant(
                    n_variants=n_variants,
                    minimum_effect=minimum_effect,
                    mean=controls[metric_id]["mean"],
                    std=controls[metric_id]["std"],
                    std_2=metric_row["std"],
                    confidence_level=metric_row["confidence_level"],
                    power=DEFAULT_POWER,
                ),
            ],
            index,
        )

    def _get_required_sample_sizes(self, metrics: pd.DataFrame, n_variants: int) -> pd.DataFrame:
        controls = {
            r["metric_id"]: {"mean": r["mean"], "std": r["std"]}
            for _, r in metrics.iterrows()
            if r["exp_variant_id"] == self.control_variant
        }

        minimum_effects = {m.id: m.minimum_effect for m in self.metrics}
        metrics_with_value_denominator = {
            m.id for m in self.metrics if m.denominator.startswith("value(") and not isinstance(m, SimpleMetric)
        }

        return metrics.apply(
            lambda metric_row: self._get_required_sample_size(
                metric_row=metric_row,
                controls=controls,
                minimum_effects=minimum_effects,
                metrics_with_value_denominator=metrics_with_value_denominator,
                n_variants=n_variants,
            ),
            axis=1,
        )

    def _get_power_from_required_sample_sizes(self, metrics: pd.DataFrame, n_variants: int) -> pd.Series:
        return metrics.apply(
            lambda metric_row: Statistics.power_from_required_sample_size_per_variant(
                n_variants=n_variants,
                sample_size_per_variant=metric_row["sample_size"],
                required_sample_size_per_variant=metric_row["required_sample_size"],
                required_confidence_level=metric_row["confidence_level"],
                required_power=DEFAULT_POWER,
            ),
            axis=1,
        )

    def _get_false_positive_risk(self, metric_row: pd.Series) -> float:
        if self.null_hypothesis_rate is None:
            return np.nan

        if metric_row["p_value"] >= (1 - metric_row["confidence_level"]):
            return np.nan

        return Statistics.false_positive_risk(
            null_hypothesis_rate=self.null_hypothesis_rate,
            power=metric_row["power"],
            p_value=metric_row["p_value"],
        )

    def _get_false_positive_risks(self, metrics: pd.DataFrame) -> pd.Series:
        return metrics.apply(self._get_false_positive_risk, axis=1)

    def _evaluate_metrics(self, goals: pd.DataFrame, column_fce) -> pd.DataFrame:
        if not self.metrics:
            return pd.DataFrame([], columns=Evaluation.metric_columns())

        sts = []
        for m in self.metrics:
            count, sum_value, sum_sqr_value = column_fce(m, goals)
            sts.append([count, sum_value, sum_sqr_value])
        stats = np.array(sts).transpose(0, 2, 1)
        metrics = stats.shape[0]
        n_variants = stats.shape[1]

        count = stats[:, :, 0]
        sum_value = stats[:, :, 1]
        sum_sqr_value = stats[:, :, 2]
        with np.errstate(divide="ignore", invalid="ignore"):
            # We fill in zeros, when goal data are missing for some variant.
            # There could be division by zero here which is expected as we return
            # nan or inf values to the caller.
            mean = sum_value / count
            std = np.sqrt((sum_sqr_value - sum_value * sum_value / count) / (count - 1))

        # sequential testing correction
        if self.date_from is not None and self.date_to is not None:
            # Parameters
            test_length = (self.date_to - self.date_from).days + 1  # test length in days
            actual_day = (self.date_for - self.date_from).days + 1  # day(s) since beginning of the test
            actual_day = min(actual_day, test_length)  # actual day of evaluation must be in interval [1, test_length]

            # confidence level adjustment - applied when actual_day < test_length (test is still running)
            confidence_level = Statistics.obf_alpha_spending_function(self.confidence_level, test_length, actual_day)
        else:
            confidence_level = self.confidence_level  # no change

        stats = np.dstack((count, mean, std, sum_value, np.ones(count.shape) * confidence_level))
        stats = np.dstack(
            (
                np.repeat([m.id for m in self.metrics], n_variants).reshape(metrics, n_variants, -1),
                np.repeat([m.name for m in self.metrics], n_variants).reshape(metrics, n_variants, -1),
                np.tile(goals["exp_variant_id"].unique(), metrics).reshape(metrics, n_variants, -1),
                stats,
            )
        )

        # dimensions of `stats` array: (metrics, variants, stats)
        # elements of `stats` array: metric_id, metric_name, exp_variant_id, count, mean, std, sum_value, confidence_level
        # hypothesis evaluation (standard way using t-test)
        c = Statistics.ttest_evaluation(stats, self.control_variant)

        # multiple variants (comparisons) correction - applied when we have multiple treatment variants
        if n_variants > 2:
            c = Statistics.multiple_comparisons_correction(c, n_variants, metrics, confidence_level)

        c["exp_id"] = self.id
        c["timestamp"] = round(get_utc_timestamp(datetime.now()).timestamp())
        c[["minimum_effect", "sample_size", "required_sample_size"]] = self._get_required_sample_sizes(c, n_variants)
        c["power"] = self._get_power_from_required_sample_sizes(c, n_variants)
        c["false_positive_risk"] = self._get_false_positive_risks(c)
        return c[Evaluation.metric_columns()]

evaluate_agg(goals)

Evaluate all metrics and checks in the experiment from already pre-aggregated goals.

This method is useful when there are too many units in the experiment to evaluate it using evaluate_by_unit.

Makes a best effort to fill in missing goals and variants with zeros.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `goals` | `DataFrame` | dataframe with one row per goal and aggregated data in columns | *required* |

goals dataframe columns:

  1. exp_id - experiment id
  2. exp_variant_id - variant
  3. unit_type - randomization unit type
  4. agg_type - level of aggregation
  5. goal - goal name
  6. any number of dimensional columns, e.g. column product containing values p_1
  7. count - number of observed goals (e.g. conversions)
  8. sum_sqr_count - summed squared number of observed goals per unit; it is analogous to sum_sqr_value
  9. sum_value - value of observed goals
  10. sum_sqr_value - summed squared value per unit. This is used to calculate sample standard deviation from pre-aggregated data (it is the term \(\sum x^2\) in \(\hat{\sigma}^2 = \frac{\sum x^2 - \frac{(\sum x)^2}{n}}{n-1}\)); see the worked sketch after this list.
  11. count_unique - number of units with at least 1 observed goal
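
As a worked sketch of the variance formula in item 10, the sample standard deviation can be recovered from the pre-aggregated columns alone (the numbers are illustrative, not taken from the example data):

```python
import numpy as np

# Pre-aggregated inputs for one variant (illustrative values).
n = 1000                 # denominator count, e.g. number of exposed units
sum_value = 2500.0       # sum of x   -> the `sum_value` column
sum_sqr_value = 9000.0   # sum of x^2 -> the `sum_sqr_value` column

mean = sum_value / n                                # 2.5
var = (sum_sqr_value - sum_value**2 / n) / (n - 1)  # (9000 - 6250) / 999
std = np.sqrt(var)                                  # ~1.66, no unit-level data needed
```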

Returns:

| Type | Description |
| --- | --- |
| `Evaluation` | set of dataframes with evaluation |

Usage:

```python
from epstats.toolkit import Experiment, Metric, SrmCheck
experiment = Experiment(
    'test-conversion',
    'a',
    [Metric(
        1,
        'Click-through Rate',
        'count(test_unit_type.unit.click)',
        'count(test_unit_type.global.exposure)'),
    ],
    [SrmCheck(1, 'SRM', 'count(test_unit_type.global.exposure)')],
    unit_type='test_unit_type')

# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_agg(experiment.id)

# evaluate experiment
ev = experiment.evaluate_agg(goals)

# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])

# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao, assert_experiment
assert_experiment(experiment, ev, TestDao(TestData()))
```

Input data frame example:

```
exp_id      exp_variant_id  unit_type           agg_type    goal            product             count   sum_sqr_count   sum_value   sum_sqr_value   count_unique
test-srm    a               test_unit_type      global      exposure                            100000  100000          100000      100000          100000
test-srm    b               test_unit_type      global      exposure                            100100  100100          100100      100100          100100
test-srm    a               test_unit_type      unit        conversion                          1200    1800            32000       66528           900
test-srm    a               test_unit_type_2    global      conversion      product_1           1000    1700            31000       55000           850
```

evaluate_by_unit(goals)

Evaluate all metrics and checks in the experiment from goals grouped by unit_id.

This method is useful when there are not too many (<1M) units in the experiment to evaluate. If many units were exposed to the experiment, pre-aggregate the data and use evaluate_agg.

Makes a best effort to fill in missing goals and variants with zeros.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `goals` | `DataFrame` | dataframe with one row per goal and aggregated data in columns | *required* |

goals dataframe columns:

  1. exp_id - experiment id
  2. exp_variant_id - variant
  3. unit_type - randomization unit type
  4. unit_id - (randomization) unit id
  5. agg_type - level of aggregation
  6. goal - goal name
  7. any number of dimensional columns, e.g. column product containing values p_1
  8. count - number of observed goals
  9. sum_value - value of observed goals

Returns:

| Type | Description |
| --- | --- |
| `Evaluation` | set of dataframes with evaluation |

Usage:

```python
from epstats.toolkit import Experiment, Metric, SrmCheck
experiment = Experiment(
    'test-real-valued',
    'a',
    [Metric(
        2,
        'Average Bookings',
        'value(test_unit_type.unit.conversion)',
        'count(test_unit_type.unit.exposure)')
    ],
    [],
    unit_type='test_unit_type')

# This gets testing data, use other Dao or get aggregated goals in some other way.
from epstats.toolkit.testing import TestData
goals = TestData.load_goals_by_unit(experiment.id)

# evaluate experiment
ev = experiment.evaluate_by_unit(goals)

# work with results
print(ev.exposures)
print(ev.metrics[ev.metrics == 1])
print(ev.checks[ev.checks == 1])

# this is to assert that this code sample works correctly
from epstats.toolkit.testing import TestDao, assert_experiment
assert_experiment(experiment, ev, TestDao(TestData()))
```

Input data frame example:

```
exp_id      exp_variant_id  unit_type       unit_id             agg_type    goal              product             count   sum_value
test-srm    a               test_unit_type  test_unit_type_1    unit        exposure                              1       1
test-srm    a               test_unit_type  test_unit_type_1    unit        conversion        product_1           2       75
test-srm    b               test_unit_type  test_unit_type_2    unit        exposure                              1       1
test-srm    b               test_unit_type  test_unit_type_3    unit        exposure                              1       1
test-srm    b               test_unit_type  test_unit_type_3    unit        conversion        product_2           1       1
```
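
For orientation, a sketch that builds an equivalent by-unit `goals` dataframe directly with pandas (rows copied from the example above; in practice the data would come from a Dao or another pipeline):

```python
import pandas as pd

# Columns follow the by-unit schema above; `product` is a dimensional column
# and is left empty ('') where the goal has no dimension.
goals = pd.DataFrame(
    [
        ('test-srm', 'a', 'test_unit_type', 'test_unit_type_1', 'unit', 'exposure', '', 1, 1),
        ('test-srm', 'a', 'test_unit_type', 'test_unit_type_1', 'unit', 'conversion', 'product_1', 2, 75),
        ('test-srm', 'b', 'test_unit_type', 'test_unit_type_2', 'unit', 'exposure', '', 1, 1),
        ('test-srm', 'b', 'test_unit_type', 'test_unit_type_3', 'unit', 'exposure', '', 1, 1),
        ('test-srm', 'b', 'test_unit_type', 'test_unit_type_3', 'unit', 'conversion', 'product_2', 1, 1),
    ],
    columns=[
        'exp_id', 'exp_variant_id', 'unit_type', 'unit_id',
        'agg_type', 'goal', 'product', 'count', 'sum_value',
    ],
)
```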

evaluate_wide_agg(goals)

This is a simplified version of the method evaluate_agg.

It consumes a simple input goals dataframe, transforms it into a suitable dataframe format and evaluates it using the general method evaluate_agg.

It assumes that the first two columns are the experiment id and the variant id. Then follow the columns with data.

See usage of the method in the notebook Ad-hoc A/B test evaluation using Ep-Stats.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `goals` | `DataFrame` | dataframe with one row per variant and aggregated data in columns | *required* |

Possible goals dataframe columns (check the input dataframe example):

  1. exp_id - experiment id
  2. exp_variant_id - variant
  3. clicks - sum of clicks
  4. views - sum of views
  5. bookings - sum of bookings
  6. bookings_squared - sum of bookings squared

Returns:

| Type | Description |
| --- | --- |
| `Evaluation` | set of dataframes with evaluation |

Usage:

```python
from epstats.toolkit import Experiment, SimpleMetric, SimpleSrmCheck
from epstats.toolkit.results import results_long_to_wide, format_results
from epstats.toolkit.testing import TestData

# Load Test Data
goals = TestData.load_goals_simple_agg()

# Define the experiment
unit_type = 'test_unit_type'
experiment = Experiment(
    'test-simple-metric',
    'a',
    [
        SimpleMetric(1, 'Click-through Rate (CTR)', 'clicks', 'views', unit_type),
        SimpleMetric(2, 'Conversion Rate', 'conversions', 'views', unit_type),
        SimpleMetric(3, 'Revenue per Mille (RPM)', 'bookings', 'views', unit_type, metric_format='${:,.2f}', metric_value_multiplier=1000),
    ],
    [SimpleSrmCheck(1, 'SRM', 'views')],
    unit_type=unit_type)

# Evaluate the experiment
ev = experiment.evaluate_wide_agg(goals)

# Work with results
print(ev.exposures)
print(ev.metrics)
print(ev.checks)

# Possible formatting of metrics
ev.metrics.pipe(results_long_to_wide).pipe(format_results, experiment, format_pct='{:.1%}', format_pval='{:.3f}')
```

Input dataframe example:

```
experiment_id   variant_id  views   clicks  conversions     bookings    bookings_squared
my-exp          a           473661  48194   413             17152       803105
my-exp          b           471485  47184   360             14503       677178
my-exp          c           477159  48841   406             15892       711661
my-exp          d           474934  49090   289             11995       566700
```
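
The same wide input can also be built directly (values copied from the example above; the first two columns carry the experiment and variant ids, as the method assumes):

```python
import pandas as pd

# Wide format: one row per variant, metric data in the remaining columns.
goals = pd.DataFrame(
    {
        'experiment_id': ['my-exp'] * 4,
        'variant_id': ['a', 'b', 'c', 'd'],
        'views': [473661, 471485, 477159, 474934],
        'clicks': [48194, 47184, 48841, 49090],
        'conversions': [413, 360, 406, 289],
        'bookings': [17152, 14503, 15892, 11995],
        'bookings_squared': [803105, 677178, 711661, 566700],
    }
)
```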


get_dimension_columns()

Returns a list of all dimensions used in all metrics in the experiment.


get_goals()

List of all goals needed to evaluate all metrics and checks in the experiment.

Returns:

| Type | Description |
| --- | --- |
| `List[EpGoal]` | list of parsed structured goals |
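
A typical use is to ask the experiment what data it needs before querying a storage backend; each `EpGoal` exposes `unit_type`, `agg_type`, `goal` and `dimension_to_value` (as used in the source above). A small sketch, assuming `experiment` is a configured `Experiment`:

```python
# Enumerate the goals required by all metrics and checks, e.g. to assemble
# a query against your goals storage.
for goal in experiment.get_goals():
    print(goal.unit_type, goal.agg_type, goal.goal, goal.dimension_to_value)
```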
