We read every piece of feedback, and take your input very seriously.
Include my email address so I can be contacted.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
| Expand Up | @@ -11,6 +11,7 @@ | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| import asyncio | ||
| import itertools | ||
| import os | ||
| import warnings | ||
| Expand All | @@ -33,6 +34,7 @@ | |
| import pandas as pd | ||
| import pyarrow as pa | ||
| from colorama import Fore, Style | ||
| from fastapi.concurrency import run_in_threadpool | ||
| from google.protobuf.timestamp_pb2 import Timestamp | ||
| from tqdm import tqdm | ||
|
|
||
| Expand Down Expand Up | @@ -1423,26 +1425,13 @@ def tqdm_builder(length): | |
| end_date, | ||
| ) | ||
|
|
||
| def push( | ||
| self, | ||
| push_source_name: str, | ||
| df: pd.DataFrame, | ||
| allow_registry_cache: bool = True, | ||
| to: PushMode = PushMode.ONLINE, | ||
| ): | ||
| """ | ||
| Push features to a push source. This updates all the feature views that have the push source as stream source. | ||
|
|
||
| Args: | ||
| push_source_name: The name of the push source we want to push data to. | ||
| df: The data being pushed. | ||
| allow_registry_cache: Whether to allow cached versions of the registry. | ||
| to: Whether to push to online or offline store. Defaults to online store only. | ||
| """ | ||
| def _fvs_for_push_source_or_raise( | ||
| self, push_source_name: str, allow_cache: bool | ||
| ) -> set[FeatureView]: | ||
| from feast.data_source import PushSource | ||
|
|
||
| all_fvs = self.list_feature_views(allow_cache=allow_registry_cache) | ||
| all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache) | ||
| all_fvs = self.list_feature_views(allow_cache=allow_cache) | ||
| all_fvs += self.list_stream_feature_views(allow_cache=allow_cache) | ||
|
|
||
| fvs_with_push_sources = { | ||
| fv | ||
| Expand All | @@ -1457,7 +1446,27 @@ def push( | |
| if not fvs_with_push_sources: | ||
| raise PushSourceNotFoundException(push_source_name) | ||
|
|
||
| for fv in fvs_with_push_sources: | ||
| return fvs_with_push_sources | ||
|
|
||
| def push( | ||
| self, | ||
| push_source_name: str, | ||
| df: pd.DataFrame, | ||
| allow_registry_cache: bool = True, | ||
| to: PushMode = PushMode.ONLINE, | ||
| ): | ||
| """ | ||
| Push features to a push source. This updates all the feature views that have the push source as stream source. | ||
|
|
||
| Args: | ||
| push_source_name: The name of the push source we want to push data to. | ||
| df: The data being pushed. | ||
| allow_registry_cache: Whether to allow cached versions of the registry. | ||
| to: Whether to push to online or offline store. Defaults to online store only. | ||
| """ | ||
| for fv in self._fvs_for_push_source_or_raise( | ||
| push_source_name, allow_registry_cache | ||
| ): | ||
| if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE: | ||
| self.write_to_online_store( | ||
| fv.name, df, allow_registry_cache=allow_registry_cache | ||
| Expand All | @@ -1467,22 +1476,42 @@ def push( | |
| fv.name, df, allow_registry_cache=allow_registry_cache | ||
| ) | ||
|
|
||
| def write_to_online_store( | ||
| async def push_async( | ||
| self, | ||
| push_source_name: str, | ||
| df: pd.DataFrame, | ||
| allow_registry_cache: bool = True, | ||
| to: PushMode = PushMode.ONLINE, | ||
| ): | ||
| fvs = self._fvs_for_push_source_or_raise(push_source_name, allow_registry_cache) | ||
|
|
||
| if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE: | ||
| _ = await asyncio.gather( | ||
| *[ | ||
| self.write_to_online_store_async( | ||
| fv.name, df, allow_registry_cache=allow_registry_cache | ||
| ) | ||
| for fv in fvs | ||
| ] | ||
| ) | ||
|
Comment thread
Comment on lines
+1489
to
+1496
Copy link
Copy Markdown
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Choose a reason: Spam, Abuse, Off Topic, Outdated, Duplicate, Resolved, Low Quality. Hide comment.
For async push, we parallelize all the writes to the online store ...
Sorry, something went wrong.
All reactions
|
||
|
|
||
| if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE: | ||
|
|
||
| def _offline_write(): | ||
| for fv in fvs: | ||
| self.write_to_offline_store( | ||
| fv.name, df, allow_registry_cache=allow_registry_cache | ||
| ) | ||
|
|
||
| await run_in_threadpool(_offline_write) | ||
|
Comment thread
Comment on lines
+1498
to
+1506
Copy link
Copy Markdown
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Choose a reason: Spam, Abuse, Off Topic, Outdated, Duplicate, Resolved, Low Quality. Hide comment.
... but keep the existing sync implementation of offline writes, except that we run them in the threadpool so they don't block the server.
Sorry, something went wrong.
All reactions
|
||
|
|
||
| def _get_feature_view_and_df_for_online_write( | ||
| self, | ||
| feature_view_name: str, | ||
| df: Optional[pd.DataFrame] = None, | ||
| inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, | ||
| allow_registry_cache: bool = True, | ||
| ): | ||
| """ | ||
| Persists a dataframe to the online store. | ||
|
|
||
| Args: | ||
| feature_view_name: The feature view to which the dataframe corresponds. | ||
| df: The dataframe to be persisted. | ||
| inputs: Optional the dictionary object to be written | ||
| allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. | ||
| """ | ||
| feature_view_dict = { | ||
| fv_proto.name: fv_proto | ||
| for fv_proto in self.list_all_feature_views(allow_registry_cache) | ||
| Expand All | @@ -1509,10 +1538,60 @@ def write_to_online_store( | |
| df = pd.DataFrame(df) | ||
| except Exception as _: | ||
| raise DataFrameSerializationError(df) | ||
| return feature_view, df | ||
|
|
||
| def write_to_online_store( | ||
| self, | ||
| feature_view_name: str, | ||
| df: Optional[pd.DataFrame] = None, | ||
| inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, | ||
| allow_registry_cache: bool = True, | ||
| ): | ||
| """ | ||
| Persists a dataframe to the online store. | ||
|
|
||
| Args: | ||
| feature_view_name: The feature view to which the dataframe corresponds. | ||
| df: The dataframe to be persisted. | ||
| inputs: Optional the dictionary object to be written | ||
| allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. | ||
| """ | ||
|
|
||
| feature_view, df = self._get_feature_view_and_df_for_online_write( | ||
| feature_view_name=feature_view_name, | ||
| df=df, | ||
| inputs=inputs, | ||
| allow_registry_cache=allow_registry_cache, | ||
| ) | ||
| provider = self._get_provider() | ||
| provider.ingest_df(feature_view, df) | ||
|
|
||
| async def write_to_online_store_async( | ||
| self, | ||
| feature_view_name: str, | ||
| df: Optional[pd.DataFrame] = None, | ||
| inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, | ||
| allow_registry_cache: bool = True, | ||
| ): | ||
| """ | ||
| Persists a dataframe to the online store asynchronously. | ||
|
|
||
| Args: | ||
| feature_view_name: The feature view to which the dataframe corresponds. | ||
| df: The dataframe to be persisted. | ||
| inputs: Optional the dictionary object to be written | ||
| allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. | ||
| """ | ||
|
|
||
| feature_view, df = self._get_feature_view_and_df_for_online_write( | ||
| feature_view_name=feature_view_name, | ||
| df=df, | ||
| inputs=inputs, | ||
| allow_registry_cache=allow_registry_cache, | ||
| ) | ||
| provider = self._get_provider() | ||
| await provider.ingest_df_async(feature_view, df) | ||
|
|
||
| def write_to_offline_store( | ||
| self, | ||
| feature_view_name: str, | ||
| Expand Down | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Choose a reason: Spam, Abuse, Off Topic, Outdated, Duplicate, Resolved, Low Quality. Hide comment.
If async writes to the online store are supported and the POST body indicates that data will be written to the online store, then attempt an async `store.push` (i.e. `push_async`).
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.