Source code for censys.asm.inventory

"""Interact with the Censys Inventory Search API."""
import warnings
from typing import List, Optional

from .api import CensysAsmAPI


[docs] class InventorySearch(CensysAsmAPI): """Inventory Search API class.""" base_path = "/inventory/v1"
[docs] def search( self, workspaces: Optional[List[str]] = None, query: Optional[str] = None, page_size: Optional[int] = None, cursor: Optional[str] = None, sort: Optional[List[str]] = None, fields: Optional[List[str]] = None, pages: Optional[int] = None, ) -> dict: """Search inventory data. Args: workspaces (List[str], optional): List of workspace IDs to search. Deprecated. The workspace associated with `CENSYS-API-KEY` will be used automatically. query (str, optional): Query string. page_size (int, optional): Number of results to return. Defaults to 50. cursor (str, optional): Cursor to start search from. sort (List[str], optional): List of fields to sort by. fields (List[str], optional): List of fields to return. pages (int, optional): Number of pages of results to return (when set to -1 returns all pages available). Returns: dict: Inventory search results. """ if workspaces is None: workspaces = [self.get_workspace_id()] else: warnings.warn( "The field 'workspaces' is being deprecated. The workspace associated with `CENSYS-API-KEY` will be used automatically.", category=DeprecationWarning, stacklevel=2, ) if page_size is None: page_size = 50 if pages is None: pages = 1 args = { "workspaces": workspaces, "pageSize": page_size, } if query: args["query"] = query if cursor: args["cursor"] = cursor if sort: args["sort"] = sort if fields: args["fields"] = fields page = 1 next_cursor = None hits = [] resp = self._get(self.base_path, args=args) next_cursor = resp.get("nextCursor") hits.extend(resp.get("hits", [])) # Fetch additional pages if next_cursor is available AND additional pages are requested # Loop will exit if next_cursor is None or if the number of pages requested is reached # Loop will exit if non-200 status code is returned while next_cursor and (pages == -1 or page < pages): args["cursor"] = next_cursor resp = self._get(self.base_path, args=args) if "nextCursor" in resp: next_cursor = resp.get("nextCursor") else: next_cursor = None hits.extend(resp.get("hits", [])) page += 1 resp["hits"] = hits return resp
[docs] def aggregate( self, workspaces: List[str], query: Optional[str] = None, aggregation: Optional[dict] = None, ) -> dict: """Aggregate inventory data. Args: workspaces (List[str]): List of workspace IDs to search. query (str, optional): Query string. aggregation (dict, optional): Aggregation object. Returns: dict: Inventory aggregation results. """ body = { "workspaces": workspaces, "query": query, "aggregation": aggregation, } return self._post(f"{self.base_path}/aggregate", data=body)
[docs] def fields(self, fields: Optional[List[str]] = None) -> dict: """List inventory fields. If no fields are specified, all fields will be returned. Args: fields (List[str], optional): List of fields to return. Returns: dict: Inventory field results. """ args = {"fields": fields} return self._get(f"{self.base_path}/fields", args=args)
Inventory = InventorySearch