We read every piece of feedback, and take your input very seriously.
Include my email address so I can be contactedHave a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
| Expand Up | @@ -10,6 +10,7 @@ | |
| import psutil | ||
| from dateutil import parser | ||
| from fastapi import Depends, FastAPI, Request, Response, status | ||
| from fastapi.concurrency import run_in_threadpool | ||
| from fastapi.logger import logger | ||
| from fastapi.responses import JSONResponse | ||
| from google.protobuf.json_format import MessageToDict | ||
| Expand Down Expand Up | @@ -112,7 +113,7 @@ async def get_body(request: Request): | |
| "/get-online-features", | ||
| dependencies=[Depends(inject_user_details)], | ||
| ) | ||
| def get_online_features(body=Depends(get_body)): | ||
| async def get_online_features(body=Depends(get_body)): | ||
| body = json.loads(body) | ||
| full_feature_names = body.get("full_feature_names", False) | ||
| entity_rows = body["entities"] | ||
| Expand Down Expand Up | @@ -145,15 +146,22 @@ def get_online_features(body=Depends(get_body)): | |
| resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE] | ||
| ) | ||
|
|
||
| response_proto = store.get_online_features( | ||
| read_params = dict( | ||
| features=features, | ||
| entity_rows=entity_rows, | ||
| full_feature_names=full_feature_names, | ||
| ).proto | ||
| ) | ||
|
|
||
| if store._get_provider().async_supported.online.read: | ||
| response = await store.get_online_features_async(**read_params) | ||
| else: | ||
| response = await run_in_threadpool( | ||
| lambda: store.get_online_features(**read_params) | ||
| ) | ||
|
Comment thread
Comment on lines
+155
to
+160
Copy link
Copy Markdown
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Choose a reason Spam Abuse Off Topic Outdated Duplicate Resolved Low Quality Hide commentuse async feature lookups if it's supported by the online store, otherwise run the sync method in the threadpool
Sorry, something went wrong.
All reactions
|
||
|
|
||
| # Convert the Protobuf object to JSON and return it | ||
| return MessageToDict( | ||
| response_proto, preserving_proto_field_name=True, float_precision=18 | ||
| response.proto, preserving_proto_field_name=True, float_precision=18 | ||
| ) | ||
|
|
||
| @app.post("/push", dependencies=[Depends(inject_user_details)]) | ||
| Expand Down | ||
| Expand Up | @@ -23,6 +23,7 @@ | |
| from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject | ||
| from feast.infra.online_stores.helpers import compute_entity_id | ||
| from feast.infra.online_stores.online_store import OnlineStore | ||
| from feast.infra.supported_async_methods import SupportedAsyncMethods | ||
| from feast.protos.feast.core.DynamoDBTable_pb2 import ( | ||
| DynamoDBTable as DynamoDBTableProto, | ||
| ) | ||
| Expand Down Expand Up | @@ -88,6 +89,10 @@ class DynamoDBOnlineStore(OnlineStore): | |
| _dynamodb_resource = None | ||
| _aioboto_session = None | ||
|
|
||
| @property | ||
| def async_supported(self) -> SupportedAsyncMethods: | ||
| return SupportedAsyncMethods(read=True) | ||
|
Comment thread
Comment on lines
+92
to
+94
Copy link
Copy Markdown
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Choose a reason Spam Abuse Off Topic Outdated Duplicate Resolved Low Quality Hide commentan online store can opt in to using async read/write calls as they become available.
Sorry, something went wrong.
All reactions
|
||
|
|
||
| def update( | ||
| self, | ||
| config: RepoConfig, | ||
| Expand Down | ||
| Expand Up | @@ -22,6 +22,7 @@ | |
| from feast.feature_view import FeatureView | ||
| from feast.infra.infra_object import InfraObject | ||
| from feast.infra.registry.base_registry import BaseRegistry | ||
| from feast.infra.supported_async_methods import SupportedAsyncMethods | ||
| from feast.online_response import OnlineResponse | ||
| from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto | ||
| from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto | ||
| Expand All | @@ -36,6 +37,10 @@ class OnlineStore(ABC): | |
| The interface that Feast uses to interact with the storage system that handles online features. | ||
| """ | ||
|
|
||
| @property | ||
| def async_supported(self) -> SupportedAsyncMethods: | ||
| return SupportedAsyncMethods() | ||
|
Comment thread
Comment on lines
+40
to
+42
Copy link
Copy Markdown
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Choose a reason Spam Abuse Off Topic Outdated Duplicate Resolved Low Quality Hide commentby default an online store does not have async methods.
Sorry, something went wrong.
All reactions
|
||
|
|
||
| @abstractmethod | ||
| def online_write_batch( | ||
| self, | ||
| Expand Down | ||
| @@ -0,0 +1,10 @@ | ||
| from pydantic import BaseModel, Field | ||
|
|
||
|
|
||
| class SupportedAsyncMethods(BaseModel): | ||
| read: bool = Field(default=False) | ||
| write: bool = Field(default=False) | ||
|
|
||
|
|
||
| class ProviderAsyncMethods(BaseModel): | ||
| online: SupportedAsyncMethods = Field(default_factory=SupportedAsyncMethods) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Choose a reason Spam Abuse Off Topic Outdated Duplicate Resolved Low Quality Hide comment/get-online-features endpoint is now async
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.