From a2c4eb308e05faa9ebfe96ec02ea87e7587bce39 Mon Sep 17 00:00:00 2001 From: Anna Scholtz Date: Fri, 29 Aug 2025 15:08:37 -0700 Subject: [PATCH 1/2] [DENG-8655] Period-over-period support for metric-hub metrics --- generator/views/metric_definitions_view.py | 203 +++++++++++++++++++-- 1 file changed, 185 insertions(+), 18 deletions(-) diff --git a/generator/views/metric_definitions_view.py b/generator/views/metric_definitions_view.py index f9ed8b6b..afb5db35 100644 --- a/generator/views/metric_definitions_view.py +++ b/generator/views/metric_definitions_view.py @@ -202,10 +202,13 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: f""" {data_source.name}.{data_source.submission_date_column or "submission_date"} BETWEEN - COALESCE( - SAFE_CAST( - {{% date_start submission_date %}} AS DATE - ), CURRENT_DATE()) AND + DATE_SUB( + COALESCE( + SAFE_CAST( + {{% date_start submission_date %}} AS DATE + ), CURRENT_DATE()), + INTERVAL {{% parameter lookback_days %}} DAY + ) AND COALESCE( SAFE_CAST( {{% date_end submission_date %}} AS DATE @@ -350,8 +353,9 @@ def get_dimension_groups(self) -> List[Dict[str, Any]]: { "name": "submission", "type": "time", + "datatype": "date", "group_label": "Base Fields", - "sql": "CAST(${TABLE}.analysis_basis AS TIMESTAMP)", + "sql": "${TABLE}.analysis_basis", "label": "Submission", "timeframes": [ "raw", @@ -412,6 +416,23 @@ def _get_parameters(self, dimensions: List[dict]): "default_value": "100", "hidden": hide_sampling, }, + { + "name": "lookback_days", + "label": "Lookback (Days)", + "type": "unquoted", + "description": "Number of days added before the filtered date range. " + + "Useful for period-over-period comparisons.", + "default_value": "0", + }, + { + "name": "date_groupby_position", + "label": "Date Group By Position", + "type": "unquoted", + "description": "Position of the date field in the group by clause. " + + "Required when submission_week, submission_month, submission_quarter, submission_year " + + "is selected as BigQuery can't correctly resolve the GROUP BY otherwise", + "default_value": "", + }, ] def get_measures( @@ -563,20 +584,166 @@ def get_measures( ) elif statistic_slug == "rolling_average": aggregation = statistic_conf.get("aggregation", "sum") + matching_measures = [ + m + for m in measures + if m["name"].startswith( + f"{dimension['name']}_{aggregation}" + ) + ] if "window_sizes" in statistic_conf: for window_size in statistic_conf["window_sizes"]: - measures.append( - { - "name": f"{dimension['name']}_{window_size}_day_{statistic_slug}", - "type": "number", - "label": f"{dimension_label} {window_size} Day Rolling Average", - "sql": f""" - AVG({aggregation}(${{TABLE}}.{dimension["name"]} * {sampling})) OVER ( - ROWS {window_size} PRECEDING - )""", - "group_label": "Statistics", - "description": f"{window_size} day rolling average of {dimension_label}", - } - ) + for matching_measure in matching_measures: + measures.append( + { + "name": f"{matching_measure['name']}_{window_size}_day", + "type": "number", + "label": f"{matching_measure['label']} {window_size} Day Rolling Average", + "sql": f""" + {{% if {self.name}.submission_date._is_selected or + {self.name}.submission_week._is_selected or + {self.name}.submission_month._is_selected or + {self.name}.submission_quarter._is_selected or + {self.name}.submission_year._is_selected %}} + AVG(${{{matching_measure['name']}}}) OVER ( + {{% if date_groupby_position._parameter_value != "" %}} + ORDER BY {{% parameter date_groupby_position %}} + {{% elsif {self.name}.submission_date._is_selected %}} + ORDER BY ${{TABLE}}.analysis_basis + {{% else %}} + ERROR("date_groupby_position needs to be set when using submission_week, + submission_month, submission_quarter, or submission_year") + {{% endif %}} + ROWS BETWEEN {window_size} PRECEDING AND CURRENT ROW + {{% else %}} + ERROR('Please select a "submission_*" field to compute the rolling average') + {{% endif %}} + )""", + "group_label": "Statistics", + "description": f"{window_size} day rolling average of {dimension_label}", + } + ) + + # period-over-period measures + if "period_over_period" in statistic_conf: + matching_measures = [ + m + for m in measures + if m["name"].startswith( + f"{dimension['name']}_{statistic_slug}" + ) + and "_period_over_period_" not in m["name"] + ] + for period in statistic_conf["period_over_period"].get( + "periods", [] + ): + for matching_measure in matching_measures: + original_sql = matching_measure["sql"] + if statistic_slug == "rolling_average": + rows_match = re.search( + r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW", + original_sql, + ) + + if rows_match: + original_window_size = int(rows_match.group(1)) + + # Create time-adjusted window sizes + time_conditions = [] + for unit, divisor in [ + ("date", 1), + ("week", 7), + ("month", 30), + ("quarter", 90), + ("year", 365), + ]: + adjusted_window = ( + (original_window_size + period) + // divisor + if unit != "date" + else original_window_size + period + ) + condition = ( + f"{{% {'if' if unit == 'date' else 'elif'} " + + f"{self.name}.submission_{unit}._is_selected %}}" + ) + modified_sql = re.sub( + r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW", + f"ROWS BETWEEN {adjusted_window} PRECEDING AND CURRENT ROW", + original_sql, + ) + time_conditions.append( + f"{condition}\n{modified_sql}" + ) + + sql = ( + "\n".join(time_conditions) + + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" + ) + else: + sql = original_sql + else: + # Create LAG with time-adjusted periods and ORDER BY + time_conditions = [] + for unit, divisor in [ + ("date", 1), + ("week", 7), + ("month", 30), + ("quarter", 90), + ("year", 365), + ]: + adjusted_period = ( + period // divisor + if unit != "date" + else period + ) + order_by = ( + f"${{submission_{unit}}}" + if unit != "date" + else "${submission_date}" + ) + condition = ( + f"{{% {'if' if unit == 'date' else 'elif'} " + + f"{self.name}.submission_{unit}._is_selected %}}" + ) + lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER ( + {{% if date_groupby_position._parameter_value %}} + ORDER BY {{% parameter date_groupby_position %}} + {{% else %}} + ORDER BY {order_by} + {{% endif %}} + )""" + time_conditions.append( + f"{condition}\n{lag_sql}" + ) + + sql = ( + "\n".join(time_conditions) + + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) " + + "OVER (ORDER BY ${submission_date})\n{{% endif %}}" + ) + + for kind in statistic_conf["period_over_period"].get( + "kinds", ["previous"] + ): + if kind == "difference": + sql = f"({original_sql}) - ({sql})" + elif kind == "relative_change": + sql = f"SAFE_DIVIDE((({sql}) - ({original_sql})), NULLIF(({original_sql}), 0))" + elif kind == "previous": + pass + + measures.append( + { + "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}", + "type": "number", + "label": f"{matching_measure['label']} " + + f"{period} Day Period Over Period {kind.capitalize()}", + "description": f"Period over period {kind.capitalize()} of " + + f"{matching_measure['label']} over {period} days", + "group_label": "Statistics", + "sql": sql, + } + ) return measures From e067faf27b09ba77ec126347cb0fb2cae29fef80 Mon Sep 17 00:00:00 2001 From: Anna Scholtz Date: Wed, 3 Sep 2025 10:59:35 -0700 Subject: [PATCH 2/2] Adjust period-over-period logic --- .flake8 | 2 +- .../explores/metric_definitions_explore.py | 4 +- generator/views/metric_definitions_view.py | 301 +++++++++++------- 3 files changed, 180 insertions(+), 127 deletions(-) diff --git a/.flake8 b/.flake8 index 8166c398..11ac08cc 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] -max-line-length = 120 +max-line-length = 130 ignore = E203, W503 diff --git a/generator/explores/metric_definitions_explore.py b/generator/explores/metric_definitions_explore.py index fb66312c..6e6d13c1 100644 --- a/generator/explores/metric_definitions_explore.py +++ b/generator/explores/metric_definitions_explore.py @@ -44,9 +44,7 @@ def _to_lookml( explore_lookml: Dict[str, Any] = { "name": self.name, - "always_filter": { - "filters": [{"submission_date": "7 days"}, {"sampling": "1"}] - }, + "always_filter": {"filters": [{"sampling": "1"}]}, # The base view is the only view that exposes the date and client_id fields. # All other views only expose the metric definitions. "fields": exposed_fields, diff --git a/generator/views/metric_definitions_view.py b/generator/views/metric_definitions_view.py index afb5db35..dfef5c58 100644 --- a/generator/views/metric_definitions_view.py +++ b/generator/views/metric_definitions_view.py @@ -16,6 +16,15 @@ class MetricDefinitionsView(View): type: str = "metric_definitions_view" + # Time unit divisors for converting days to different granularities + TIME_UNITS = [ + ("date", 1), # Daily: no conversion needed + ("week", 7), # Weekly: divide by 7 + ("month", 30), # Monthly: approximate 30 days per month + ("quarter", 90), # Quarterly: approximate 90 days per quarter + ("year", 365), # Yearly: approximate 365 days per year + ] + def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]): """Get an instance of an MetricDefinitionsView.""" super().__init__(namespace, name, MetricDefinitionsView.type, tables) @@ -201,6 +210,20 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: [ f""" {data_source.name}.{data_source.submission_date_column or "submission_date"} + {{% if _filters['analysis_period'] != "" %}} + BETWEEN + DATE_SUB( + COALESCE( + SAFE_CAST( + {{% date_start analysis_period %}} AS DATE + ), CURRENT_DATE()), + INTERVAL {{% parameter lookback_days %}} DAY + ) AND + COALESCE( + SAFE_CAST( + {{% date_end analysis_period %}} AS DATE + ), CURRENT_DATE()) + {{% else %}} BETWEEN DATE_SUB( COALESCE( @@ -213,6 +236,7 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: SAFE_CAST( {{% date_end submission_date %}} AS DATE ), CURRENT_DATE()) + {{% endif %}} """ for data_source in [data_source_definition] + joined_data_sources if data_source.submission_date_column != "NULL" @@ -305,6 +329,7 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: ) view_defn["sets"] = self._get_sets() view_defn["parameters"] = self._get_parameters(view_defn["dimensions"]) + view_defn["filters"] = self._get_filters() return {"views": [view_defn]} @@ -435,6 +460,18 @@ def _get_parameters(self, dimensions: List[dict]): }, ] + def _get_filters(self): + return [ + { + "name": "analysis_period", + "type": "date", + "label": "Analysis Period (with Lookback)", + "description": "Use this filter to define the main analysis period. " + + "The results will include the selected date range plus any additional " + + "days specified by the 'Lookback days' setting.", + } + ] + def get_measures( self, dimensions: List[dict] ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: @@ -583,49 +620,54 @@ def get_measures( } ) elif statistic_slug == "rolling_average": - aggregation = statistic_conf.get("aggregation", "sum") - matching_measures = [ - m - for m in measures - if m["name"].startswith( - f"{dimension['name']}_{aggregation}" - ) - ] - if "window_sizes" in statistic_conf: - for window_size in statistic_conf["window_sizes"]: - for matching_measure in matching_measures: - measures.append( - { - "name": f"{matching_measure['name']}_{window_size}_day", - "type": "number", - "label": f"{matching_measure['label']} {window_size} Day Rolling Average", - "sql": f""" - {{% if {self.name}.submission_date._is_selected or - {self.name}.submission_week._is_selected or - {self.name}.submission_month._is_selected or - {self.name}.submission_quarter._is_selected or - {self.name}.submission_year._is_selected %}} - AVG(${{{matching_measure['name']}}}) OVER ( - {{% if date_groupby_position._parameter_value != "" %}} - ORDER BY {{% parameter date_groupby_position %}} - {{% elsif {self.name}.submission_date._is_selected %}} - ORDER BY ${{TABLE}}.analysis_basis + # rolling averages are computed over existing statistics (e.g. sum, ratio) + aggregations = statistic_conf.get("aggregations", ["sum"]) + for aggregation in aggregations: + # find measures that match the current dimension and aggregation type + matching_measures = [ + m + for m in measures + if m["name"].startswith( + f"{dimension['name']}_{aggregation}" + ) + ] + if "window_sizes" in statistic_conf: + for window_size in statistic_conf["window_sizes"]: + for matching_measure in matching_measures: + # these statistics require some time dimension to be selected + measures.append( + { + "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day", + "type": "number", + "label": f"{matching_measure['label']} {window_size} Day Rolling Average", + "sql": f""" + {{% if {self.name}.submission_date._is_selected or + {self.name}.submission_week._is_selected or + {self.name}.submission_month._is_selected or + {self.name}.submission_quarter._is_selected or + {self.name}.submission_year._is_selected %}} + AVG(${{{matching_measure['name']}}}) OVER ( + {{% if date_groupby_position._parameter_value != "" %}} + ORDER BY {{% parameter date_groupby_position %}} + {{% elsif {self.name}.submission_date._is_selected %}} + ORDER BY ${{TABLE}}.analysis_basis + {{% else %}} + ERROR("date_groupby_position needs to be set when using submission_week, + submission_month, submission_quarter, or submission_year") + {{% endif %}} + ROWS BETWEEN {window_size} PRECEDING AND CURRENT ROW {{% else %}} - ERROR("date_groupby_position needs to be set when using submission_week, - submission_month, submission_quarter, or submission_year") + ERROR('Please select a "submission_*" field to compute the rolling average') {{% endif %}} - ROWS BETWEEN {window_size} PRECEDING AND CURRENT ROW - {{% else %}} - ERROR('Please select a "submission_*" field to compute the rolling average') - {{% endif %}} - )""", - "group_label": "Statistics", - "description": f"{window_size} day rolling average of {dimension_label}", - } - ) + )""", + "group_label": "Statistics", + "description": f"{window_size} day rolling average of {dimension_label}", + } + ) - # period-over-period measures + # period-over-period measures compare current values with historical values if "period_over_period" in statistic_conf: + # find all statistics that have period-over-period configured matching_measures = [ m for m in measures @@ -634,104 +676,36 @@ def get_measures( ) and "_period_over_period_" not in m["name"] ] + + # create period-over-period measures for each configured time period for period in statistic_conf["period_over_period"].get( "periods", [] ): for matching_measure in matching_measures: original_sql = matching_measure["sql"] + + # rolling averages need special handling to adjust window sizes + # based on the selected time granularity if statistic_slug == "rolling_average": - rows_match = re.search( - r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW", - original_sql, + sql = self._create_rolling_average_period_sql( + original_sql, period ) - - if rows_match: - original_window_size = int(rows_match.group(1)) - - # Create time-adjusted window sizes - time_conditions = [] - for unit, divisor in [ - ("date", 1), - ("week", 7), - ("month", 30), - ("quarter", 90), - ("year", 365), - ]: - adjusted_window = ( - (original_window_size + period) - // divisor - if unit != "date" - else original_window_size + period - ) - condition = ( - f"{{% {'if' if unit == 'date' else 'elif'} " - + f"{self.name}.submission_{unit}._is_selected %}}" - ) - modified_sql = re.sub( - r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW", - f"ROWS BETWEEN {adjusted_window} PRECEDING AND CURRENT ROW", - original_sql, - ) - time_conditions.append( - f"{condition}\n{modified_sql}" - ) - - sql = ( - "\n".join(time_conditions) - + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" - ) - else: - sql = original_sql else: - # Create LAG with time-adjusted periods and ORDER BY - time_conditions = [] - for unit, divisor in [ - ("date", 1), - ("week", 7), - ("month", 30), - ("quarter", 90), - ("year", 365), - ]: - adjusted_period = ( - period // divisor - if unit != "date" - else period - ) - order_by = ( - f"${{submission_{unit}}}" - if unit != "date" - else "${submission_date}" - ) - condition = ( - f"{{% {'if' if unit == 'date' else 'elif'} " - + f"{self.name}.submission_{unit}._is_selected %}}" - ) - lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER ( - {{% if date_groupby_position._parameter_value %}} - ORDER BY {{% parameter date_groupby_position %}} - {{% else %}} - ORDER BY {order_by} - {{% endif %}} - )""" - time_conditions.append( - f"{condition}\n{lag_sql}" - ) - - sql = ( - "\n".join(time_conditions) - + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) " - + "OVER (ORDER BY ${submission_date})\n{{% endif %}}" + # standard measures use LAG function with time-adjusted periods + sql = self._create_lag_period_sql( + matching_measure, period ) + # generate different types of period-over-period comparisons for kind in statistic_conf["period_over_period"].get( "kinds", ["previous"] ): if kind == "difference": - sql = f"({original_sql}) - ({sql})" + comparison_sql = f"({original_sql}) - ({sql})" elif kind == "relative_change": - sql = f"SAFE_DIVIDE((({sql}) - ({original_sql})), NULLIF(({original_sql}), 0))" - elif kind == "previous": - pass + comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1" + else: + comparison_sql = sql measures.append( { @@ -742,8 +716,89 @@ def get_measures( "description": f"Period over period {kind.capitalize()} of " + f"{matching_measure['label']} over {period} days", "group_label": "Statistics", - "sql": sql, + "sql": comparison_sql, } ) return measures + + def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str: + """ + Create period-over-period SQL for rolling average measures. + + Rolling averages require adjusting the window size based on the selected time granularity. + """ + rows_match = re.search( + r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW", + original_sql, + ) + + if not rows_match: + return original_sql + + original_window_size = int(rows_match.group(1)) + time_conditions = [] + + for unit, divisor in self.TIME_UNITS: + # calculate adjusted window size for this time granularity + adjusted_window = ( + (original_window_size + period) // divisor + if unit != "date" + else original_window_size + period + ) + + condition = ( + f"{{% {'if' if unit == 'date' else 'elsif'} " + + f"{self.name}.submission_{unit}._is_selected %}}" + ) + + # modify the ROWS clause to extend the window by the period + modified_sql = re.sub( + r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW", + f"ROWS BETWEEN {adjusted_window} PRECEDING AND " + + f"{adjusted_window - original_window_size} PRECEDING", + original_sql, + ) + time_conditions.append(f"{condition}\n{modified_sql}") + + return ( + "\n".join(time_conditions) + + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" + ) + + def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str: + """ + Create period-over-period SQL using LAG function for standard measures. + + LAG function looks back N periods to get historical values. The period is adjusted + based on the selected time granularity (daily, weekly, monthly, etc.). + """ + time_conditions = [] + + for unit, divisor in self.TIME_UNITS: + # calculate adjusted period for this time granularity + adjusted_period = period // divisor if unit != "date" else period + + order_by = ( + f"${{submission_{unit}}}" if unit != "date" else "${submission_date}" + ) + + condition = ( + f"{{% {'if' if unit == 'date' else 'elsif'} " + + f"{self.name}.submission_{unit}._is_selected %}}" + ) + + lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER ( + {{% if date_groupby_position._parameter_value != "" %}} + ORDER BY {{% parameter date_groupby_position %}} + {{% else %}} + ORDER BY {order_by} + {{% endif %}} + )""" + time_conditions.append(f"{condition}\n{lag_sql}") + + return ( + "\n".join(time_conditions) + + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) " + + "OVER (ORDER BY ${submission_date})\n{% endif %}" + )