diff --git a/CHANGES.rst b/CHANGES.rst index 40c6d1230..89606ffbc 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -31,6 +31,15 @@ New Features :meth:`DataOp.skb.eval`, :meth:`SkrubLearner.predict`, etc., or in :meth:`DataOp.skb.find` or :meth:`SkrubLearner.truncated_after`. :pr:`2062` by :user:`Jérôme Dockès `. +- The :class:`SessionEncoder` is now available. This encoder adds a `session_id` + column, which groups together events that occur within the given session gap. + Additionally, it is possible to provide a ``split_by`` column or list of columns + (e.g., user ID or (user ID, user device)) to compute sessions for each grouping + value. + :pr:`1930` by :user:`Riccardo Cappuzzo `. +- A new synthetic dataset generator for timestamped data and session-based + operations has been added: :meth:`~skrub.datasets.make_retail_events`. + :pr:`1930` by :user:`Riccardo Cappuzzo `. - The :class:`DropSimilar` transformer has been added, for removing columns in a dataframe that present high correlation with other columns. :pr:`2023` by :user:`Eloi Massoulié `. diff --git a/doc/api_reference.py b/doc/api_reference.py index 9761df3f0..5f9ae4316 100644 --- a/doc/api_reference.py +++ b/doc/api_reference.py @@ -90,6 +90,7 @@ "SimilarityEncoder", "ToCategorical", "DatetimeEncoder", + "SessionEncoder", "ToDatetime", "ToFloat", ], @@ -339,6 +340,9 @@ "datasets.get_data_dir", "datasets.make_deduplication_data", "datasets.toy_orders", + "datasets.toy_products", + "datasets.toy_cities", + "datasets.make_retail_events", ], } ], diff --git a/doc/modules/multi_column_operations/sessionization.rst b/doc/modules/multi_column_operations/sessionization.rst new file mode 100644 index 000000000..060e74a81 --- /dev/null +++ b/doc/modules/multi_column_operations/sessionization.rst @@ -0,0 +1,101 @@ +.. _sessionization: + +.. |SessionEncoder| replace:: :class:`~skrub.SessionEncoder` +.. |BaseEstimator| replace:: :class:`~sklearn.base.BaseEstimator` +.. |TransformerMixin| replace:: :class:`~sklearn.base.TransformerMixin` + + +Detecting sessions in timestamped data with the SessionEncoder +---------------------------------------------------------------- + +When dealing with timestamped data (data that includes at least a timestamp column), +it may be beneficial to try and identify groups of events as +:ref:`"sessions" `_, +through **sessionization**. + +Sessionization is the process of grouping a sequence of events (like user +interactions) into meaningful sessions. +For example, in an online retail context you might define a new session whenever +more than 30 minutes pass with no activity from the user. On a website, a session may +define a sequence of requests made by a single end-user within a certain time duration. + +While definitions may vary depending on the specific use case, being able to detect +such "bursts" of activity by a user can often help with building features that have +greater predictive power than raw individual events, such as number of sessions or +average session duration. + +The |SessionEncoder| addresses this problem by detecting sessions based on +a timestamp column, other session-related columns (e.g., user and device) that should be +used to distinguish between sessions, and a ``session_gap``. Session-related columns +-- identified by the ``split_by`` parameter -- allow to split sessions based on +the provided parameters, for example to group user actions only if they were conducted +on the same device. + +A session is then defined as a sequence of events that share the same value in the +``split_by`` columns, and whose events are closer to each other than the +``session_gap``. + +>>> from skrub import SessionEncoder +>>> from skrub.datasets import make_retail_events +>>> events = make_retail_events(n_events=100, random_state=0) +>>> X, y = events.X, events.y + +Once the necessary features are provided, the |SessionEncoder| +returns a dataframe that includes a ``timestamp_session_id`` column, which is +composed of a monotonically increasing integer ID for each session: +>>> se = SessionEncoder(timestamp_col="timestamp", split_by="user_id", session_gap=30 * 60) +>>> res = se.fit_transform(X) +>>> res.head(5) # doctest: +SKIP + user_id timestamp device_type page_category event_type time_on_page price_viewed timestamp_session_id +0 user_0164 2024-01-01 03:29:07.708922+00:00 mobile fashion page_view 134.1 309.80 59 +1 user_0164 2024-01-01 03:29:42.185048+00:00 tablet books search 103.4 11.00 59 +2 user_0164 2024-01-01 03:32:38.352703+00:00 desktop home wishlist 180.3 4.80 59 +3 user_0008 2024-01-02 10:49:56.974375+00:00 mobile books page_view 7.0 33.94 2 +4 user_0149 2024-01-04 10:00:15.882835+00:00 desktop electronics page_view 108.5 4.44 49 + +With the session ID, it becomes possible to compute aggregations on +each session, for example to find the duration or number of sessions +by a user. + +.. warning:: + +Caution! Aggregation can introduce data leakage. Records should only be aggregated from +within the training set at training time, and the test set at predict time. To +ensure this is the case, any code that performs aggregation can be wrapped in a +scikit-learn |BaseEstimator| (as shown in the +:ref:`SessionEncoder example `), +otherwise the pipeline should use the skrub :ref:`Data Ops framework`. + +The |SessionEncoder| includes the ``suffix`` parameter (by default +``suffix="session_id"``) to specify what the name of the new column should be. +This can help with creating multiple session IDs based on the same timestamp. +For example, we might want to create sessions based on users, and based on users +and their device: + +>>> se = SessionEncoder(timestamp_col="timestamp", +... split_by="user_id", +... session_gap=30 * 60, +... suffix="user" +... ) +>>> res = se.fit_transform(X) +>>> res.head(5) # doctest: +SKIP + user_id timestamp ... price_viewed timestamp_user +0 user_0164 2024-01-01 03:29:07.708922+00:00 ... 309.80 59 +1 user_0164 2024-01-01 03:29:42.185048+00:00 ... 11.00 59 +2 user_0164 2024-01-01 03:32:38.352703+00:00 ... 4.80 59 +3 user_0008 2024-01-02 10:49:56.974375+00:00 ... 33.94 2 +4 user_0149 2024-01-04 10:00:15.882835+00:00 ... 4.44 49 + +>>> se = SessionEncoder(timestamp_col="timestamp", +... split_by=["user_id", "device_type"], +... session_gap=30 * 60, +... suffix="user_device" +... ) +>>> res = se.fit_transform(X) +>>> res.head(5) # doctest: +SKIP + user_id timestamp ... price_viewed timestamp_user_device +0 user_0164 2024-01-01 03:29:07.708922+00:00 ... 309.80 75 +1 user_0164 2024-01-01 03:29:42.185048+00:00 ... 11.00 76 +2 user_0164 2024-01-01 03:32:38.352703+00:00 ... 4.80 74 +3 user_0008 2024-01-02 10:49:56.974375+00:00 ... 33.94 2 +4 user_0149 2024-01-04 10:00:15.882835+00:00 ... 4.44 59 diff --git a/doc/multi_column_operations.rst b/doc/multi_column_operations.rst index fc5e5e7a0..403f70b1a 100644 --- a/doc/multi_column_operations.rst +++ b/doc/multi_column_operations.rst @@ -15,3 +15,4 @@ multiple columns. modules/multi_column_operations/selectors modules/multi_column_operations/type_of_selectors modules/multi_column_operations/advanced_selectors + modules/multi_column_operations/sessionization diff --git a/examples/0110_session_encoder.py b/examples/0110_session_encoder.py new file mode 100644 index 000000000..2b98761e9 --- /dev/null +++ b/examples/0110_session_encoder.py @@ -0,0 +1,174 @@ +""" + +Sessions in time-based data: Predicting user purchases with the SessionEncoder +=============================================================================== + +.. |SessionEncoder| replace:: :class:`~skrub.SessionEncoder` +.. |make_retail_events| replace:: :func:`~skrub.datasets.make_retail_events` +.. |tabular_pipeline| replace:: :func:`~skrub.tabular_pipeline` +.. |TableVectorizer| replace:: :class:`~skrub.TableVectorizer` +.. |DummyClassifier| replace:: :class:`~sklearn.dummy.DummyClassifier` +.. |TimeSeriesSplit| replace:: :class:`~sklearn.model_selection.TimeSeriesSplit` +.. |BaseEstimator| replace:: :class:`~sklearn.base.BaseEstimator` +.. |TransformerMixin| replace:: :class:`~sklearn.base.TransformerMixin` + +This example shows how to use |SessionEncoder| in a scikit-learn pipeline to +create session-level features (sessionization) for conversion prediction, that is +predicting whether a user session will eventually lead to a purchase. + +.. topic:: What is sessionization? + + Sessionization is the process of grouping a sequence of events (like user + interactions) into meaningful sessions. A session typically starts fresh or + after a period of inactivity. For example, in an online retail context, you + might define a new session whenever more than 30 minutes pass with no activity + from a user. This allows you to extract session-level features (like the total + number of events in a session or the dominant device type used) which often have + greater predictive power than raw individual events. + +We will: + +1. Use |make_retail_events| to generate synthetic retail event data +2. Build a baseline classifier on raw event-level features with the |tabular_pipeline| +3. Add session-level and historical features with |SessionEncoder| +4. Train the same model again and compare ROC-AUC + +The data includes columns such as event type, device type, viewed price, and +timestamp. The target is binary: whether the session eventually contains a +purchase event or not. + +""" + +# %% +# Since this is temporal data, we use a time-aware CV strategy with +# |TimeSeriesSplit| to avoid leakage. We reuse the same splitter for all evaluations. +# The dataset is sorted by timestamp, so the training set will always contain only +# past data relative to the test set. +from sklearn.model_selection import TimeSeriesSplit + +splitter = TimeSeriesSplit(n_splits=5) +# %% +# We begin by generating the data with |make_retail_events| and defining our +# features and target. +from skrub import TableReport +from skrub.datasets import make_retail_events + +events = make_retail_events(n_users=20, n_events=5000, random_state=0) +X, y = events.X, events.y +TableReport(X) +# %% +# The data contains 5000 events from 20 users, where each event is timestamped. +# Other columns include the event type, device used by the user, page category, +# time spent on page and price of the item. The target variable indicates whether +# a user session eventually contains a purchase event: all events in that session +# will have a target value of 1 if a purchase happens, and 0 otherwise. + +# %% +# Sanity check: evaluate a DummyClassifier on raw event data +# --------------------------------------------------------------- +# We begin by evaluating a |DummyClassifier| on the original event data +# (without session features). Since it's a |DummyClassifier|, we expect +# chance-level performance (ROC-AUC of 0.5). +from sklearn.dummy import DummyClassifier +from sklearn.model_selection import cross_val_score + +dummy = DummyClassifier(strategy="most_frequent") + +scores = cross_val_score(dummy, X, y, cv=splitter, scoring="roc_auc") +print(f"ROC-AUC with DummyClassifier: {scores.mean():.3f}") + +# %% +# First attempt: training a model without using session-level features +# -------------------------------------------------------------------- +# We first use the |tabular_pipeline| on raw event-level data, without any session +# encoding or aggregation. This serves as a baseline to compare against the enriched +# model later. +# Remember that the |tabular_pipeline| will automatically add a |TableVectorizer| +# to perform feature engineering, so the model can still learn from the raw event +# features. However, it won't be able to directly capture session-level patterns. +from skrub import tabular_pipeline + +model = tabular_pipeline("classification") + +scores = cross_val_score(model, X, y, cv=splitter, scoring="roc_auc") +print(f"ROC-AUC without session encoding: {scores.mean():.3f}") +# %% +# The model is not performing much better than the DummyClassifier, which suggests +# that raw event-level features are not sufficient for good conversion prediction. +# This baseline is limited because it cannot directly use session-level behavior +# (for example, whether "add_to_cart" happened in the same session). + +# %% +# A better approach: session encoding and aggregation +# ------------------------------------------------------ +# Next, we use the |SessionEncoder| to create session-level features that we can +# aggregate over. We define a session boundary as "a user has been inactive for +# more than 30 minutes". The |SessionEncoder| will create a new column +# ``timestamp_session_id`` that assigns a unique session ID to each session detected. +# The parameter ``session_gap=30 * 60`` specifies the inactivity threshold in +# seconds (30 minutes). +# +# Note that session-based features involve aggregations, which must be performed +# only on the training data within each fold to avoid leakage. In a scikit-learn +# pipeline, we can achieve this by using |SessionEncoder| followed by a custom +# transformer that computes session aggregates, and ensures that the pipeline is +# properly fitted within each fold of cross-validation. + +# %% +from skrub import SessionEncoder, tabular_pipeline + +se = SessionEncoder("timestamp", split_by="user_id", session_gap=30 * 60) +# Here we fit the SessionEncoder on the entire dataset for demonstration purposes +X_sessions = se.fit_transform(X) +X_sessions.head() + +# %% +# Defining a custom transformer for session-level aggregation +# ----------------------------------------------------------- +# To avoid data leakage and maintain a clean pipeline, we can create a custom +# transformer that inherits from |BaseEstimator| and |TransformerMixin| and +# computes session-level aggregates within a scikit-learn pipeline. +# This transformer will be fitted and applied separately within each fold of +# cross-validation, ensuring that session features are computed only on the training +# data of each fold. + +from sklearn.base import BaseEstimator, TransformerMixin + + +class SessionAggregator(BaseEstimator, TransformerMixin): + def fit(self, X, y=None): + return self + + def transform(self, X): + # Compute session-level aggregates + session_agg = X.groupby("timestamp_session_id").agg( + session_has_add_to_cart=("event_type", lambda x: "add_to_cart" in x.values), + session_n_events=("event_type", "count"), + session_mean_price=("price_viewed", "mean"), + session_dominant_device=("device_type", lambda x: x.mode()[0]), + ) + # Join back to the original data + return X.join(session_agg, on="timestamp_session_id") + + +# %% +# Then, we create a pipeline that includes the |SessionEncoder|, our custom +# ``SessionAggregator``, and the |tabular_pipeline| for classification. This +# pipeline will be used in cross-validation to evaluate the model +# with session features. +from sklearn.pipeline import make_pipeline + +model = make_pipeline(se, SessionAggregator(), tabular_pipeline("classification")) +scores = cross_val_score(model, X, y, cv=splitter, scoring="roc_auc") +print("ROC-AUC with session encoding:", scores.mean()) + +# %% +# As expected the model with session encoding performs much better than the baseline +# without session features, demonstrating the value of sessionization for conversion +# prediction. +# +# The fact that we are working with aggregation means that it was necessary to +# create a custom transformer to compute session-level features. However, this situation +# can be avoided entirely by using the skrub DataOps workflow, which allows for more +# flexible data transformations without needing to fit everything within a +# scikit-learn pipeline. diff --git a/examples/FIXME/1170_session_encoder.py b/examples/FIXME/1170_session_encoder.py new file mode 100644 index 000000000..95fc73dfa --- /dev/null +++ b/examples/FIXME/1170_session_encoder.py @@ -0,0 +1,184 @@ +""" + +.. |SessionEncoder| replace:: :class:`~skrub.SessionEncoder` +.. |make_retail_events| replace:: :func:`~skrub.datasets.make_retail_events` +.. |tabular_pipeline| replace:: :func:`~skrub.tabular_pipeline` +.. |skrub.X| replace:: :func:`~skrub.X` +.. |skrub.y| replace:: :func:`~skrub.y` +.. |TableVectorizer| replace:: :class:`~skrub.TableVectorizer` +.. |DummyClassifier| replace:: :class:`~sklearn.dummy.DummyClassifier` +.. |TimeSeriesSplit| replace:: :class:`~sklearn.model_selection.TimeSeriesSplit` +.. |cross_validate| replace:: :func:`~skrub.cross_validate` +.. |apply_func| replace:: :func:`~skrub.DataOp.skb.apply_func` + +Sessions in time-based data: Using SessionEncoder in rich DataOps pipeline +========================================================================== + +This example shows how to use |SessionEncoder| in a skrub DataOps workflow to +create session-level features (sessionization) for conversion prediction, that is +predicting whether a user session will eventually lead to a purchase. + +.. topic:: What is sessionization? + + Sessionization is the process of grouping a sequence of events (like user + interactions) into meaningful sessions. A session typically starts fresh or + after a period of inactivity. For example, in an online retail context, you + might define a new session whenever more than 30 minutes pass with no activity + from a user. This allows you to extract session-level features (like the total + number of events in a session or the dominant device type used) which often have + greater predictive power than raw individual events. + +We will: + +1. Use |make_retail_events| to generate synthetic retail event data +2. Build a baseline classifier on raw event-level features with the |tabular_pipeline| +3. Add session-level and historical features with |SessionEncoder| +4. Train the same model again and compare ROC-AUC + +The data includes columns such as event type, device type, viewed price, and +timestamp. The target is binary: whether the session eventually contains a +purchase event or not. +""" + +# %% +# Since this is temporal data, we use a time-aware CV strategy with +# |TimeSeriesSplit| to avoid leakage. We reuse the same splitter for all evaluations. +from sklearn.model_selection import TimeSeriesSplit + +splitter = TimeSeriesSplit(n_splits=5) + +# %% +# We begin by generating the data with |make_retail_events| and marking feature +# and target data with |skrub.X| and |skrub.y| so they can be used +# in a DataOps workflow. + +import skrub +from skrub.datasets import make_retail_events + +events = make_retail_events(n_users=20, n_events=5000, random_state=0) +X, y = skrub.X(events.X), skrub.y(events.y) +X +# %% +# Sanity check: evaluate a DummyClassifier on raw event data +# --------------------------------------------------------------- +# We begin by evaluating a |DummyClassifier| on the original event data +# (without session features). Since it's a |DummyClassifier|, we expect +# chance-level performance (ROC-AUC of 0.5). +from sklearn.dummy import DummyClassifier + +dummy = DummyClassifier(strategy="most_frequent") +dummy_pred = X.skb.apply(dummy, y=y) +dummy_learner = dummy_pred.skb.make_learner() +dummy_results = skrub.cross_validate( + dummy_learner, environment=dummy_pred.skb.get_data(), cv=splitter, scoring="roc_auc" +) +print(f"ROC-AUC with DummyClassifier: {dummy_results['test_score'].mean():.3f}") + +# %% +# First attempt: training a model without using session-level features +# -------------------------------------------------------------------- +# We first use the |tabular_pipeline| on raw event-level data, without any session +# encoding or aggregation. This serves as a baseline to compare against the enriched +# model later. +# Remember that the |tabular_pipeline| will automatically add a |TableVectorizer| +# to perform feature engineering, so the model can still learn from the raw event +# features. However, it won't be able to directly capture session-level patterns. +from skrub import tabular_pipeline + +model = tabular_pipeline("classification") + +pred = X.skb.apply(model, y=y) +learner = pred.skb.make_learner() +results = skrub.cross_validate( + learner, environment=pred.skb.get_data(), cv=splitter, scoring="roc_auc" +) +print(f"ROC-AUC without session encoding: {results['test_score'].mean():.3f}") + +# %% +# The model is not performing much better than the DummyClassifier, which suggests +# that raw event-level features are not sufficient for good conversion prediction. +# This baseline is limited because it cannot directly use session-level behavior +# (for example, whether "add_to_cart" happened in the same session). + +# %% +# A better approach: session encoding and aggregation +# ------------------------------------------------------ +# Next, we use the |SessionEncoder| to create session-level features that we can +# aggregate over. We define a session boundary as "a user has been inactive for +# more than 30 minutes". The |SessionEncoder| will create a new column +# ``timestamp_session_id`` that assigns a unique session ID to each session detected. +# The parameter ``session_gap=30 * 60`` specifies the inactivity threshold in +# seconds (30 minutes). + +# %% +from skrub import SessionEncoder + +se = SessionEncoder("timestamp", split_by="user_id", session_gap=30 * 60) +X_sessions = X.skb.apply(se) +X_sessions + +# %% +# ``timestamp_session_id`` identifies the session of each event. +# We use it to compute session-level aggregates and join them back to event-level rows. +# +# .. admonition:: Session-level feature engineering +# :collapsible: closed +# +# We will compute the following session-level features: +# +# - ``session_has_add_to_cart``: whether the session includes at least one +# "add_to_cart" event +# - ``session_n_events``: the total number of events in the session +# - ``session_mean_price``: the mean price viewed during the session +# - ``session_dominant_device``: the most frequently used device type in the session + + +def most_frequent(series): + # mode() can return multiple values; use the first one + # for a deterministic tie-break. + return series.mode().iat[0] + + +def compute_session_features(df): + session_agg = df.groupby("timestamp_session_id").agg( + session_has_add_to_cart=("event_type", lambda x: "add_to_cart" in x.values), + session_n_events=("event_type", "count"), + session_mean_price=("price_viewed", "mean"), + session_dominant_device=("device_type", most_frequent), + ) + df = df.join(session_agg, on="timestamp_session_id") + return df + + +# %% +# We use |apply_func| to apply these feature engineering functions to the data +# with session IDs. +X_enriched = X_sessions.skb.apply_func(compute_session_features) +X_enriched +# %% +# Now we can train the same model on the enriched data with session-level features +# and see if the performance improves. +model = tabular_pipeline("classification") +pred_enriched = X_enriched.skb.apply(model, y=y) +learner_enriched = pred_enriched.skb.make_learner() +results_enriched = skrub.cross_validate( + learner_enriched, + environment=pred_enriched.skb.get_data(), + cv=splitter, + scoring="roc_auc", +) +print(f"ROC-AUC with session encoding: {results_enriched['test_score'].mean():.3f}") + +# %% +# The enriched model clearly outperforms the baseline, showing the value of +# session-level context for conversion prediction. + +# %% +# Discussion +# ----------- +# In DataOps, these aggregations are evaluated with temporal ordering in mind, +# which helps prevent leakage: features for an event are computed only from data +# available up to that event timestamp (provided that the correct splitter is used). +# +# This example focuses on |SessionEncoder| usage, so we intentionally keep modeling +# simple (no hyperparameter tuning and only a small set of engineered features). diff --git a/skrub/__init__.py b/skrub/__init__.py index 451cd4eab..a98713ef0 100644 --- a/skrub/__init__.py +++ b/skrub/__init__.py @@ -40,6 +40,7 @@ from ._multi_agg_joiner import MultiAggJoiner from ._reporting import TableReport, patch_display, unpatch_display from ._select_cols import Drop, DropCols, SelectCols +from ._session_encoder import SessionEncoder from ._similarity_encoder import SimilarityEncoder from ._squashing_scaler import SquashingScaler from ._string_encoder import StringEncoder @@ -107,5 +108,6 @@ "config_context", "ApplyToCols", "ToFloat", + "SessionEncoder", "core", ] diff --git a/skrub/_dataframe/_common.py b/skrub/_dataframe/_common.py index 05f7a3c61..684ffdf4b 100644 --- a/skrub/_dataframe/_common.py +++ b/skrub/_dataframe/_common.py @@ -54,6 +54,7 @@ "reset_index", "copy_index", "index", + "drop_columns", # # Inspecting dtypes and casting # @@ -77,6 +78,7 @@ "is_categorical", "to_categorical", "is_all_null", + "is_empty_frame", # # Inspecting, selecting and modifying values # @@ -632,6 +634,21 @@ def _index_pandas(obj): return obj.index +@dispatch +def drop_columns(df, columns): + raise_dispatch_unregistered_type(df, kind="DataFrame") + + +@drop_columns.specialize("pandas", argument_type="DataFrame") +def _drop_columns_pandas(df, columns): + return df.drop(columns=columns) + + +@drop_columns.specialize("polars", argument_type="DataFrame") +def _drop_columns_polars(df, columns): + return df.drop(columns) + + # # Inspecting dtypes and casting # ============================= @@ -996,6 +1013,21 @@ def _is_all_null_polars(col): return all(is_null(col)) +@dispatch +def is_empty_frame(obj): + raise_dispatch_unregistered_type(obj, kind="object") + + +@is_empty_frame.specialize("pandas", argument_type="DataFrame") +def _is_empty_frame_pandas(obj): + return obj.empty + + +@is_empty_frame.specialize("polars", argument_type="DataFrame") +def _is_empty_frame_polars(obj): + return obj.is_empty() + + # # Inspecting, selecting and modifying values # ========================================== diff --git a/skrub/_dataframe/tests/test_common.py b/skrub/_dataframe/tests/test_common.py index 5ee0707d2..122cb8bd9 100644 --- a/skrub/_dataframe/tests/test_common.py +++ b/skrub/_dataframe/tests/test_common.py @@ -447,6 +447,15 @@ def test_index(df_module): assert ns.index(col) is None +def test_drop_columns(df_module): + df = df_module.example_dataframe + col_names = ns.column_names(df) + col_to_drop = col_names[0] + df_dropped = ns.drop_columns(df, [col_to_drop]) + assert col_to_drop not in ns.column_names(df_dropped) + assert len(ns.column_names(df_dropped)) == len(col_names) - 1 + + # # Inspecting dtypes and casting # ============================= @@ -678,6 +687,14 @@ def test_is_all_null_polars(pl_module): assert ns.is_all_null(col) +def test_is_empty_frame(df_module): + empty_frame = df_module.make_dataframe({"a": []}) + not_empty_frame = df_module.make_dataframe({"a": [1]}) + + assert ns.is_empty_frame(empty_frame) + assert not ns.is_empty_frame(not_empty_frame) + + # Inspecting, selecting and modifying values # ========================================== # diff --git a/skrub/_session_encoder.py b/skrub/_session_encoder.py new file mode 100644 index 000000000..90c09e562 --- /dev/null +++ b/skrub/_session_encoder.py @@ -0,0 +1,544 @@ +""" +The SessionEncoder is a transformer that takes as input: +- a "timestamp" column, which identifies the time of an event +- a "split_by" column or list of columns, which identifies a user +- a "session_gap" value, which identifies the maximum allowed gap in seconds + between events in a session + +It returns a dataframe with the same number of rows as the input, but with an +additional column that identifies the session to which each event belongs. +The name of the session column is "{timestamp}_{suffix}", where "timestamp" is the name +of the timestamp column, and "suffix" is a string that can be set via the "suffix" +parameter (default is "session_id"). The session column contains a unique identifier for +each session, which is a combination of the "split_by" column(s) and a session number +""" + +import numbers +from collections.abc import Iterable + +import numpy as np +import pandas as pd +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.utils.validation import check_is_fitted + +from . import _dataframe as sbd +from . import selectors as s +from ._dispatch import dispatch, raise_dispatch_unregistered_type +from ._utils import random_string + + +@dispatch +def _add_session_column( + X, split_by_columns, timestamp_col, session_gap, session_id_column_ +): + raise_dispatch_unregistered_type(X, kind="Dataframe") + + +@_add_session_column.specialize("pandas") +def _add_session_column_pandas( + X, split_by_columns, timestamp_col, session_gap, session_id_column_ +): + # needed to avoid a warning with min deps + grouper = split_by_columns[0] if len(split_by_columns) == 1 else split_by_columns + groups = X.groupby(grouper) if len(split_by_columns) > 0 else [("", X)] + rolling_session_id = 0 + + groups_with_session_ids = [] + + for group_key, group_df in groups: + # Sort the group by timestamp + group_df_sorted = group_df.sort_values(by=timestamp_col) + # Compute time differences between consecutive events + time_diffs = group_df_sorted[timestamp_col].diff().dt.total_seconds() + # Identify session boundaries based on time gaps + session_boundaries = (time_diffs > session_gap) | (time_diffs.isna()) + # Assign session IDs based on cumulative sum of session boundaries + # cumsum - 1 to start session IDs at 0 + session_ids = session_boundaries.cumsum() - 1 + rolling_session_id + # Update rolling_session_id for the next group + rolling_session_id = session_ids.max() + 1 + # Add the session IDs to the original dataframe + group_df_sorted = group_df_sorted.assign( + **{ + session_id_column_: pd.Series( + session_ids.values, index=group_df_sorted.index + ) + } + ) + groups_with_session_ids.append((group_key, group_df_sorted)) + res = sbd.concat(*[group_df for _, group_df in groups_with_session_ids], axis=0) + return res + + +@_add_session_column.specialize("polars") +def _add_session_column_polars( + X, split_by_columns, timestamp_col, session_gap, session_id_column_ +): + groups = ( + X.group_by(split_by_columns, maintain_order=True) + if len(split_by_columns) > 0 + else [("", X)] + ) + rolling_session_id = 0 + + groups_with_session_ids = [] + + for group_key, group_df in groups: + # Sort the group by timestamp + group_df_sorted = group_df.sort(by=timestamp_col) + # Compute time differences between consecutive events + time_diffs = group_df_sorted[timestamp_col].diff().dt.total_seconds() + # Identify session boundaries based on time gaps + session_boundaries = (time_diffs > session_gap) | ( + # need both is_nan and is_null to handle older versions of polars + time_diffs.is_nan() | time_diffs.is_null() + ).fill_null(True) + # Assign session IDs based on cumulative sum of session boundaries + # cumsum - 1 to start session IDs at 0 + session_ids = session_boundaries.cum_sum() - 1 + rolling_session_id + # Update rolling_session_id for the next group + rolling_session_id = session_ids.max() + 1 + # Add the session IDs to the original dataframe + group_df_sorted = group_df_sorted.with_columns( + [session_ids.alias(session_id_column_)] + ) + groups_with_session_ids.append(group_df_sorted) + res = sbd.concat(*groups_with_session_ids, axis=0) + return res + + +class SessionEncoder(TransformerMixin, BaseEstimator): + """Add a session ID column to a dataframe based on time gaps and other columns. + + A session is defined as a sequence of events where consecutive events are separated + by at most ``session_gap`` seconds. Additionally, it is possible to provide a column + or list of columns that can be used to distinguish between sessions, such + as user identifiers (specified by the ``split_by`` column). + Sessions change when either the time gap between events exceeds ``session_gap``, + or the identifiers in ``split_by`` column(s) change. + + The encoder takes care of sorting the data by the timestamp and ``split_by`` columns + before identifying sessions, and sorting it back to the original order at the end, + so the original order of events in the input dataframe does not matter. + All unrelated columns are passed through unchanged. + + Parameters + ---------- + timestamp_col : str + The name of the column that identifies the time of an event. This column + is used to determine the start and end of a session. ``timestamp_col`` must + be a datetime, and will be rejected otherwise. + The dataframe is sorted by ``timestamp_col`` and ``split_by`` (if provided) + before identifying sessions, and sorted back to the original order at + the end, so the order of events in the input dataframe does not matter. + + split_by : optional[str, list[str]], default=None + The name of the column, or list of columns, to use to define sessions. + A session boundary is created when the value in any of these columns + changes, or when the time gap between events exceeds ``session_gap``. + The dataframe is sorted by ``split_by`` and ``timestamp_col`` before + identifying sessions, and sorted back to the original order at the end, + so the order of events in the input dataframe does not matter. + This is typically a user identifier column, but it can also be used to define + sessions by other groupings (e.g. user and device type). + If not provided, sessions are detected based on the time gap between events, + and all events are considered to belong to the same user (or group). + + session_gap : int, default=1800 + The maximum gap (in seconds) between events in a session. If the gap + between two events exceeds this value, they are considered to be in + different sessions. Default is 1800 seconds (30 minutes). + + suffix : str, default="session_id" + The suffix to be added to the name of the timestamp column. + + Attributes + ---------- + all_inputs_ : list of str + All column names in the input dataframe. + + all_outputs_: list of str + All column names in the input dataframe plus the new column that identifies + the session, with name "{timestamp}_{suffix}". + + session_id_column_ : str + The name of the session ID column that is added to the dataframe. This is + generated as "{timestamp_col}_{suffix}", but if this name already exists in + the input dataframe, a random suffix is added to avoid overwriting it. + + Examples + -------- + Consider this example where we have a dataframe with user events, and we want + to identify sessions based on a 30-minute gap between events for each user. + Users are identified by the value of the column ``user_id``. + Note that the order of the events in the input dataframe does not matter: + the ``SessionEncoder`` will sort the events by user and timestamp before + identifying sessions (and sort them back to the original order at the end). + + Sessions are defined by sorting over the ``split_by``columns (if provided) + and then by the timestamp. + + >>> import pandas as pd + >>> from datetime import datetime, timedelta + >>> data = { + ... "user_id": [1, 1, 1, 1, 1, 2, 2], + ... "device_id": [ + ... "mobile", + ... "mobile", + ... "desktop", + ... "desktop", + ... "mobile", + ... "mobile", + ... "mobile", + ... ], + ... "timestamp": [ + ... pd.Timestamp("2024-01-01 10:00:00"), + ... pd.Timestamp("2024-01-01 10:10:00"), # 10 min later, same session + ... pd.Timestamp("2024-01-01 10:05:00"), # Different device (sorted), + ... # different session + ... pd.Timestamp("2024-01-01 10:20:00"), # 15 min later, same session + ... # different session + ... pd.Timestamp("2024-01-01 11:20:00"), # 60 min later, new session + ... pd.Timestamp("2024-01-01 10:00:00"), # Different user + ... pd.Timestamp("2024-01-01 10:15:00"), # 15 min later, same session + ... ], + ... "action": ["view", "purchase", "view", "checkout", "view", "login", "view"], + ... } + >>> df = pd.DataFrame(data) + >>> print(df) + user_id device_id timestamp action + 0 1 mobile 2024-01-01 10:00:00 view + 1 1 mobile 2024-01-01 10:10:00 purchase + 2 1 desktop 2024-01-01 10:05:00 view + 3 1 desktop 2024-01-01 10:20:00 checkout + 4 1 mobile 2024-01-01 11:20:00 view + 5 2 mobile 2024-01-01 10:00:00 login + 6 2 mobile 2024-01-01 10:15:00 view + + We use the ``SessionEncoder`` with default ``session_gap`` of 30 minutes: + + >>> from skrub import SessionEncoder + >>> encoder = SessionEncoder( + ... split_by='user_id', timestamp_col='timestamp' + ... ) + >>> result = encoder.fit_transform(df) + >>> result + user_id device_id timestamp action timestamp_session_id + 0 1 mobile 2024-01-01 10:00:00 view 0 + 1 1 mobile 2024-01-01 10:10:00 purchase 0 + 2 1 desktop 2024-01-01 10:05:00 view 0 + 3 1 desktop 2024-01-01 10:20:00 checkout 0 + 4 1 mobile 2024-01-01 11:20:00 view 1 + 5 2 mobile 2024-01-01 10:00:00 login 2 + 6 2 mobile 2024-01-01 10:15:00 view 2 + + In this example, grouping by `user_id` results in three separate sessions: + + - User 1 has two sessions (session 0 and session 1) because there is a gap of + 60 minutes between their events at 10:20 and 11:20, which exceeds the 30-minute + threshold. The first four events of user 1 belong to session 0, while the + last event belongs to session 1. + - User 2 has one session (session 2) because their events are within the + 30-minute window. + + You can also identify users by multiple columns. For instance, the same user + on different devices should have separate sessions. + + >>> encoder_multi = SessionEncoder( + ... split_by=['user_id', 'device_id'], + ... timestamp_col='timestamp', + ... ) + >>> result_multi = encoder_multi.fit_transform(df) + >>> print(result_multi) + user_id device_id timestamp action timestamp_session_id + 0 1 mobile 2024-01-01 10:00:00 view 1 + 1 1 mobile 2024-01-01 10:10:00 purchase 1 + 2 1 desktop 2024-01-01 10:05:00 view 0 + 3 1 desktop 2024-01-01 10:20:00 checkout 0 + 4 1 mobile 2024-01-01 11:20:00 view 2 + 5 2 mobile 2024-01-01 10:00:00 login 3 + 6 2 mobile 2024-01-01 10:15:00 view 3 + + In this example: + + - User 1 on "desktop" has session 0. + - User 1 on "mobile" has two sessions, session 1 and session 2, because there + is a gap of 60 minutes between their events at 10:10 and 11:20, which exceeds + the 30-minute threshold. + - User 2 on "mobile" has session 3 (different user). + + Note again that sessions are defined by sorting over the ``split_by`` columns + and then by the timestamp: this is why the "desktop" session of User 1 is + session 0, even though it starts after the "mobile" session in the original + dataframe. + + You can also use ``SessionEncoder`` without a user identifier column. In this case, + sessions are separated only by time gaps. This is useful for analyzing a single + timeseries or events that don't have a user dimension: + + >>> encoder_no_split = SessionEncoder( + ... split_by=None, + ... timestamp_col='timestamp', + ... ) + >>> data_no_split = { + ... 'timestamp': [ + ... pd.Timestamp('2024-01-01 10:00:00'), + ... pd.Timestamp('2024-01-01 10:10:00'), # 10 min gap + ... pd.Timestamp('2024-01-01 10:15:00'), # 5 min gap, still in session + ... pd.Timestamp('2024-01-01 11:00:00'), # 45 min gap, new session + ... pd.Timestamp('2024-01-01 11:10:00'), # 10 min gap, continue session + ... ], + ... 'event_type': ['start', 'action', 'action', 'restart', 'action'] + ... } + >>> df_no_split = pd.DataFrame(data_no_split) + >>> result_no_split = encoder_no_split.fit_transform(df_no_split) + >>> result_no_split + timestamp event_type timestamp_session_id + 0 2024-01-01 10:00:00 start 0 + 1 2024-01-01 10:10:00 action 0 + 2 2024-01-01 10:15:00 action 0 + 3 2024-01-01 11:00:00 restart 1 + 4 2024-01-01 11:10:00 action 1 + + In this example: + + - Events at 10:00, 10:10, and 10:15 form session 0 (all gaps < 30 min). + - The event at 11:00 starts a new session 1 (45 min gap > 30 min). + - The event at 11:10 continues session 1 (10 min gap < 30 min). + + It is possible to change the duration of the session gap by setting the + ``session_gap`` parameter. For example, we can set it to 5 minutes (300 seconds) + instead of the default 30 minutes, and this will change the session assignments + accordingly: + + >>> encoder_new_gap = SessionEncoder( + ... split_by=None, + ... timestamp_col='timestamp', + ... session_gap=300 + ... ) + >>> result_new_gap = encoder_new_gap.fit_transform(df_no_split) + >>> result_new_gap + timestamp event_type timestamp_session_id + 0 2024-01-01 10:00:00 start 0 + 1 2024-01-01 10:10:00 action 1 + 2 2024-01-01 10:15:00 action 1 + 3 2024-01-01 11:00:00 restart 2 + 4 2024-01-01 11:10:00 action 3 + + It is also possible to change the suffix that is added at the end of the session + ID column via the "suffix" parameter. This is useful, for example, if you want + to add sessions based on different groupings or intervals: + + >>> data_multi = { + ... 'user_id': [1, 1, 1, 1, 2, 2], + ... 'device_id': ['mobile', 'mobile', 'desktop', 'desktop', 'mobile', 'mobile'], + ... 'timestamp': [ + ... pd.Timestamp('2024-01-01 10:00:00'), + ... pd.Timestamp('2024-01-01 10:10:00'), # 10 min later, same session + ... pd.Timestamp('2024-01-01 10:05:00'), # Different device (sorted), + ... # different session + ... pd.Timestamp('2024-01-01 10:20:00'), # 15 min later, same session + ... pd.Timestamp('2024-01-01 10:00:00'), # Different user + ... pd.Timestamp('2024-01-01 10:15:00'), # 15 min later, same session + ... ], + ... 'action': ['view', 'purchase', 'view', 'checkout', 'login', 'view'] + ... } + >>> df = pd.DataFrame(data_multi) + >>> encoder_user = SessionEncoder("timestamp", + ... split_by=["user_id"], suffix="user") + >>> encoder_user.fit_transform(df) + user_id device_id timestamp action timestamp_user + 0 1 mobile 2024-01-01 10:00:00 view 0 + 1 1 mobile 2024-01-01 10:10:00 purchase 0 + 2 1 desktop 2024-01-01 10:05:00 view 0 + 3 1 desktop 2024-01-01 10:20:00 checkout 0 + 4 2 mobile 2024-01-01 10:00:00 login 1 + 5 2 mobile 2024-01-01 10:15:00 view 1 + + >>> encoder_user_device = SessionEncoder("timestamp", + ... split_by=["user_id", "device_id"], + ... suffix="user_device") + >>> encoder_user_device.fit_transform(df) + user_id device_id timestamp action timestamp_user_device + 0 1 mobile 2024-01-01 10:00:00 view 1 + 1 1 mobile 2024-01-01 10:10:00 purchase 1 + 2 1 desktop 2024-01-01 10:05:00 view 0 + 3 1 desktop 2024-01-01 10:20:00 checkout 0 + 4 2 mobile 2024-01-01 10:00:00 login 2 + 5 2 mobile 2024-01-01 10:15:00 view 2 + + """ + + def __init__( + self, timestamp_col, split_by=None, session_gap=30 * 60, suffix="session_id" + ): + self.timestamp_col = timestamp_col + self.split_by = split_by + self.session_gap = session_gap + self.suffix = suffix + + def fit(self, X, y=None): + """Fit the transformer to the data. + + Parameters + ---------- + X : pandas.DataFrame or polars.DataFrame + The input dataframe. + + y : None + Ignored. + + Returns + ------- + self : SessionEncoder + The fitted transformer. + """ + self.fit_transform(X, y) + return self + + def fit_transform(self, X, y=None): + """Fit the transformer to the data and return the transformed dataframe. + + Parameters + ---------- + X : pandas.DataFrame or polars.DataFrame + The input dataframe. + + y : None + Ignored. + + Returns + ------- + pandas.DataFrame or polars.DataFrame + The transformed dataframe with session information. + """ + self.all_inputs_ = sbd.column_names(X) + + # Checking that all the needed columns are there + self._check_input_dataframe(X) + + self.session_id_column_ = f"{self.timestamp_col}_{self.suffix}" + + # If the generated session id column name already exists in the input dataframe, + # we add a random suffix to avoid overwriting it + if self.session_id_column_ in self.all_inputs_: + self.session_id_column_ += f"_skrub_{random_string()}" + + X_result = self.transform(X, y) + + return X_result + + def transform(self, X, y=None): + """Transform the data by encoding sessions. + + Parameters + ---------- + X : pandas.DataFrame or polars.DataFrame + The input dataframe. + + y : None + Ignored. + + Returns + ------- + pandas.DataFrame or polars.DataFrame + The transformed dataframe with session information. + """ + check_is_fitted(self) + + # if the input dataframe is empty, we can skip all the processing and + # return an empty dataframe with the session_id column added + if sbd.is_empty_frame(X): + X = sbd.with_columns( + X, **{self.session_id_column_: np.array([], dtype=np.int64)} + ) + return X + + # Adding a row order column to sort lines back + row_order_col = f"_row_order_skrub_{random_string()}" + X_with_order = sbd.with_columns(X, **{row_order_col: range(X.shape[0])}) + + # sort the input dataframe by the "split_by" and "timestamp" columns + # _split_by_columns can be empty if self.split_by is None + sort_by = self._split_by_columns + [self.timestamp_col] + + # Selecting only the columns needed for sessionization and sorting them + # to ensure that the sessionization is done correctly + X_selected = s.select(X_with_order, sort_by + [row_order_col]) + X_with_session_id = _add_session_column( + X_selected, + self._split_by_columns, + self.timestamp_col, + self.session_gap, + self.session_id_column_, + ) + # Reordering rows back to the original order + X_result = sbd.sort(X_with_session_id, by=row_order_col) + + # Concatenating the session_id column to the original dataframe, so that + # all unrelated columns are passed through unchanged. + # Doing this has the added benefit of adding the session_id column at the + # end of the dataframe. + X_result = sbd.concat(X, s.select(X_result, self.session_id_column_), axis=1) + + self.all_outputs_ = sbd.column_names(X_result) + return X_result + + def _check_input_dataframe(self, X): + """ + Check that the input columns are present and correct + """ + # check the correctness of the values of session_gap + if not isinstance(self.session_gap, numbers.Number): + raise TypeError(f"Expected a number, got {type(self.session_gap)}") + if self.session_gap <= 0: + raise ValueError( + f"session_gap must be a positive number, got {self.session_gap}" + ) + # check that the suffix is a string + if not isinstance(self.suffix, str) or self.suffix is None: + raise ValueError(f"Expected a string as suffix, got {self.suffix!r}") + + # Check that the timestamp column is present + if self.timestamp_col not in self.all_inputs_: + raise ValueError( + f"Column '{self.timestamp_col}' not found in input dataframe" + ) + # check that the timestamp column is of datetime type + if not sbd.is_empty_frame(X) and not sbd.is_any_date( + sbd.col(X, self.timestamp_col) + ): + raise TypeError( + "Expected a datetime column for timestamp_col," + f" got {self.timestamp_col!r}" + ) + + # check that the required columns are present in the input dataframe + if self.split_by is None: + self._split_by_columns = [] + return + if isinstance(self.split_by, str): + self._split_by_columns = [self.split_by] + elif isinstance(self.split_by, Iterable): + self._split_by_columns = list(self.split_by) + else: + raise TypeError("split_by must be a string, a list of strings, or None") + for col in self._split_by_columns: + if col not in self.all_inputs_: + raise ValueError(f"Column '{col}' not found in input dataframe") + + def get_feature_names_out(self, input_features=None): + """Return the column names of the output of ``transform`` as a list of strings. + + Parameters + ---------- + input_features : array-like of str or None, default=None + Ignored. + + Returns + ------- + list of strings + The column names. + """ + check_is_fitted(self) + return self.all_outputs_ diff --git a/skrub/datasets/__init__.py b/skrub/datasets/__init__.py index a00b9bb8f..e6130f07d 100644 --- a/skrub/datasets/__init__.py +++ b/skrub/datasets/__init__.py @@ -14,7 +14,13 @@ fetch_traffic_violations, fetch_videogame_sales, ) -from ._generating import make_deduplication_data, toy_cities, toy_orders, toy_products +from ._generating import ( + make_deduplication_data, + make_retail_events, + toy_cities, + toy_orders, + toy_products, +) from ._utils import get_data_dir __all__ = [ @@ -34,6 +40,7 @@ "fetch_videogame_sales", "get_data_dir", "make_deduplication_data", + "make_retail_events", "toy_orders", "toy_products", "toy_cities", diff --git a/skrub/datasets/_generating.py b/skrub/datasets/_generating.py index 87e0aec78..e85c5efc9 100644 --- a/skrub/datasets/_generating.py +++ b/skrub/datasets/_generating.py @@ -305,3 +305,245 @@ def toy_cities(seed=0, size=1000, nulls=0.1, n_metrics=4): df = pd.concat((df_cities, df_dates, df_metrics), axis=1) return df + + +def make_retail_events(n_users=200, n_events=5000, random_state=None): + """Generate a synthetic e-commerce clickstream dataset for classification. + + Each row represents one user interaction event on a retail platform. + The dataset is designed to showcase :class:`~skrub.SessionEncoder` (which + groups events into sessions using ``user_id`` and ``timestamp``), + :class:`~skrub.DatetimeEncoder` (which extracts hour-of-day, day-of-week, + etc. from ``timestamp``), one-hot encoding of categorical features, and + scaling of numerical features. + + The binary target ``converted`` indicates whether a purchase occurred + during the session that contains this event. All events belonging to the + same session share the same ``converted`` value (a session either converts + or it does not). The probability of conversion is determined at the + session level by the most intent-rich event type in the session + (``add_to_cart`` > ``wishlist`` > ``search`` > ``page_view``), the + dominant device, and the mean price viewed — so the signal is learnable + directly from the observable features. + + Parameters + ---------- + n_users : int, default=200 + Number of distinct users in the dataset. + + n_events : int, default=5000 + Approximate total number of events (rows) to generate. The actual + count may differ slightly because session sizes are drawn from a + Poisson distribution. + + random_state : int or RandomState instance, optional + Controls the random number generation for reproducibility. + + Returns + ------- + bunch : :class:`~sklearn.utils.Bunch` + A dictionary-like object with the following attributes: + + - ``X`` : :class:`~pandas.DataFrame` with columns: + + - ``user_id`` : str — user identifier. + - ``timestamp`` : :class:`~pandas.Timestamp` — event time. + - ``device_type`` : str — one of ``"mobile"``, ``"desktop"``, + ``"tablet"``. + - ``page_category`` : str — one of ``"electronics"``, ``"fashion"``, + ``"home"``, ``"sports"``, ``"books"``. + - ``event_type`` : str — one of ``"page_view"``, ``"search"``, + ``"add_to_cart"``, ``"wishlist"``. + - ``time_on_page`` : float — seconds spent on the page (exponential + distribution, mean ~ 120 s). + - ``price_viewed`` : float — price of the item viewed (log-normal). + + - ``y`` : :class:`~pandas.Series` of bool, name ``"converted"`` — the + classification target. + + Examples + -------- + >>> from skrub.datasets import make_retail_events + >>> bunch = make_retail_events(n_users=20, n_events=100, random_state=0) + >>> bunch.X.shape[1] # 7 feature columns; rows ~ n_events + 7 + >>> bunch.X.columns.tolist() + ['user_id', 'timestamp', 'device_type', 'page_category', 'event_type', 'time_on_page', 'price_viewed'] + >>> bunch.y.name + 'converted' + >>> bunch.y.dtype + dtype('bool') + """ # noqa: E501 + rng = check_random_state(random_state) + + # --- users ----------------------------------------------------------- + user_ids = [f"user_{i:04d}" for i in range(n_users)] + + # Distribute events across users with a power-law (Pareto) weight so that + # a small number of users generate the bulk of the activity. + activity_weights = rng.pareto(2.0, size=n_users) + 1.0 + activity_weights /= activity_weights.sum() + # The multinomial draw gives us the number of events per user, summing to n_events. + events_per_user = rng.multinomial(n_events, activity_weights) + + # --- timestamps with realistic session structure --------------------- + # Events form *sessions*: bursts of activity where consecutive events are + # only ~90 s apart, separated by long idle gaps (at least 2 h). + # + # For each user: + # 1. Split their event budget into sessions of Poisson(3)+1 events. + # 2. Space session starts by Exponential gaps >> session_gap, spread + # across a 90-day window. + # 3. Within each session, place events with Exponential(90 s) gaps. + base_time = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc) + total_window_s = 90 * 24 * 3600 # 90 days + within_session_mean_s = 90.0 # ~1.5 min between events inside a session + min_between_session_s = 2 * 3600 # 2 h minimum gap — well above session_gap + + all_user_ids: list = [] + all_timestamps: list = [] + all_session_keys: list = [] # unique key per (user, session) pair + + for uid, n_user_events in zip(user_ids, events_per_user): + if n_user_events == 0: + continue + + # Split into sessions; each session has at least 1 event. + # The size of each session (number of events) is drawn from a Poisson + # distribution with mean 3, plus 1 to ensure at least one event per session. + session_sizes = [] + remaining = int(n_user_events) + while remaining > 0: + size = min(int(rng.poisson(3)) + 1, remaining) + session_sizes.append(size) + remaining -= size + + # Number of sessions per user + n_sessions = len(session_sizes) + + # Session start times: inter-session gaps drawn from Exponential so + # that they are spread over the 90-day window but always exceed the + # minimum between-session gap. + mean_gap_s = max(total_window_s / n_sessions, min_between_session_s) + inter_gaps = rng.exponential(scale=mean_gap_s, size=n_sessions) + # Random offset for the very first session start. + inter_gaps[0] += rng.uniform(0, min_between_session_s) + session_starts_s = np.cumsum(inter_gaps) + + for sess_idx, (start_s, sess_size) in enumerate( + zip(session_starts_s, session_sizes) + ): + # Events are placed at start_s, start_s+gap1, start_s+gap1+gap2 … + within_gaps = np.concatenate( + [[0.0], rng.exponential(within_session_mean_s, size=sess_size - 1)] + ) + session_key = f"{uid}_{sess_idx}" + for offset_s in start_s + np.cumsum(within_gaps): + all_user_ids.append(uid) + all_timestamps.append(base_time + pd.Timedelta(seconds=float(offset_s))) + all_session_keys.append(session_key) + + n_actual = len(all_user_ids) + + # --- categorical features -------------------------------------------- + device_type = rng.choice( + ["mobile", "desktop", "tablet"], + size=n_actual, + p=[0.55, 0.35, 0.10], + ) + page_category = rng.choice( + ["electronics", "fashion", "home", "sports", "books"], + size=n_actual, + ) + event_type = rng.choice( + ["page_view", "search", "add_to_cart", "wishlist"], + size=n_actual, + p=[0.60, 0.20, 0.15, 0.05], + ) + + # --- numerical features ---------------------------------------------- + # time_on_page: seconds spent on page + time_on_page = rng.exponential(scale=120.0, size=n_actual).round(1) + # price_viewed: item price in USD (log-normal, median ~ e^3.5 ~ 33) + price_viewed = np.exp(rng.normal(loc=3.5, scale=1.2, size=n_actual)).round(2) + + # --- assemble & sort ------------------------------------------------- + X = pd.DataFrame( + { + "user_id": all_user_ids, + "timestamp": all_timestamps, + "_session_key": all_session_keys, + "device_type": device_type, + "page_category": page_category, + "event_type": event_type, + "time_on_page": time_on_page, + "price_viewed": price_viewed, + } + ) + # Sorting by timestamp + X = X.sort_values(["timestamp"]).reset_index(drop=True) + + # --- target: converted (bool), assigned per session ------------------ + # A session either converts or it does not — all events in a session + # share the same label. + # + # The conversion probability is a logistic function of session-level + # summaries of observable features, so the model can learn it: + # + # best_event : the most purchase-intent event type in the session + # (add_to_cart >> wishlist >> search >> page_view) + # device : dominant device (desktop > tablet > mobile) + # price : mean price viewed (expensive items convert less) + + # First, add the weights corresponding to each feature to the event-level dataframe. + event_intent = X["event_type"].map( + {"add_to_cart": 2.0, "wishlist": 0.8, "search": 0.0, "page_view": -0.5} + ) + device_score_col = X["device_type"].map( + {"desktop": 0.5, "tablet": 0.1, "mobile": -0.3} + ) + price_score_col = -0.2 * np.log1p(X["price_viewed"]) + + # Then, select only the session key and the relevant features + tmp = X[["_session_key"]].assign( + event_intent=event_intent, + device_score=device_score_col, + price_score=price_score_col, + ) + # Now aggregate at the session level. + # - For event_intent, we take the max to get the most purchase-intent event in + # the session. + # - For device_score and price_score, we take the mean across events in the session + # Then, to get a single session-level logit, we sum the three aggregated features. + session_logits = ( + tmp.groupby("_session_key") + .agg( + event_intent=("event_intent", "max"), + device_score=("device_score", "mean"), + price_score=("price_score", "mean"), + ) + .sum(axis=1) + ) + + # Add one noise draw per session (not per event) so the label is + # consistent within a session. + unique_keys = session_logits.index.tolist() + noise = dict(zip(unique_keys, rng.normal(0.0, 0.5, size=len(unique_keys)))) + # add the noise to the session logits, then apply the logistic function to get a + # conversion probability per session, then draw the binary label from a Bernoulli. + # For every session key k, session_prob[k] is the conversion probability of + # that session, and session_converted[k] is the binary label indicating whether + # that session converted or not. + session_prob = { + k: 1.0 / (1.0 + np.exp(-(session_logits[k] + noise[k]))) for k in unique_keys + } + session_converted = {k: bool(rng.binomial(1, session_prob[k])) for k in unique_keys} + + # Finally, get the label for each event by mapping its session key to the + # session_converted dict. This is our y + y = X["_session_key"].map(session_converted).rename("converted").astype(bool) + # Drop the session key (the SessionEncoder should be able to recover it from + # user_id and timestamp) + X = X.drop(columns=["_session_key"]) + + return Bunch(X=X, y=y) diff --git a/skrub/tests/test_session_encoder.py b/skrub/tests/test_session_encoder.py new file mode 100644 index 000000000..f92a27eb0 --- /dev/null +++ b/skrub/tests/test_session_encoder.py @@ -0,0 +1,762 @@ +import datetime + +import numpy as np +import pandas as pd +import pytest +from packaging.version import parse + +from skrub import SessionEncoder +from skrub import _dataframe as sbd +from skrub._session_encoder import ( + _add_session_column, +) + + +@pytest.fixture +def example_session_data(df_module): + """Create example session data with multiple users and sessions.""" + timestamps = [] + user_ids = [] + usernames = [] + + base_time = datetime.datetime(2024, 1, 1) + + # User 101, alice: 3 sessions with 5 events each, 10 days apart + for session in range(3): + session_start = base_time + datetime.timedelta(days=session * 10) + for event in range(5): + timestamps.append(session_start + datetime.timedelta(minutes=event * 2)) + user_ids.append(101) + usernames.append("alice") + + # User 102, bob: 2 sessions with 3 events each, 2 hours apart + for session in range(2): + session_start = base_time + datetime.timedelta(days=35, hours=session * 2) + for event in range(3): + timestamps.append(session_start + datetime.timedelta(minutes=event * 5)) + user_ids.append(102) + usernames.append("bob") + + # User 103, charlie: 1 session with 4 events + session_start = base_time + datetime.timedelta(days=40) + for event in range(4): + timestamps.append(session_start + datetime.timedelta(minutes=event * 3)) + user_ids.append(103) + usernames.append("charlie") + + return df_module.make_dataframe( + { + "timestamp": timestamps, + "user_id": user_ids, + "username": usernames, + } + ) + + +@pytest.fixture +def example_session_data_multi_by(df_module): + """Create example session data where a user is identified by two columns. + + A user is uniquely identified by the combination of ``user_id`` and + ``device_id``. The same ``user_id`` on two different devices produces + independent sessions, which lets us verify that ``split_by`` accepts a list of + column names. + """ + timestamps = [] + user_ids = [] + device_ids = [] + + base_time = datetime.datetime(2024, 1, 1) + + # user 1, device "mobile": 2 sessions, 10 days apart, 4 events each + for session in range(2): + session_start = base_time + datetime.timedelta(days=session * 10) + for event in range(4): + timestamps.append(session_start + datetime.timedelta(minutes=event * 3)) + user_ids.append(1) + device_ids.append("mobile") + + # user 1, device "desktop": 1 session, 3 events + # (same user_id as above but different device → separate sessions) + session_start = base_time + datetime.timedelta(days=5) + for event in range(3): + timestamps.append(session_start + datetime.timedelta(minutes=event * 4)) + user_ids.append(1) + device_ids.append("desktop") + + # user 2, device "mobile": 1 session, 5 events + session_start = base_time + datetime.timedelta(days=20) + for event in range(5): + timestamps.append(session_start + datetime.timedelta(minutes=event * 2)) + user_ids.append(2) + device_ids.append("mobile") + + return df_module.make_dataframe( + { + "timestamp": timestamps, + "user_id": user_ids, + "device_id": device_ids, + } + ) + + +@pytest.mark.parametrize( + "by_column,expected_sessions,split_key_to_sessions", + [ + ("user_id", 6, {101: 3, 102: 2, 103: 1}), + ("username", 6, {"alice": 3, "bob": 2, "charlie": 1}), + ], +) +def test_basic_functionality( + example_session_data, by_column, expected_sessions, split_key_to_sessions +): + """Test basic sessionization grouping by user_id or username.""" + # Apply SessionEncoder grouping by the specified column + se = SessionEncoder( + split_by=by_column, timestamp_col="timestamp", session_gap=30 * 60 + ) + result = se.fit_transform(example_session_data) + + # Check that we have the expected total number of sessions + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + unique_sessions = set(session_ids) + assert len(unique_sessions) == expected_sessions + + # content of the "session_id" column after sessionization + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + # content of the "by" column (user_id or username) + group_values = sbd.to_list(sbd.col(result, by_column)) + + counted_sessions = {} + for group_key, session_id in zip(group_values, session_ids): + if group_key not in counted_sessions: + counted_sessions[group_key] = set() + counted_sessions[group_key].add(session_id) + for group_key, sessions in counted_sessions.items(): + assert len(sessions) == split_key_to_sessions[group_key] + + # Checking that fit then transform still works + result_fit = se.fit(example_session_data).transform(example_session_data) + # content of the "session_id" column after sessionization + session_ids_fit = sbd.to_list(sbd.col(result_fit, "timestamp_session_id")) + + assert session_ids == session_ids_fit + + +@pytest.mark.parametrize( + "by_column,group_keys", + [ + ("user_id", [101, 102, 103]), + ("username", ["alice", "bob", "charlie"]), + ], +) +def test_different_users_different_sessions( + example_session_data, by_column, group_keys +): + """Test that different users/groups have different session IDs.""" + # Apply SessionEncoder + se = SessionEncoder( + split_by=by_column, timestamp_col="timestamp", session_gap=30 * 60 + ) + result = se.fit_transform(example_session_data) + + # content of the "session_id" column after sessionization + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + # content of the "by" column (user_id or username) + group_values = sbd.to_list(sbd.col(result, by_column)) + + # Verify different groups don't share session IDs + for i, key1 in enumerate(group_keys): + for key2 in group_keys[i + 1 :]: + # find the indices of events for each group key (user id or username) + indices1 = [idx for idx, v in enumerate(group_values) if v == key1] + indices2 = [idx for idx, v in enumerate(group_values) if v == key2] + # find the unique session IDs for each group key (each user) + sessions1 = {session_ids[idx] for idx in indices1} + sessions2 = {session_ids[idx] for idx in indices2} + + # check that there are no shared session IDs between different users/groups + assert len(sessions1.intersection(sessions2)) == 0 + + +def test_multi_by_columns(example_session_data_multi_by): + """Test sessionization when a user is identified by a combination of columns. + + The fixture has user_id=1 on two devices ("mobile" and "desktop"). When + ``group_by=["user_id", "device_id"]``, those two device contexts must be treated + as independent groups, producing separate session IDs even though they share + the same ``user_id``. + + Expected sessions: + - (user_id=1, device_id="mobile") → 2 sessions + - (user_id=1, device_id="desktop") → 1 session + - (user_id=2, device_id="mobile") → 1 session + Total: 4 sessions + """ + se = SessionEncoder( + split_by=["user_id", "device_id"], + timestamp_col="timestamp", + session_gap=30 * 60, + ) + result = se.fit_transform(example_session_data_multi_by) + + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + user_ids = sbd.to_list(sbd.col(result, "user_id")) + device_ids = sbd.to_list(sbd.col(result, "device_id")) + + # 4 distinct sessions overall + assert len(set(session_ids)) == 4 + + # create a dict that groups sessions by (user_id, device_id) pair + group_sessions: dict = {} + for uid, did, sid in zip(user_ids, device_ids, session_ids): + key = (uid, did) + # Each (user_id, device_id) pair should have its own set of session IDs + # We use a set to track unique session IDs for each group key + group_sessions.setdefault(key, set()).add(sid) + + # assert that each (user_id, device_id) pair has the expected number of sessions + assert len(group_sessions[(1, "mobile")]) == 2 + assert len(group_sessions[(1, "desktop")]) == 1 + assert len(group_sessions[(2, "mobile")]) == 1 + + # sessions belonging to different (user_id, device_id) pairs must be disjoint + keys = list(group_sessions) + # go through each pair of group keys (user_id, device_id) + for i, k1 in enumerate(keys): + for k2 in keys[i + 1 :]: + # check that the sets in group_sessions for different keys are disjoint + assert group_sessions[k1].isdisjoint(group_sessions[k2]) + + +def test_multiple_users(df_module): + """Test sessionization with multiple users interleaved.""" + timestamps = [] + user_ids = [] + + base_time = datetime.datetime(2024, 1, 1) + + # Create events for two users, alternating + for i in range(10): + timestamps.append(base_time + datetime.timedelta(minutes=i)) + user_ids.append(101 if i % 2 == 0 else 102) + + df = df_module.make_dataframe( + { + "timestamp": timestamps, + "user_id": user_ids, + } + ) + + se = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap=30 * 60 + ) + result = se.fit_transform(df) + + # After sorting by user_id and timestamp, each user should have 1 session + # since all their events are within 30 minutes + session_ids = sbd.col(result, "timestamp_session_id") + + # The encoder sorts by user_id then timestamp, so events are grouped by user + # Check that there are exactly 2 sessions (one per user) + assert len(set(session_ids)) == 2 + + +def test_time_gap_threshold(df_module): + """Test that session_gap parameter correctly determines sessionization.""" + timestamps = [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 15), # 15 min gap + datetime.datetime(2024, 1, 1, 10, 50), # 35 min gap + datetime.datetime(2024, 1, 1, 11, 0), # 10 min gap + ] + user_ids = [101, 101, 101, 101] + + df = df_module.make_dataframe( + { + "timestamp": timestamps, + "user_id": user_ids, + } + ) + + # With 20-minute gap: should create 2 sessions (split at 35-min gap) + se_20 = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap=20 * 60 + ) + result_20 = se_20.fit_transform(df) + session_ids_20 = sbd.to_list(sbd.col(result_20, "timestamp_session_id")) + assert len(set(session_ids_20)) == 2 + + # With 40-minute gap: should create 1 session (all gaps < 40 min) + se_40 = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap=40 * 60 + ) + result_40 = se_40.fit_transform(df) + session_ids_40 = sbd.to_list(sbd.col(result_40, "timestamp_session_id")) + assert len(set(session_ids_40)) == 1 + + +def test_no_user_column(df_module): + """Test sessionization without a user identifier column. + + When ``split_by`` is None, all events are treated as from the same "user", and + sessions are separated only by time gaps. + """ + timestamps = [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 10), # 10 min gap + datetime.datetime(2024, 1, 1, 10, 15), # 5 min gap (within 30 min) + datetime.datetime(2024, 1, 1, 11, 0), # 45 min gap (exceeds 30 min) + datetime.datetime(2024, 1, 1, 11, 10), # 10 min gap (within 30 min) + ] + + df = df_module.make_dataframe( + { + "timestamp": timestamps, + } + ) + + # Without 'group_by', sessions are separated only by time gaps + se = SessionEncoder(split_by=None, timestamp_col="timestamp", session_gap=30 * 60) + result = se.fit_transform(df) + + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + # Expected: 2 sessions (events 0-2 in session 0, event 3 starts new session) + # Then event 4 continues session 1 + assert len(set(session_ids)) == 2 + assert ( + session_ids[0] == session_ids[1] == session_ids[2] + ) # First 3 events in session 0 + assert session_ids[3] == session_ids[4] # Last 2 events in session 1 + assert session_ids[0] != session_ids[3] # Sessions are different + + +def test_single_event(df_module): + """Test sessionization with single event per user.""" + df = df_module.make_dataframe( + { + "timestamp": [datetime.datetime(2024, 1, 1, 10, 0)], + "user_id": [101], + } + ) + + se = SessionEncoder(split_by="user_id", timestamp_col="timestamp", session_gap=30) + result = se.fit_transform(df) + + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + assert len(session_ids) == 1 + # Single event should create one session + assert session_ids[0] == 0 + + +@pytest.mark.parametrize( + "group_by_param,timestamp_col_param,expected_error_type,expected_error_match", + [ + ( + "wrong_column", + "timestamp", + ValueError, + "Column 'wrong_column' not found", + ), + ( + "user_id", + "wrong_column", + ValueError, + "Column 'wrong_column' not found", + ), + ( + ["wrong_column", "user_device"], + "timestamp", + ValueError, + "Column 'wrong_column' not found", + ), + ( + 23, # invalid type for 'group_by' + "timestamp", + TypeError, + "split_by must be a string, a list of strings, or None", + ), + ], +) +def test_missing_column_error( + df_module, + group_by_param, + timestamp_col_param, + expected_error_type, + expected_error_match, +): + """Test that missing columns and invalid parameters raise appropriate errors.""" + df = df_module.make_dataframe( + { + "timestamp": [datetime.datetime(2024, 1, 1)], + "user_id": [101], + "user_device": ["mobile"], + } + ) + + se = SessionEncoder( + split_by=group_by_param, + timestamp_col=timestamp_col_param, + ) + with pytest.raises(expected_error_type, match=expected_error_match): + se.fit_transform(df) + + +def test_invalid_parameters(df_module): + """Test that invalid parameters raise appropriate errors.""" + df = df_module.make_dataframe( + { + "timestamp": [datetime.datetime(2024, 1, 1)], + "user_id": [101], + } + ) + + # Test non-numeric session_gap + se_non_numeric = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap="thirty" + ) + with pytest.raises(TypeError, match="Expected a number"): + se_non_numeric.fit_transform(df) + + # Test negative session_gap + se_negative = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap=-10 + ) + with pytest.raises(ValueError, match="session_gap must be a positive number"): + se_negative.fit_transform(df) + + # Test zero session_gap + se_zero = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap=0 + ) + with pytest.raises(ValueError, match="session_gap must be a positive number"): + se_zero.fit_transform(df) + + # Test invalid suffix (None) + se_invalid_suffix = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", suffix=None + ) + with pytest.raises(ValueError, match="Expected a string as suffix"): + se_invalid_suffix.fit_transform(df) + + # Test timestamp column with non-datetime type + df_invalid_timestamp = df_module.make_dataframe( + { + "timestamp": ["2024-01-01 10:00:00"], # string instead of datetime + "user_id": [101], + } + ) + se_invalid_timestamp = SessionEncoder( + split_by="user_id", timestamp_col="timestamp", session_gap=30 + ) + with pytest.raises(TypeError, match="Expected a datetime column for timestamp_col"): + se_invalid_timestamp.fit_transform(df_invalid_timestamp) + + +def test_preserves_columns(df_module): + """Test that original columns are preserved in output.""" + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 5), + ], + "user_id": [101, 101], + "extra_col": [1.5, 2.5], + } + ) + + se = SessionEncoder(split_by="user_id", timestamp_col="timestamp", session_gap=30) + result = se.fit_transform(df) + + result_cols = sbd.column_names(result) + assert "timestamp" in result_cols + assert "user_id" in result_cols + assert "extra_col" in result_cols + assert "timestamp_session_id" in result_cols + + +def test_fit_and_transform(df_module): + """Test that fit() and transform() work separately.""" + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 5), + ], + "user_id": [101, 101], + } + ) + + se = SessionEncoder(split_by="user_id", timestamp_col="timestamp", session_gap=30) + + # Test fit returns self + se_fitted = se.fit(df) + assert se_fitted is se + + # Test that all_inputs_ is set after fit + assert hasattr(se, "all_inputs_") + + +def test_get_feature_names(df_module): + """Test that get_feature_names returns the correct list of columns.""" + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 5), + ], + "user_id": [101, 101], + } + ) + + se = SessionEncoder(split_by="user_id", timestamp_col="timestamp", session_gap=30) + se.fit(df) + feature_names = se.get_feature_names_out() + + # Should include original columns plus "timestamp_session_id" + assert set(feature_names) == {"timestamp", "user_id", "timestamp_session_id"} + + +def test_check_is_new_session_no_by(df_module): + """_check_is_new_session with an empty group_by-list uses only the time gap.""" + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 10), # 10 min — within gap + datetime.datetime(2024, 1, 1, 11, 0), # 50 min — exceeds gap + datetime.datetime(2024, 1, 1, 11, 5), # 5 min — within gap + ] + } + ) + session_id = sbd.to_list( + sbd.col( + _add_session_column( + df, [], "timestamp", 30 * 60, session_id_column_="timestamp_session_id" + ), + "timestamp_session_id", + ) + ) + # Expected: first two events in session 0, last two events in session 1 + assert session_id == [0, 0, 1, 1] + + +@pytest.mark.skipif(parse(pd.__version__).major >= 3, reason="Test only for pandas < 3") +def test_add_session_column_old_pandas(df_module): + """Old versions of pandas have a different branch that needs to be covered""" + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 10), # 10 min — within gap + datetime.datetime(2024, 1, 1, 11, 0), # 50 min — exceeds gap + datetime.datetime(2024, 1, 1, 11, 5), # 5 min — within gap + ] + } + ) + session_id = sbd.to_list( + sbd.col( + _add_session_column( + df, [], "timestamp", 30 * 60, session_id_column_="timestamp_session_id" + ), + "timestamp_session_id", + ) + ) + # Expected: first two events in session 0, last two events in session 1 + assert session_id == [0, 0, 1, 1] + + +def test_check_is_new_session_with_by(df_module): + """_add_session_column returns a dataframe with a ``timestamp_session_id`` + column when a group_by-list is provided. A new session starts when the group key + changes (even for a tiny time gap) or when the time gap exceeds + ``session_gap``. + + Data layout (already sorted by user_id, timestamp): + row 0: user 1, 10:00 – first row, session 0 + row 1: user 1, 10:05 – same user, 5 min gap → still session 0 + row 2: user 2, 10:06 – user changed, 1 min gap → new session 1 + row 3: user 2, 10:10 – same user, 4 min gap → still session 1 + Expected session_ids: [0, 0, 1, 1] + """ + df = df_module.make_dataframe( + { + "user_id": [1, 1, 2, 2], + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 5), # same user, 5 min gap + datetime.datetime(2024, 1, 1, 10, 6), # different user, 1 min gap + datetime.datetime(2024, 1, 1, 10, 10), # same user, 4 min gap + ], + } + ) + result = _add_session_column( + df, ["user_id"], "timestamp", 30 * 60, "timestamp_session_id" + ) + + # _add_session_column now returns the full dataframe with session_id added + assert "timestamp_session_id" in sbd.column_names(result) + session_ids = sbd.to_list(sbd.col(result, "timestamp_session_id")) + assert session_ids == [0, 0, 1, 1] + + +@pytest.mark.parametrize( + "timestamp", + ["timestamp", "something_else"], +) +@pytest.mark.parametrize( + "suffix", + [None, "session_id", "test_suffix"], +) +def test_proper_suffix(timestamp, suffix, df_module): + df = df_module.make_dataframe( + { + "user_id": [1, 1, 2, 2], + timestamp: [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 5), # same user, 5 min gap + datetime.datetime(2024, 1, 1, 10, 6), # different user, 1 min gap + datetime.datetime(2024, 1, 1, 10, 10), # same user, 4 min gap + ], + } + ) + if suffix is None: + with pytest.raises(ValueError, match="Expected a string as suffix*"): + SessionEncoder( + timestamp_col=timestamp, split_by="user_id", suffix=suffix + ).fit_transform(df) + else: + result = SessionEncoder( + timestamp_col=timestamp, split_by="user_id", suffix=suffix + ).fit_transform(df) + # _add_session_column now returns the full dataframe with session_id added + expected_name = f"{timestamp}_{suffix}" + assert expected_name in sbd.column_names(result) + + +def test_preserves_input_order(df_module): + """Test that the output rows are in the same order as the input rows. + + The encoder sorts internally to detect sessions correctly, but the result + must be returned in the original input order. + """ + # Deliberately unsorted: bob first, then alice, timestamps out of order + timestamps = [ + datetime.datetime(2024, 1, 1, 10, 20), # row 0: bob + datetime.datetime(2024, 1, 1, 10, 0), # row 1: alice + datetime.datetime(2024, 1, 1, 10, 25), # row 2: bob + datetime.datetime(2024, 1, 1, 10, 5), # row 3: alice + ] + user_ids = ["bob", "alice", "bob", "alice"] + + df = df_module.make_dataframe({"timestamp": timestamps, "user_id": user_ids}) + + se = SessionEncoder(split_by="user_id", timestamp_col="timestamp", session_gap=30) + result = se.fit_transform(df) + + # The user_id column must still be in the original order + assert sbd.to_list(sbd.col(result, "user_id")) == user_ids + # The timestamp column must still be in the original order + assert sbd.to_list(sbd.col(result, "timestamp")) == timestamps + + +def test_error_dispatch(): + with pytest.raises(TypeError, match="Expecting a Pandas or Polars Dataframe"): + _add_session_column( + np.array([1]), + split_by_columns=[], + timestamp_col="timestamp", + session_gap=30, + session_id_column_="timestamp_session_id", + ) + + +def test_empty_dataframe(df_module): + """Test sessionization with empty dataframe.""" + df = df_module.make_dataframe( + { + "timestamp": [], + "user_id": [], + } + ) + + se = SessionEncoder(split_by="user_id", timestamp_col="timestamp", session_gap=30) + result = se.fit_transform(df) + + assert sbd.shape(result)[0] == 0 + assert "timestamp_session_id" in sbd.column_names(result) + assert sbd.column_names(result) == ["timestamp", "user_id", "timestamp_session_id"] + + +def test_not_overwriting_columns(df_module): + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 10), # 10 min — within gap + datetime.datetime(2024, 1, 1, 11, 0), # 50 min — exceeds gap + datetime.datetime(2024, 1, 1, 11, 5), # 5 min — within gap + ], + "timestamp_session_id": [1, 2, 3, 4], + } + ) + encoder = SessionEncoder("timestamp") + result = encoder.fit_transform(df) + + col_names = sbd.column_names(result) + assert "timestamp" in col_names + assert "timestamp_session_id" in col_names + # The original "timestamp_session_id" column should not be overwritten + # The new column has name "timestamp_session_id_skrub_RANDOM_SUFFIX" + assert col_names[2].removeprefix("timestamp_session_id").startswith("_skrub_") + + # Check that this also works for a custom suffix + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 10), # 10 min — within gap + datetime.datetime(2024, 1, 1, 11, 0), # 50 min — exceeds gap + datetime.datetime(2024, 1, 1, 11, 5), # 5 min — within gap + ], + "timestamp_custom_name": [1, 2, 3, 4], + } + ) + encoder = SessionEncoder("timestamp", suffix="custom_name") + result = encoder.fit_transform(df) + + col_names = sbd.column_names(result) + assert "timestamp" in col_names + assert "timestamp_custom_name" in col_names + # The original "timestamp_custom_name" column should not be overwritten + # The new column has name "timestamp_custom_name_skrub_RANDOM_SUFFIX" + assert col_names[2].removeprefix("timestamp_custom_name").startswith("_skrub_") + + +def test_empty_column_name(df_module): + """Test that an empty string as column name is a valid split by column name.""" + df = df_module.make_dataframe( + { + "timestamp": [ + datetime.datetime(2024, 1, 1, 10, 0), + datetime.datetime(2024, 1, 1, 10, 10), # 10 min — within gap + datetime.datetime(2024, 1, 1, 11, 0), # 50 min — exceeds gap + datetime.datetime(2024, 1, 1, 11, 5), # 5 min — within gap + ], + "": [1, 1, 1, 2], + } + ) + encoder = SessionEncoder(timestamp_col="timestamp", split_by="") + result = encoder.fit_transform(df) + assert sbd.shape(result)[0] == 4 + assert "timestamp_session_id" in sbd.column_names(result) + assert sbd.to_list(sbd.col(result, "timestamp_session_id")) == [0, 0, 1, 2] + + # Check that not passing the split_by parameter (default None) also works + # and returns the proper result + encoder = SessionEncoder(timestamp_col="timestamp") + result = encoder.fit_transform(df) + assert sbd.shape(result)[0] == 4 + assert "timestamp_session_id" in sbd.column_names(result) + assert sbd.to_list(sbd.col(result, "timestamp_session_id")) == [0, 0, 1, 1]