Source code for censys.base

"""
Base for interacting with the Censys APIs.
"""

import os
import json
import warnings
from functools import wraps
from typing import Any, Callable, List, Optional, Type

import backoff
import requests
from requests.models import Response

from censys import __name__ as NAME
from censys import __version__ as VERSION
from censys.exceptions import (
    CensysAPIException,
    CensysException,
    CensysJSONDecodeException,
    CensysRateLimitExceededException,
    CensysTooManyRequestsException,
)

# Type alias: either ``None`` or a list of strings.
# NOTE(review): presumably the field names to request from the API — confirm
# against callers.
Fields = Optional[List[str]]


# Wrapper to make max_retries configurable at runtime
def _backoff_wrapper(method: Callable):
    """Decorate an instance method so it is retried with exponential backoff.

    The ``backoff`` decorator is built on every call instead of once at
    definition time, so ``max_tries`` can be read from the instance's
    ``max_retries`` attribute at runtime.

    Args:
        method (Callable): Instance method to wrap. Its first argument must be
            an object exposing a ``max_retries`` attribute.

    Returns:
        Callable: Wrapper that invokes ``method`` and retries it when
        ``CensysRateLimitExceededException`` or
        ``CensysTooManyRequestsException`` is raised.
    """

    @wraps(method)
    def _wrapper(self, *args, **kwargs):
        @backoff.on_exception(
            backoff.expo,
            (
                CensysRateLimitExceededException,
                CensysTooManyRequestsException,
            ),
            max_tries=self.max_retries,
        )
        def _impl():
            # Keyword arguments must be unpacked with ``**``; a single ``*``
            # would pass the dict's keys as extra positional arguments and
            # silently drop the values.
            return method(self, *args, **kwargs)

        return _impl()

    return _wrapper


