This PR adds non-entity mode historical feature retrieval to the Dask offline store, enabling users to retrieve features over a time range (start_date/end_date) without providing an entity_df.
Key changes:
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
| --- | --- |
| sdk/python/feast/infra/offline_stores/dask.py | Implements non-entity mode by making entity_df optional, synthesizing a minimal entity_df when None, and using cross-join logic when join keys are absent |
| sdk/python/tests/unit/infra/offline_stores/test_dask_non_entity.py | Adds unit test verifying that the API accepts start_date/end_date parameters in non-entity mode and returns a valid RetrievalJob |
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | ||
| if start_date is None: | ||
| max_ttl_seconds = 0 | ||
| for fv in feature_views: | ||
| if fv.ttl and isinstance(fv.ttl, timedelta): | ||
| max_ttl_seconds = max( | ||
| max_ttl_seconds, int(fv.ttl.total_seconds()) | ||
| ) | ||
| if max_ttl_seconds > 0: | ||
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | ||
| else: | ||
| # Keep default window bounded to avoid unbounded scans by default. | ||
| start_date = end_date - timedelta(days=30) | ||
The start_date parameter is not enforced in the actual data filtering; only end_date is used (along with TTL). The filtering logic in _filter_ttl uses end_date - ttl as the lower bound, not the user-provided start_date. As a result, if a user provides a start_date that is later than end_date - ttl, they may get more data than expected.
Consider adding a validation check or warning when start_date is provided but will be overridden by TTL logic, or add a TODO comment indicating that proper start_date filtering should be implemented in a follow-up.
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | |
| if start_date is None: | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| start_date = end_date - timedelta(days=30) | |
| # Compute TTL-based lower bound for start_date. | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| ttl_lower_bound = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| ttl_lower_bound = end_date - timedelta(days=30) | |
| # If user provided start_date, use the max of user start_date and ttl_lower_bound. | |
| if start_date is not None: | |
| if start_date < ttl_lower_bound: | |
| import warnings | |
| warnings.warn( | |
| f"Provided start_date ({start_date}) is earlier than TTL-based lower bound ({ttl_lower_bound}). Overriding start_date to {ttl_lower_bound}." | |
| ) | |
| start_date = max(start_date, ttl_lower_bound) | |
| else: | |
| start_date = ttl_lower_bound |
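The suggested logic can be read as a small pure function, which may make it easier to unit test on its own. The sketch below is an illustration only: resolve_start_date and its parameters are hypothetical names, not part of Feast, and it takes plain timedelta values in place of feature view objects. It also compares timedeltas directly instead of truncating to whole seconds as the suggestion does.

```python
import warnings
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


def resolve_start_date(
    start_date: Optional[datetime],
    end_date: datetime,
    ttls: Iterable[Optional[timedelta]],
    default_window: timedelta = timedelta(days=30),
) -> datetime:
    """Return the effective lower bound of the retrieval window (hypothetical helper)."""
    # Largest positive TTL across feature views; None or zero TTLs are ignored.
    positive = [ttl for ttl in ttls if ttl is not None and ttl > timedelta(0)]
    # Fall back to a bounded default window when no feature view defines a TTL.
    ttl_lower_bound = end_date - (max(positive) if positive else default_window)

    if start_date is None:
        return ttl_lower_bound
    if start_date < ttl_lower_bound:
        warnings.warn(
            f"start_date ({start_date}) is earlier than the TTL-based lower "
            f"bound ({ttl_lower_bound}); using the TTL-based bound instead."
        )
    # A user-provided start_date can narrow the window but never widen it.
    return max(start_date, ttl_lower_bound)
```

With this shape, a start_date later than end_date - ttl is honored, while an earlier one is clamped with a warning.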
| fv = _mock_feature_view() | ||
| # Expect this to work once non-entity mode is implemented for Dask-based store |
The comment is outdated. Since the non-entity mode is now implemented (not just expected to work in the future), this comment should be updated to describe what the test actually validates.
Consider updating to something like:
| # Expect this to work once non-entity mode is implemented for Dask-based store | |
| # Verify that non-entity mode (entity_df=None) accepts start_date and end_date parameters |
| end_date=datetime(2023, 1, 2, tzinfo=timezone.utc), | ||
| ) | ||
| # When implemented, should return a RetrievalJob |
The comment is outdated. Since the implementation is complete, this comment should be updated to describe what is being asserted.
Consider updating to something like:
| # When implemented, should return a RetrievalJob | |
| # Should return a RetrievalJob instance |
| # df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) | ||
Commented-out code should be removed. If this line is no longer needed due to the refactoring below, it should be deleted rather than commented out.
| # df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) |
| entity_df, dd.DataFrame | ||
| ): | ||
| raise ValueError( | ||
| f"Please provide an entity_df of type {type(pd.DataFrame)} or dask.dataframe instead of type {type(entity_df)}" |
The error message formatting is incorrect. The f-string should name pd.DataFrame as a string rather than call type() on it: type(pd.DataFrame) evaluates to <class 'type'>, so the message reads "...of type <class 'type'> or dask.dataframe instead of type ..." and never names the expected class.
Consider changing to:
| f"Please provide an entity_df of type {type(pd.DataFrame)} or dask.dataframe instead of type {type(entity_df)}" | |
| f"Please provide an entity_df of type pd.DataFrame or dask.dataframe.DataFrame instead of type {type(entity_df)}" |
@aniketpalu Please fix this as well.
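The f-string issue flagged above can be reproduced without pandas. In this sketch, the DataFrame class is a hypothetical stand-in for pd.DataFrame, and original_message and suggested_message are illustrative names, not functions from the PR.

```python
class DataFrame:
    """Hypothetical stand-in for pd.DataFrame so the snippet needs no third-party imports."""


def original_message(entity_df: object) -> str:
    # type(DataFrame) is the metaclass of the class, i.e. <class 'type'>,
    # so the expected type is never named in the message.
    return (
        f"Please provide an entity_df of type {type(DataFrame)} or "
        f"dask.dataframe instead of type {type(entity_df)}"
    )


def suggested_message(entity_df: object) -> str:
    # Spell the expected types out as text; only the received value's type is dynamic.
    return (
        "Please provide an entity_df of type pd.DataFrame or "
        f"dask.dataframe.DataFrame instead of type {type(entity_df)}"
    )


if __name__ == "__main__":
    print(original_message([1, 2, 3]))
    print(suggested_message([1, 2, 3]))
```

Only the first half of the original message is affected: type(entity_df) on the received value still reports the actual type passed in.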
| if non_entity_mode and any( | ||
| k not in entity_df_with_features.columns for k in join_keys | ||
| ): |
[nitpick] The logic for checking missing join keys could be simplified. In non-entity mode, the synthetic entity_df will never contain join keys (it only has the event_timestamp column). Therefore, the any() check is unnecessary overhead.
Consider simplifying to:
| if non_entity_mode and any( | |
| k not in entity_df_with_features.columns for k in join_keys | |
| ): | |
| if non_entity_mode: |
This makes the logic clearer and more efficient since we know join keys are never present in the synthetic entity_df.
Agree with Copilot here.
ACK pending comments
| if non_entity_mode: | ||
| # Default end_date to current time (UTC) to keep behavior predictable without extra parameters. | ||
| end_date = end_date or datetime.now(timezone.utc) |
| end_date = end_date or datetime.now(timezone.utc) | |
| end_date = make_tzaware(end_date) or datetime.now(timezone.utc) |
| end_date = end_date or datetime.now(timezone.utc) | ||
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | ||
| if start_date is None: |
If start_date is given, doesn't it also have to be made tz-aware?
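One way to address the timezone questions for both bounds is to normalize them together before any comparison. The sketch below is a rough illustration under stated assumptions: ensure_utc and normalize_window are hypothetical local helpers, not Feast's make_tzaware, and they assume that naive datetimes represent UTC. ensure_utc passes None through so that the `or datetime.now(...)` fallback still applies; whether make_tzaware accepts None is not confirmed here.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def ensure_utc(ts: Optional[datetime]) -> Optional[datetime]:
    """Attach UTC to naive datetimes (assumption: naive means UTC); pass None through."""
    if ts is None:
        return None
    if ts.utcoffset() is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts


def normalize_window(
    start_date: Optional[datetime], end_date: Optional[datetime]
) -> Tuple[Optional[datetime], datetime]:
    """Return (start_date, end_date) with both bounds tz-aware."""
    # Default end_date to the current UTC time when it is not provided.
    end_date = ensure_utc(end_date) or datetime.now(timezone.utc)
    # start_date may remain None so a later step can derive it from TTLs.
    start_date = ensure_utc(start_date)
    return start_date, end_date
```

Normalizing both values up front avoids the TypeError Python raises when a naive datetime is compared with an aware one, for example in a start_date < ttl_lower_bound check.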
| # Minimal synthetic entity_df: one timestamp row; join keys are not materialized here on purpose to avoid | ||
| # accidental dependence on specific feature view schemas at this layer. | ||
| entity_df = pd.DataFrame( | ||
| {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]} |
You haven't given the start date and tz. Both matter very much. It should be:
| # When implemented, should return a RetrievalJob | ||
| from feast.infra.offline_stores.offline_store import RetrievalJob | ||
| assert isinstance(retrieval_job, RetrievalJob) |
I don't think this is good enough to validate the data-based retrieval.
LGTM
What this PR does / why we need it:
Adds start/end-only historical retrieval to Dask offline store, enabling users to fetch features over a time range without providing an entity_df.
Makes entity_df optional in DaskOfflineStore.get_historical_features and accepts start_date/end_date via kwargs.
In non-entity mode:
Which issue(s) this PR fixes:
RHOAIENG-37451
Misc