|
|
||
| from feast.repo_config import FeastConfigBaseModel | ||
|
|
||
| class OfflinePushBatchingConfig(FeastConfigBaseModel): |
There was a problem hiding this comment.
i think having a config is fine but do we actually need it? we probably could have just passed these as optional args, no?
Sorry, something went wrong.
There was a problem hiding this comment.
You're right, I just noticed the FeatureLoggingConfig with 5 fields and decided that for 3 fields it would be justified to also add a config. Do you want me to refactor it to use optional args?
Sorry, something went wrong.
There was a problem hiding this comment.
i think having a config is fine but do we actually need it? we probably could have just passed these as optional args, no?
I refactored it as you wanted, so that there is no config. @franciscojavierarceo
Sorry, something went wrong.
| return | ||
|
|
||
| batch_df = pd.concat(dfs, ignore_index=True) | ||
| self._buffers[key].clear() |
There was a problem hiding this comment.
Does it make sense to move clear inside try: self._store.push so that buffer gets cleared only after the write succeeds ?
Sorry, something went wrong.
There was a problem hiding this comment.
Totally makes sense, it's done. Thanks for seeing that.
Sorry, something went wrong.
|
|
||
| # NOTE: offline writes are currently synchronous only, so we call directly | ||
| try: | ||
| self._store.push( |
There was a problem hiding this comment.
What about splitting _flush_locked into two methods: one that extracts data (with lock) and one that does I/O (without lock) ?
Something like:
Sorry, something went wrong.
There was a problem hiding this comment.
Thanks for the suggestion!
I split _flush_locked into _drain_locked (extract under lock) and _flush (I/O without lock). I also added _inflight to prevent concurrent flushes per key. On failure the drained batch is re‑enqueued so we don’t drop data.
Sorry, something went wrong.
|
|
||
| # use a multi-row payload to ensure we test non-trivial dfs | ||
| resp = client.post("/push", json=push_body_many(push_mode, count=2, id_start=100)) | ||
| assert resp.status_code == 200 |
There was a problem hiding this comment.
optional but I think it's good to return 202 when batching is enabled and offline writes are involved
Sorry, something went wrong.
There was a problem hiding this comment.
Nice catch! It's done.
Sorry, something went wrong.
There was a problem hiding this comment.
This PR adds configurable batching support for offline writes to the feature server's /push endpoint. The batching mechanism buffers offline writes and flushes them based on either a size threshold or time interval, improving throughput for high-volume offline push operations.
Key Changes:
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file| sdk/python/feast/feature_server.py | Implemented OfflineWriteBatcher class and integrated batching logic into the /push endpoint |
| sdk/python/feast/infra/feature_servers/base_config.py | Added three new configuration fields for offline push batching |
| sdk/python/tests/unit/test_feature_server.py | Added comprehensive test coverage for batching behavior across different push modes and configurations |
| docs/reference/feature-store-yaml.md | Documented the new feature_server configuration block with batching options |
| docs/reference/feature-servers/python-feature-server.md | Added user-facing documentation explaining offline write batching functionality |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Sorry, something went wrong.
| allow_registry_cache = request.allow_registry_cache | ||
| transform_on_write = request.transform_on_write | ||
|
|
||
| # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store |
There was a problem hiding this comment.
Corrected spelling: 'theres' should be 'there's'.
| # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store | |
| # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as there's no async for offline store |
Sorry, something went wrong.
| fs, enabled: bool = True, batch_size: int = 1, batch_interval_seconds: int = 60 | ||
| ): | ||
| """ | ||
| Attach a minimal feature_server.offline_push_batching config |
There was a problem hiding this comment.
The docstring has inconsistent indentation. The closing triple quotes and the description line should be aligned with the opening triple quotes for standard formatting.
| Attach a minimal feature_server.offline_push_batching config | |
| Attach a minimal feature_server.offline_push_batching config |
Sorry, something went wrong.
There was a problem hiding this comment.
lgtm
Sorry, something went wrong.
What this PR does / why we need it:
Added batching configuration for feature_servers /push endpoint for offline store writes
Which issue(s) this PR fixes:
Fixes #5683