← 返回首页
import logging
from functools import wraps
import requests
from feast import RepoConfig
from feast.errors import FeastError
from feast.permissions.client.http_auth_requests_wrapper import (
get_http_auth_requests_session,
)
logger = logging.getLogger(__name__)
def rest_error_handling_decorator(func):
"""
Decorator that provides HTTP session management and error handling for REST API calls.
This decorator:
- Provides a cached HTTP session with connection pooling for improved performance
- Wraps session methods to add logging and error handling
- Maps Feast-specific errors from API responses
The session is reused across requests (connection pooling), which saves
TCP/TLS handshake overhead on subsequent calls.
Connection pool settings can be configured via feature_store.yaml:
```yaml
online_store:
type: remote
path: http://localhost:6566
connection_pool_size: 50 # Max connections in pool
connection_idle_timeout: 300 # Seconds before idle session closes (0 to disable)
connection_retries: 3 # Retry count with backoff
```
"""
@wraps(func)
def wrapper(config: RepoConfig, *args, **kwargs):
assert isinstance(config, RepoConfig)
pool_maxsize = None
max_idle_seconds = None
max_retries = None
if config.online_store is not None:
attr_map = {
"pool_maxsize": "connection_pool_size",
"max_idle_seconds": "connection_idle_timeout",
"max_retries": "connection_retries",
}
conn_config = {
key: getattr(config.online_store, attr_name, None)
for key, attr_name in attr_map.items()
}
pool_maxsize = conn_config["pool_maxsize"]
max_idle_seconds = conn_config["max_idle_seconds"]
max_retries = conn_config["max_retries"]
session = get_http_auth_requests_session(
config.auth_config,
pool_maxsize=pool_maxsize,
max_idle_seconds=max_idle_seconds,
max_retries=max_retries,
)
_wrap_session_methods(session)
return func(session, config, *args, **kwargs)
return wrapper
_ATTR_WRAPPED = "_feast_methods_wrapped"
def _wrap_session_methods(session: requests.Session) -> None:
"""
Wrap session HTTP methods with logging and Feast error mapping.
Wraps each method exactly once. Subsequent calls are no-ops, preventing
the unbounded nesting that leads to ``RecursionError`` when a cached
session is reused across many requests.
"""
if getattr(session, _ATTR_WRAPPED, False):
return
for method_name in ("get", "post", "put", "delete"):
original_method = getattr(session, method_name)
@wraps(original_method)
def wrapped_method(*args, _orig=original_method, _name=method_name, **kwargs):
logger.debug(f"Calling {_name} with args: {args}, kwargs: {kwargs}")
response = _orig(*args, **kwargs)
logger.debug(f"{_name} response status code: {response.status_code}")
try:
response.raise_for_status()
except requests.RequestException:
logger.debug(f"response.json() = {response.json()}")
mapped_error = FeastError.from_error_detail(response.json())
logger.debug(f"mapped_error = {str(mapped_error)}")
if mapped_error is not None:
raise mapped_error
return response
setattr(session, method_name, wrapped_method)
object.__setattr__(session, _ATTR_WRAPPED, True)