class CensysAPIBase:
    """
    This is the base class for API queries.

    Args:
        url (str, optional): The URL to make API requests. Falls back to the
            ``CENSYS_API_URL`` environment variable when omitted.
        timeout (int, optional): Timeout for API requests in seconds.
        max_retries (int, optional): Max number of attempts for a rate-limited
            request.
        user_agent (str, optional): Override User-Agent string.
        user_agent_identifier (str, optional): Used as the User-Agent suffix
            when ``user_agent`` is not given.
        proxies (dict, optional): Configure HTTP proxies. An ``"http"`` entry
            is discarded with a warning.

    Raises:
        CensysException: No API URL was passed in or found in the environment.
    """

    DEFAULT_TIMEOUT: int = 30
    """Default API timeout."""
    DEFAULT_USER_AGENT: str = f"{NAME}/{VERSION}"
    """Default API user agent."""
    DEFAULT_MAX_RETRIES: int = 10
    """Default max number of API retries."""

    def __init__(self, url: Optional[str] = None, **kwargs):
        # Get common request settings; falsy values fall back to the defaults.
        self.timeout = kwargs.get("timeout") or self.DEFAULT_TIMEOUT
        self.max_retries = kwargs.get("max_retries") or self.DEFAULT_MAX_RETRIES
        self._api_url = url or os.getenv("CENSYS_API_URL")

        if not self._api_url:
            raise CensysException("No API url configured.")

        # Create a session and set credentials
        self._session = requests.Session()
        proxies = kwargs.get("proxies")
        if proxies:
            if "http" in proxies:
                warnings.warn("HTTP proxies will not be used.")
            # Build a filtered copy rather than popping from the caller's dict,
            # so the argument passed in is never mutated.
            self._session.proxies = {
                scheme: proxy
                for scheme, proxy in proxies.items()
                if scheme != "http"
            }
        self._session.headers.update(
            {
                # NOTE(review): "*/8" is not a valid media range; "*/*"
                # (optionally with a q-value) was probably intended. Confirm
                # before changing what is sent on the wire.
                "accept": "application/json, */8",
                "User-Agent": " ".join(
                    [
                        requests.utils.default_user_agent(),
                        kwargs.get("user_agent")
                        or kwargs.get("user_agent_identifier")
                        or self.DEFAULT_USER_AGENT,
                    ]
                ),
            }
        )

    @staticmethod
    def _get_exception_class(_: Response) -> Type[CensysAPIException]:
        """
        Maps HTTP status code or ASM error code to exception.

        Must be implemented by child class; this base implementation always
        returns ``CensysAPIException``.

        Args:
            _ (Response): HTTP requests response object.

        Returns:
            Type[CensysAPIException]: Exception to raise.
        """
        return CensysAPIException

    @_backoff_wrapper
    def _make_call(
        self,
        method: Callable,
        endpoint: str,
        args: Optional[dict] = None,
        data: Optional[Any] = None,
    ) -> dict:
        """
        Wrapper functions for all our REST API calls checking for errors and
        decoding the responses.

        Args:
            method (Callable): Method to send HTTP request.
            endpoint (str): The path of API endpoint.
            args (dict, optional): URL args that are mapped to params.
            data (Any, optional): JSON data to serialize with request.

        Raises:
            CensysJSONDecodeException: The error response is not valid JSON.
            CensysAPIException: The response status is not 200; the concrete
                class comes from ``_get_exception_class``.

        Returns:
            dict: Results from an API request (empty when a 200 response has
            no JSON body).
        """
        if endpoint.startswith("/"):
            url = f"{self._api_url}{endpoint}"
        else:
            url = f"{self._api_url}/{endpoint}"

        request_kwargs = {
            "params": args or {},
            "timeout": self.timeout,
        }

        if data:
            request_kwargs["data"] = json.dumps(data)

        res = method(url, **request_kwargs)

        if res.status_code == 200:
            # Check for a returned json body
            try:
                return res.json()
            # Successful request returned no json body in response
            except ValueError:
                return {}

        # Anything other than 200 is an error: extract what we can from the
        # body and raise.
        try:
            json_data = res.json()
            message = json_data.get("error") or json_data["message"]
            const = json_data.get("error_type", None)
            error_code = json_data.get("errorCode", None)
            details = json_data.get("details", None)
        except ValueError as error:  # pragma: no cover
            # json.JSONDecodeError is a ValueError subclass, so one clause
            # covers both.
            message = (
                f"Response from {res.url} is not valid JSON and cannot be decoded."
            )
            raise CensysJSONDecodeException(
                status_code=res.status_code,
                message=message,
                body=res.text,
                const="badjson",
            ) from error
        except (KeyError, AttributeError):  # pragma: no cover
            # KeyError: JSON object without "error"/"message".
            # AttributeError: valid JSON that is not an object (list, string,
            # null), which has no ``.get``. Either way, still raise a Censys
            # exception instead of leaking an unrelated error.
            message = None
            const = "unknown"
            details = "unknown"
            error_code = "unknown"

        censys_exception = self._get_exception_class(res)
        raise censys_exception(
            status_code=res.status_code,
            body=res.text,
            const=const,
            message=message,
            error_code=error_code,
            details=details,
        )

    def _get(self, endpoint: str, args: Optional[dict] = None) -> dict:
        """Send a GET request to ``endpoint`` and return the decoded response."""
        return self._make_call(self._session.get, endpoint, args)

    def _post(
        self, endpoint: str, args: Optional[dict] = None, data: Optional[dict] = None
    ) -> dict:
        """Send a POST request to ``endpoint`` and return the decoded response."""
        return self._make_call(self._session.post, endpoint, args, data)

    def _put(
        self, endpoint: str, args: Optional[dict] = None, data: Optional[dict] = None
    ) -> dict:
        """Send a PUT request to ``endpoint`` and return the decoded response."""
        return self._make_call(
            self._session.put, endpoint, args, data
        )  # pragma: no cover

    def _delete(self, endpoint: str, args: Optional[dict] = None) -> dict:
        """Send a DELETE request to ``endpoint`` and return the decoded response."""
        return self._make_call(
            self._session.delete, endpoint, args
        )  # pragma: no cover