"""Base for interacting with the Censys Assets API."""
import re
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import quote

from censys.common.exceptions import CensysInvalidColorException

from ..api import CensysAsmAPI
# Matches a hex color: "#" followed by exactly 3 or 6 hexadecimal digits.
# The pattern is anchored with ``\Z`` rather than ``$`` because ``$`` also
# matches just before a trailing newline, which would let a value such as
# "#fff\n" pass validation.
HEX_REGEX = re.compile(r"^#(?:[0-9a-fA-F]{3}){1,2}\Z")
class Assets(CensysAsmAPI):
    """Assets API class.

    Base client for one asset type. Every request made by this class is
    issued under ``/{api_version}/assets/{asset_type}``.

    Attributes:
        asset_type (str): Type of asset this client interacts with.
        base_path (str): URL path prefix shared by all requests of this client.
    """

    asset_type: str

    def __init__(self, asset_type: str, *args, **kwargs):
        """Inits Assets.

        Args:
            asset_type (str): Type of asset to interact with.
            *args: Variable length argument list, forwarded to CensysAsmAPI.
            **kwargs: Arbitrary keyword arguments, forwarded to CensysAsmAPI.
                The ``api_version`` entry (default ``"v1"``) is also used to
                build the base path.
        """
        CensysAsmAPI.__init__(self, *args, **kwargs)
        api_version = kwargs.get("api_version", "v1")
        self.base_path = f"/{api_version}/assets/{asset_type}"
        self.asset_type = asset_type

    def _format_asset_id(self, asset_id: str) -> str:
        """Formats asset ID.

        The base implementation returns the ID unchanged. Every per-asset
        request path in this class is built through this method, so a subclass
        can override it to adapt the ID for its asset type.

        Args:
            asset_id (str): Asset ID to format.

        Returns:
            str: Formatted asset ID.
        """
        return asset_id

    def get_assets(
        self,
        page_number: int = 1,
        page_size: Optional[int] = None,
        tag: Optional[List[str]] = None,
        tag_operator: Optional[str] = None,
        source: Optional[List[str]] = None,
        discovery_trail: Optional[bool] = None,
    ) -> Iterator[dict]:
        """Requests assets data.

        Only filters with a truthy value are sent as query arguments; an empty
        list, an empty string, ``None`` or ``False`` is omitted from the request.

        Args:
            page_number (int): Optional; Page number to begin at when searching.
            page_size (int): Optional; Page size for retrieving assets.
            tag (list): Optional; List of tags to search for.
            tag_operator (str): Optional; Operator to use when searching for tags.
            source (list): Optional; List of sources to search for.
            discovery_trail (bool): Optional; Bool indicating whether to return discovery trail.

        Yields:
            dict: The assets result returned.
        """
        args: Dict[str, Any] = {}
        if tag:
            args["tag"] = tag
        if tag_operator:
            args["tagOperator"] = tag_operator
        if source:
            args["source"] = source
        if discovery_trail:
            args["discoveryTrail"] = discovery_trail

        yield from self._get_page(
            self.base_path, page_number=page_number, page_size=page_size, args=args
        )

    def get_asset_by_id(self, asset_id: str) -> dict:
        """Requests asset data by ID.

        Args:
            asset_id (str): Requested asset ID.

        Returns:
            dict: Asset search result.
        """
        path = f"{self.base_path}/{self._format_asset_id(asset_id)}"
        return self._get(path)

    def add_tag(self, asset_id: str, name: str, color: Optional[str] = None) -> dict:
        """Adds a tag to a specified asset on the ASM platform.

        The request body is built by ``format_tag``, which validates the color.

        Args:
            asset_id (str): Asset ID to add tag to.
            name (str): New tag name.
            color (str): Optional; New tag color (hex).

        Returns:
            dict: Added tag results.
        """
        path = f"{self.base_path}/{self._format_asset_id(asset_id)}/tags"
        data = format_tag(name, color)
        return self._post(path, data=data)

    def delete_tag(self, asset_id: str, name: str) -> dict:
        """Deletes a tag from a specified asset on the ASM platform by tag name.

        Args:
            asset_id (str): Asset ID to delete tag from.
            name (str): Tag name to delete.

        Returns:
            dict: Deleted tag results.
        """
        # The tag name becomes a URL path segment, so percent-encode it with no
        # "safe" characters: a name containing "/", "?", "#" or "%" would
        # otherwise change the path, start a query string or fragment, or form
        # an invalid escape, and the request would hit the wrong endpoint.
        encoded_name = quote(str(name), safe="")
        path = (
            f"{self.base_path}/{self._format_asset_id(asset_id)}/tags/{encoded_name}"
        )
        return self._delete(path)
def format_tag(name: str, color: Optional[str] = None) -> dict:
    """Builds the request payload for a tag.

    The payload always carries the tag name as a string. A color is included
    only when a non-empty one is given, and it must pass the module's hex
    color pattern.

    Args:
        name (str): Tag name.
        color (str): Optional; Tag color (hex).

    Raises:
        CensysInvalidColorException: Raised if color is not a valid hex color.

    Returns:
        dict: Formatted tag request data.
    """
    # No color (None or empty string): the payload carries the name only.
    if not color:
        return {"name": str(name)}

    # Reject anything that is not a hex color before building the payload.
    if HEX_REGEX.match(color) is None:
        raise CensysInvalidColorException(10037, f"{color} is not a valid color.")

    return dict(name=str(name), color=str(color))