You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.
Dismiss alert
from feast.permissions.action import AuthzedAction
if TYPE_CHECKING:
from feast.feast_object import FeastObject
# Module-level logger, named after this module, used by the matchers below.
logger = logging.getLogger(__name__)
def is_a_feast_object(resource: Any):
    """
    Tell whether the given object is an instance of one of the managed Feast resource types.

    Args:
        resource: The object instance to check.

    Returns:
        `True` if `resource` is an instance of any type listed in `ALL_RESOURCE_TYPES`
        (instances of sub-classes are accepted as well), `False` otherwise.
    """
    # Imported at call time rather than at module level.
    from feast.feast_object import ALL_RESOURCE_TYPES

    # isinstance is used (instead of comparing exact types) to pass Mock validation.
    return any(
        isinstance(resource, resource_type) for resource_type in ALL_RESOURCE_TYPES
    )
def _get_type(resource: "FeastObject") -> Any:
    """
    Return the type to report for the given resource.

    For a `Mock` instance this is the value of its `_spec_class` attribute
    (`None` when that attribute is missing); for any other object it is
    simply `type(resource)`.
    """
    if isinstance(resource, Mock):
        # A Mock's own type is not informative: report the class it was specced from.
        return getattr(resource, "_spec_class", None)
    return type(resource)
def resource_match_config(
    resource: "FeastObject",
    expected_types: list["FeastObject"],
    name_patterns: list[str],
    required_tags: Optional[dict[str, str]] = None,
) -> bool:
    """
    Match a given Feast object against the configured type, name and tags in a permission configuration.

    The filters are applied in order: managed type, expected types, name patterns, required tags.
    The tag filter is lenient: when the resource has no `required_tags` attribute, or that
    attribute is not a dictionary, a warning is logged and the tag filter is skipped
    (the resource is still considered a match).

    Args:
        resource: A FeastObject instance to match against the permission.
        expected_types: The list of object types configured in the permission. Type match also includes all the sub-classes.
        name_patterns: The possibly empty list of name pattern filters configured in the permission.
        required_tags: The optional dictionary of required tags configured in the permission.

    Returns:
        bool: `True` if the resource matches the configured permission filters.
    """
    if resource is None:
        logger.warning(f"None passed to {resource_match_config.__name__}")
        return False

    if not is_a_feast_object(resource):
        # The reported type is only needed for this message, so it is resolved here.
        _type = _get_type(resource)
        logger.warning(f"Given resource is not of a managed type but {_type}")
        return False

    # mypy check ignored because of https://github.com/python/mypy/issues/11673, or it raises "Argument 2 to "isinstance" has incompatible type "tuple[Featu ..."
    if not isinstance(resource, tuple(expected_types)):  # type: ignore
        logger.info(
            f"Resource does not match any of the expected type {expected_types}"
        )
        return False

    if not _resource_name_matches_name_patterns(resource, name_patterns):
        return False

    # No tag filter configured (None or empty): type and name were enough.
    if not required_tags:
        return True

    # NOTE(review): the tags are read from the resource's `required_tags` attribute;
    # confirm this is the attribute Feast objects actually expose for their tags.
    if not hasattr(resource, "required_tags"):
        logger.warning(f"Resource {resource} has no `required_tags` attribute")
        return True

    actual_tags = resource.required_tags
    if not isinstance(actual_tags, dict):
        logger.warning(
            f"Resource {resource} has `required_tags` attribute of unexpected type {type(actual_tags)}"
        )
        return True

    # Every required tag must be present on the resource with exactly the required value.
    for tag, required_value in required_tags.items():
        actual_value = actual_tags.get(tag)
        if required_value != actual_value:
            logger.info(
                f"Unmatched value {actual_value} for required tag {tag}: expected {required_value}"
            )
            return False

    return True
def _resource_name_matches_name_patterns(
    resource: "FeastObject",
    name_patterns: list[str],
) -> bool:
    """
    Check the name of a resource against the configured name patterns.

    Each pattern is applied with `re.fullmatch`, so it must match the whole name.
    The check is lenient: it returns `True` when the resource has no `name`
    attribute, when no patterns are given, when the name is `None`, or when the
    name is not a string.

    Args:
        resource: The object whose `name` attribute is checked.
        name_patterns: The possibly empty list of regular expressions to try, in order.

    Returns:
        bool: `True` if the check is skipped (see above) or at least one pattern
        fully matches the name, `False` if none of the patterns does.
    """
    if not hasattr(resource, "name"):
        logger.warning(f"Resource {resource} has no `name` attribute")
        return True

    # Nothing to filter on: no patterns configured, or the resource is unnamed.
    if not name_patterns or resource.name is None:
        return True

    if not isinstance(resource.name, str):
        logger.warning(
            f"Resource {resource} has `name` attribute of unexpected type {type(resource.name)}"
        )
        return True

    # The first fully matching pattern wins; every miss before it is logged.
    for name_pattern in name_patterns:
        if re.fullmatch(name_pattern, resource.name) is not None:
            logger.info(f"Resource name {resource.name} matched pattern {name_pattern}")
            return True
        logger.info(
            f"Resource name {resource.name} does not match pattern {name_pattern}"
        )

    return False
def actions_match_config(
    requested_actions: list[AuthzedAction],
    allowed_actions: list[AuthzedAction],
) -> bool:
    """
    Check that every requested action is allowed by a permission configuration.

    Args:
        requested_actions: The actions about to be executed.
        allowed_actions: The actions configured in the permission.

    Returns:
        bool: `True` if each of the `requested_actions` is contained in `allowed_actions`
        (which is trivially the case for an empty request), `False` otherwise.
    """
    for action in requested_actions:
        # A single action missing from the permission is enough to reject the request.
        if action not in allowed_actions:
            return False
    return True