Source code for censys.search.v2.certs

"""Interact with the Censys Search Cert API."""
from typing import List, Optional, Union

from .api import CensysSearchAPIv2


[docs] class CensysCerts(CensysSearchAPIv2): """Interacts with the Certs index. Please note that this class represents only the v2 API endpoints. Examples: Inits Censys Certs. >>> from censys.search import CensysCerts >>> c = CensysCerts() Search for hosts by sha256fp. >>> c.get_hosts_by_cert("fb444eb8e68437bae06232b9f5091bccff62a768ca09e92eb5c9c2cf9d17c426") ( [ { "ip": "string", "name": "string", "observed_at": "2021-08-02T14:56:38.711Z", "first_observed_at": "2021-08-02T14:56:38.711Z", } ], { "next": "nextCursorToken", }, ) """ INDEX_NAME = "certificates" """Name of Censys Index.""" def __init__( self, api_id: Optional[str] = None, api_secret: Optional[str] = None, **kwargs ): """Inits CensysCerts. See CensysSearchAPIv2 for additional arguments. Args: api_id (Optional[str], optional): API ID. Defaults to None. api_secret (Optional[str], optional): API Secret. Defaults to None. **kwargs: Arbitrary keyword arguments. """ super().__init__(api_id=api_id, api_secret=api_secret, **kwargs) self.bulk_path = f"/v2/{self.INDEX_NAME}/bulk"
[docs] def view(self, document_id: str, **kwargs) -> dict: """Fetches the certificate record for the specified SHA-256 fingerprint. Args: document_id (str): The SHA-256 fingerprint of the requested certificate. **kwargs: Arbitrary keyword arguments. Returns: dict: Certificate details. """ return self._get(self.view_path + document_id, args=kwargs)["result"]
[docs] def bulk_post(self, fingerprints: List[str]) -> List[dict]: """Fetches the certificate records for the specified SHA-256 fingerprints. Using the POST method allows for a larger number of fingerprints to be queried at once. Args: fingerprints (List[str]): List of certificate SHA256 fingerprints. Returns: dict: Certificate details. """ data = {"fingerprints": fingerprints} return self._post(self.bulk_path, data=data)["result"]
[docs] def bulk_get(self, fingerprints: List[str]) -> List[dict]: """Fetches the certificate records for the specified SHA-256 fingerprints. Using the GET method allows for a smaller number of fingerprints to be queried at once. Args: fingerprints (List[str]): List of certificate SHA256 fingerprints. Returns: dict: Certificate details. """ args = {"fingerprints": fingerprints} return self._get(self.bulk_path, args=args)["result"]
[docs] def bulk(self, fingerprints: List[str]) -> List[dict]: """Fetches the certificate records for the specified SHA-256 fingerprints. By default, this function uses the POST method, which allows for a larger number of fingerprints to be queried at once. If you wish to use the GET method, please use `CensysCerts.bulk_get` instead. Args: fingerprints (List[str]): List of certificate SHA256 fingerprints. Returns: dict: Certificate details. """ return self.bulk_post(fingerprints)
[docs] def bulk_view(self, fingerprints: List[str]) -> List[dict]: # type: ignore[override] """Fetches the certificate records for the specified SHA-256 fingerprints. By default, this function uses the POST method, which allows for a larger number of fingerprints to be queried at once. If you wish to use the GET method, please use `CensysCerts.bulk_get` instead. Args: fingerprints (List[str]): List of certificate SHA256 fingerprints. Returns: dict: Certificate details. """ return self.bulk_post(fingerprints)
[docs] def search_post_raw( self, query: str, per_page: int = 50, cursor: Optional[str] = None, fields: Optional[List[str]] = None, sort: Optional[Union[str, List[str]]] = None, **kwargs, ) -> dict: """Searches the Certs index using the POST method. Returns the raw response. Args: query (str): The query string to search for. per_page (int): The number of results to return per page. Defaults to 50. cursor (str, optional): Cursor token from the API response, which fetches the next page of results when added to the endpoint URL. fields (List[str], optional): Additional fields to return in the matched certificates outside of the default returned fields. sort (List[str], optional): A list of fields to sort on. By default, fields will be sorted in ascending order. **kwargs: Arbitrary keyword arguments. Returns: dict: Search results. """ return super().search_post_raw( query=query, per_page=per_page, cursor=cursor, fields=fields, sort=sort, **kwargs, )
[docs] def search_post( self, query: str, per_page: int = 50, cursor: Optional[str] = None, fields: Optional[List[str]] = None, sort: Optional[Union[str, List[str]]] = None, **kwargs, ) -> dict: """Searches the Certs index using the POST method. This method returns the `result` field of the raw response. If you wish to access the raw response, please use `CensysCerts.search_post_raw` instead. Args: query (str): The query string to search for. per_page (int): The number of results to return per page. Defaults to 50. cursor (str, optional): Cursor token from the API response, which fetches the next page of results when added to the endpoint URL. fields (List[str], optional): Additional fields to return in the matched certificates outside of the default returned fields. sort (List[str], optional): A list of fields to sort on. By default, fields will be sorted in ascending order. **kwargs: Arbitrary keyword arguments. Returns: dict: Search results. """ return super().search_post( query=query, per_page=per_page, cursor=cursor, fields=fields, sort=sort, **kwargs, )
[docs] def search_get( self, query: str, per_page: int = 50, cursor: Optional[str] = None, fields: Optional[List[str]] = None, sort: Optional[Union[str, List[str]]] = None, **kwargs, ) -> dict: """Searches the Certs index using the GET method. Args: query (str): The query string to search for. per_page (int): The number of results to return per page. Defaults to 50. cursor (str, optional): Cursor token from the API response, which fetches the next page of results when added to the endpoint URL. fields (List[str], optional): Additional fields to return in the matched certificates outside of the default returned fields. sort (List[str], optional): A list of fields to sort on. By default, fields will be sorted in ascending order. **kwargs: Arbitrary keyword arguments. Returns: dict: Search results. """ return super().search_get( query=query, per_page=per_page, cursor=cursor, fields=fields, sort=sort, **kwargs, )
[docs] def search( # type: ignore[override] self, query: str, per_page: int = 50, cursor: Optional[str] = None, pages: int = 1, fields: Optional[List[str]] = None, sort: Optional[Union[str, List[str]]] = None, **kwargs, ) -> CensysSearchAPIv2.Query: """Searches the Certs index. By default, this function uses the POST method, which allows for a larger number of fingerprints to be queried at once. If you wish to use the GET method, please use `CensysCerts.search_get` instead. Args: query (str): The query string to search for. per_page (int): The number of results to return per page. Defaults to 50. cursor (str, optional): Cursor token from the API response, which fetches the next page of results when added to the endpoint URL. pages (int): The number of pages to return. Defaults to 1. fields (List[str], optional): Additional fields to return in the matched certificates outside of the default returned fields. sort (List[str], optional): A list of fields to sort on. By default, fields will be sorted in ascending order. **kwargs: Additional keyword arguments to pass to the underlying HTTP request. Returns: Query: A query object that can be used to iterate over the search results. """ return super().search(query, per_page, cursor, pages, fields, sort, **kwargs)
[docs] def aggregate( self, query: str, field: str, num_buckets: int = 50, **kwargs ) -> dict: """Aggregates certificate records matching a specified query into buckets based on the given field. Args: query (str): The query string to search for. field (str): The field to aggregate on. num_buckets (int): The number of buckets to return. Defaults to 50. **kwargs: Additional keyword arguments to pass to the underlying HTTP request. Returns: dict: Aggregation results. """ args = {"q": query, "field": field, "num_buckets": num_buckets} args.update(kwargs) return self._get(self.aggregate_path, args=args)["result"]
[docs] def get_hosts_by_cert(self, fingerprint: str, cursor: Optional[str] = None) -> dict: """Returns a list of hosts which contain services presenting this certificate, including when the certificate was first observed. Args: fingerprint (str): The SHA-256 fingerprint of the requested certificate. cursor (str): Cursor token from the API response, which fetches the next page of hosts when added to the endpoint URL. Returns: dict: A list of hosts which contain services presenting this certificate. """ args = {"cursor": cursor} return self._get(self.view_path + fingerprint + "/hosts", args)["result"]
[docs] def list_certs_with_tag(self, tag_id: str) -> List[dict]: """Returns a list of certs which are tagged with the specified tag. Args: tag_id (str): The ID of the tag. Returns: List[dict]: A list of certs which are tagged with the specified tag. """ return self._list_documents_with_tag(tag_id, "certificates", "certs")