"""Base for interacting with the Censys Search API."""
import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterable, Iterator, List, Optional, Type, Union

from requests.models import Response

from censys.common.base import CensysAPIBase
from censys.common.config import DEFAULT, get_config
from censys.common.exceptions import (

Fields = Optional[List[str]]

INDEX_TO_KEY = {"hosts": "ip"}

[docs]class CensysSearchAPIv2(CensysAPIBase): """This class is the base class for the Hosts index. See CensysAPIBase for additional arguments. Args: *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. Raises: CensysException: Base Exception Class for the Censys API. Examples: >>> c = CensysSearchAPIv2() """ DEFAULT_URL: str = "" """Default Search API base URL.""" INDEX_NAME: str = "" """Name of Censys Index.""" def __init__( self, api_id: Optional[str] = None, api_secret: Optional[str] = None, **kwargs ): """Inits CensysSearchAPIv2.""" CensysAPIBase.__init__(self, kwargs.get("url", self.DEFAULT_URL), **kwargs) # Gets config file config = get_config() # Try to get credentials self._api_id = ( api_id or os.getenv("CENSYS_API_ID") or config.get(DEFAULT, "api_id") ) self._api_secret = ( api_secret or os.getenv("CENSYS_API_SECRET") or config.get(DEFAULT, "api_secret") ) if not self._api_id or not self._api_secret: raise CensysException("No API ID or API secret configured.") self._session.auth = (self._api_id, self._api_secret) # Generate concrete paths to be called self.view_path = f"/{self.INDEX_NAME}/" self.search_path = f"/{self.INDEX_NAME}/search" self.aggregate_path = f"/{self.INDEX_NAME}/aggregate" def _get_exception_class( # type: ignore self, res: Response ) -> Type[CensysSearchException]: return CensysExceptionMapper.SEARCH_EXCEPTIONS.get( res.status_code, CensysSearchException ) # def account(self) -> dict: # """ # Gets the current account information. Including email and quota. # Returns: # dict: Account response. # """ # return self._get("account") # def quota(self) -> dict: # """ # Gets the current account's query quota. # Returns: # dict: Quota response. # """ # return self.account()["quota"]
[docs] class Query(Iterable): """Query class that is callable and iterable. Object Searches the given index for all records that match the given query. For more details, see our documentation: Args: api (CensysSearchAPIv2): Parent API object. query (str): The query to be executed. per_page (int): Optional; The number of results to be returned for each page. Defaults to 100. cursor (int): Optional; The cursor of the desired result set. pages (int): Optional; The number of pages returned. Defaults to 1. """ def __init__( self, api: "CensysSearchAPIv2", query: str, per_page: Optional[int] = None, cursor: Optional[str] = None, pages: int = 1, ): """Inits Query.""" self.api = api self.query = query self.per_page = per_page self.cursor = cursor self.nextCursor: Optional[str] = None = 1 self.pages = pages def __call__(self, per_page: Optional[int] = None) -> List[dict]: """Search current index. Args: per_page (int): Optional; The number of results to be returned for each page. Defaults to 100. Raises: StopIteration: Raised when pages have been already received. Returns: List[dict]: One page worth of result hits. """ if > self.pages: raise StopIteration args = { "q": self.query, "per_page": per_page or self.per_page or 100, "cursor": self.nextCursor or self.cursor, } payload = self.api._get(self.api.search_path, args) += 1 result = payload["result"] self.nextCursor = result["links"]["next"] if result["total"] == 0 or not self.nextCursor: self.pages = 0 return result["hits"] def __next__(self) -> List[dict]: """Gets next page of search results. Returns: List[dict]: One page worth of result hits. """ return self.__call__() def __iter__(self) -> Iterator[List[dict]]: """Gets Iterator. Returns: Iterable: Returns self. """ return self
[docs] def view_all(self) -> Dict[str, dict]: """View each document returned from query. Please note that each result returned by the query will be looked up using the view method. Returns: Dict[str, dict]: Dictionary mapping documents to that document's result set. """ threads = [] results = {} document_key = INDEX_TO_KEY.get(self.api.INDEX_NAME, "ip") with ThreadPoolExecutor(max_workers=20) as executor: for hit in self.__call__(): document_id = hit[document_key] threads.append(executor.submit(self.api.view, document_id)) for task in as_completed(threads): result = task.result() results[result[document_key]] = result return results
[docs] def search( self, query: str, per_page: Optional[int] = None, cursor: Optional[str] = None, pages: int = 1, ) -> Query: """Search current index. Searches the given index for all records that match the given query. For more details, see our documentation: Args: query (str): The query to be executed. per_page (int): Optional; The number of results to be returned for each page. Defaults to 100. cursor (int): Optional; The cursor of the desired result set. pages (int): Optional; The number of pages returned. Defaults to 1. Returns: Query: Query object that can be a callable or an iterable. """ return self.Query(self, query, per_page, cursor, pages)
[docs] def view( self, document_id: str, at_time: Optional[Union[str,, datetime.datetime]] = None, ) -> dict: """View document from current index. View the current structured data we have on a specific document. For more details, see our documentation: Args: document_id (str): The ID of the document you are requesting. at_time ([str,, datetime.datetime]): Optional; Fetches a document at a given point in time. Returns: dict: The result set returned. """ args = {} if at_time: if isinstance(at_time, (, datetime.datetime)): at_time = at_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") args["at_time"] = at_time return self._get(self.view_path + document_id, args)["result"]
[docs] def aggregate( self, query: str, field: str, num_buckets: Optional[int] = None ) -> dict: """Aggregate current index. Creates a report on the breakdown of the values of a field in a result set. For more details, see our documentation: Args: query (str): The query to be executed. field (str): The field you are running a breakdown on. num_buckets (int): Optional; The maximum number of values. Defaults to 50. Returns: dict: The result set returned. """ args = {"q": query, "field": field, "num_buckets": num_buckets} return self._get(self.aggregate_path, args)["result"]