Source code for inlyse.cli

"""The module bundles all available clients for the INLYSE API.
"""

# Standard Library
import functools
import logging
import random
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Union

# Third Party Libraries
from requests.adapters import HTTPAdapter, Retry
from requests.compat import urljoin
from requests.utils import _parse_content_type_header  # type: ignore
from requests_toolbelt import sessions

from inlyse.exceptions import (  # noqa
    InlyseApiError,
    MaxRetriesExceeded,
    RateLimitExceeded,
)

logger = logging.getLogger(__name__)  # Init logger


[docs]class TimeoutHTTPAdapter(HTTPAdapter): def __init__(self, *args, **kwargs): self.timeout = kwargs.get("timeout", 5) del kwargs["timeout"] super().__init__(*args, **kwargs)
[docs] def send(self, request, **kwargs): timeout = kwargs.get("timeout") if timeout is None: kwargs["timeout"] = self.timeout return super().send(request, **kwargs)
[docs]@dataclass class InlyseResponse: """A response of the INLYSE API :param endpoint: The API endpoint :type endpoint: str :param status: The HTTP status code of the response :type status: int :param rate_limit: The rate limit status. Which includes the time for the next renewal and the remaining number of requests. :type rate_limit: dict or None :param content_type: The MIME type of the content. In example: `application/json`. :type content_type: tuple :param content: The content of the response. :type content: Any """ endpoint: str status: int rate_limit: Union[None, dict] content_type: tuple content: Any
def endpoint(path=None): def endpoint_decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): self = args[0] if path: kwargs["path"] = path response = func(*args, **kwargs) quota = None if "x-ratelimit-remaining" in response.headers: quota = { "limit": int(response.headers["x-ratelimit-limit"]), "remaining": int( response.headers["x-ratelimit-remaining"] ), "reset": datetime.strptime( response.headers["x-ratelimit-reset"], "%d-%m-%Y %H:%M:%S", ).replace(tzinfo=timezone.utc), } content_type = _parse_content_type_header( response.headers["content-type"] ) if content_type[0] == "application/json": content = response.json() else: content = response.content return InlyseResponse( urljoin(self.url, path), response.status_code, quota, content_type, content, ) return wrapper return endpoint_decorator
[docs]class WebClient: """INLYSE API web client :param license_key: A license key for the INLYSE API :type license_key: str :param url: (optional) The URL of the INLYSE API. (Default: https://malware.ai) :type url: str :param timeout: (optional) How long to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) <timeouts>` tuple. The excat same behavior like for python requests. (Default: (5, 60)) :type timeout: float or tuple **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> with WebClient("<your license key>") as client: ... client.ping() It is also possible to use the raw API GET or POST requests like this: .. sourcecode:: pycon >>> from inlyse import WebClient >>> with WebClient("<your license key>") as client: ... client.api.get("/ping") ... response = client.api.get("/api/stats") ... print(response.status_code) ... print(response.json()) 200 {'AnalysedFiles': 293, 'Traffic': 266161837} These methods will return a :class:`requests.Response` object. """ def __init__( self, license_key: str, url: str = "https://malware.ai", timeout: Union[float, tuple] = (5, 60), ) -> None: self.url = url self.license_key = license_key self.timeout = timeout self._api: Union[None, sessions.BaseUrlSession] = None self.api: sessions.BaseUrlSession def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() @property def api(self) -> sessions.BaseUrlSession: if not isinstance(self._api, sessions.BaseUrlSession): session = sessions.BaseUrlSession(base_url=self.url) retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 503], allowed_methods=["GET"], raise_on_status=False, ) adapter = TimeoutHTTPAdapter( timeout=self.timeout, max_retries=retry_strategy ) session.headers = {"Authorization": f"Bearer {self.license_key}"} session.mount("http://", adapter) session.mount("https://", adapter) self._api = session return self._api @api.setter def api(self, value) -> None: if value is None: self.close() def _wait( self, attempt: int, avg_time: float = 0.0, cap: int = 40, base: float = 0.1, ) -> float: """Internal wait method till an analysis is ready. :param attempt: The current number of attempts :type attempt: int :param avg_time: (optional) The average waiting time. (Default: 0.0) :type avg_time: float :param cap: (optional) The maximum backoff time in seconds. (Default: 40) :type cap: int :param base: (optional) The base backoff time in seconds. (Default: 0.1) :type base: float :return: The sleeping time. :rtype: float """ if attempt == 0: logger.debug("Waiting the average response time: %s", avg_time) time.sleep(avg_time) logger.debug(f"retries: {attempt}") back_offtime = random.uniform(0, min(cap, base * 2**attempt)) logger.debug(f"sleeping for {back_offtime} seconds") time.sleep(back_offtime) return back_offtime
[docs] @endpoint("/version") def version(self, *, path: str) -> InlyseResponse: """version() Get the version of the API * **API endpoint**: `/version` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/json` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the version request was `successful` it returns the HTTP status code `200` and the current version. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.version() >>> response InlyseResponse( endpoint='/version', status=200, rate_limit=None, content_type=('application/json', {'charset': 'UTF-8'}), content='1.7.4' ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.get(path)
[docs] @endpoint("/ping") def ping(self, *, path: str) -> InlyseResponse: """ping() Ping the INLYSE API * **API endpoint**: `/ping` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/json` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the ping request was `successful` it returns the HTTP status code `200` and the string `Pong`. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.ping() >>> response InlyseResponse( endpoint='/ping', status=200, rate_limit=None, content_type=('application/json', {'charset': 'UTF-8'}), content='Pong' ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.get(path)
[docs] @endpoint("/api/stats") def stats(self, *, path: str) -> InlyseResponse: """stats() Get the statistics of the used license key. It shows you how many files you have analysed and the traffic in Bytes. * **API endpoint**: `/api/stats` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/json` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was *successful* it returns the HTTP status code `200` and a dictionary with the number of analyzed files and the produced trafficfor the configured license key. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.stats() >>> response InlyseResponse( endpoint='/api/stats' status=200, rate_limit={ 'remaining': '14993', 'reset': datetime.datetime( 2023, 3, 20, 19, 45, 38, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={'AnalysedFiles': 201, 'Traffic': 205581688} ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.get(path)
[docs] @endpoint("/api/analysis/{id}/download") def download(self, analysis_id: str, *, path: str) -> InlyseResponse: """download(analysis_id) Download a file of a previous analysis. :param analysis_id: The UUID of an analysis. :type analysis_id: str .. warning:: Maybe the file has been already deleted. The file gets deleted after 30 seconds when the analysis has been completed. INLYSE saves only the analysis result permanently. * **API endpoint**: `/api/analysis/<id>/download` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/pdf`, `application/msexcel`, .. .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was *successful*, it returns the HTTP status code `200` and the requested file. * - 404 - Not Found - If the analysis with the given UUID does not exist or the corresponding file of the analysis has already been deleted the API returns the status code 404 and a short explaination about the error. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> download = client.download("0ef3822b-481d-4368-88ea-6a2417bb2dac") >>> download InlyseResponse( endpoint='/api/analysis/0ef3822b-481d-4368-88ea-6a2417bb2dac/download' status=200, rate_limit={ 'remaining': '14980', 'reset': datetime.datetime( 2023, 3, 20, 19, 45, 38, tzinfo=datetime.timezone.utc ) }, content_type=('application/pdf', {}), conent=b'%PDF-1.4...%%EOF' ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 return self.api.get(path.format(id=analysis_id))
[docs] @endpoint("/api/analysis/{id}/disarm") def disarm_analysis( self, analysis_id: str, *, path: str ) -> InlyseResponse: """disarm_analysis(analysis_id) Disarm a file of a previous analysis. :param analysis_id: The UUID of an analysis. :type analysis_id: str .. warning:: Maybe the file has been already deleted. The file gets deleted after 30 seconds when the analysis has been completed. INLYSE saves only the analysis result permanently. * **API endpoint**: `/api/analysis/<id>/disarm` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/pdf` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was *successful*, it returns the HTTP status code `200` and the disarmed PDF document. Microsoft Office documents get converted into PDF documents. * - 404 - Not Found - If the analysis with the given UUID does not exist or the corresponding file of the analysis has already been deleted the API returns the status code 404 and a short explaination about the error. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.disarm_analysis("0ef3822b-481d-4368-88ea-6a2417bb2dac") >>> response InlyseResponse( endpoint='/api/analysis/0ef3822b-481d-4368-88ea-6a2417bb2dac/disarm', status=200, rate_limit={ 'remaining': '14999', 'reset': datetime.datetime( 2023, 3, 21, 19, 44, 12, tzinfo=datetime.timezone.utc ) }, content_type=('application/pdf', {}), content=b'%PDF-1.7...%%EOF' ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 return self.api.get(path.format(id=analysis_id))
[docs] @endpoint("/api/files/") def upload_file( self, filename: str, content: bytes, *, path: str ) -> InlyseResponse: """upload(filename, content) Upload a local file. :param filename: The name of the file. :type filename: str :param content: The content of the file. :type content: bytes * **API endpoint**: `/api/files/` * **HTTP Method**: `POST` * **Response CONTENT-TYPE**: `application/json` **Allowed file types**: * Microsoft Office documents * PDF documents * ZIP files with exactly `1` Microsoft Office document or PDF document .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was successful, it returns the HTTP status code 200, the UUID of the analysis and an estimated time till the analysis is ready. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> with open("/tmp/javascript.pdf", "rb") as fp: ... response = client.upload_file(os.path.basename(fp.name), fp.read()) >>> response InlyseResponse( endpoint='/api/files/' status=200, rate_limit={ 'remaining': '14977', 'reset': datetime.datetime( 2023, 3, 20, 19, 45, 38, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'id': '4e1ae479-583b-4d52-a080-88adf6502364', 'EstimatedAnalysisTime': 5.83675 } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.post(path, files={"file": (filename, content)})
[docs] @endpoint("/api/files/url") def upload_url(self, url: str, *, path: str) -> InlyseResponse: """upload_url(url) Upload a remote file. At the moment we only support http URLs. :param url: The download URL of the file. :type url: str .. warning:: The URL needs to be public. It's not possible to send URLs which need authentication. * **API endpoint**: `/api/files/url` * **HTTP Method**: `POST` * **Response CONTENT-TYPE**: `application/json` **Allowed file types**: * Microsoft Office documents * PDF documents * ZIP files with exactly `1` Microsoft Office document or PDF document .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was successful, it returns the HTTP status code 200, the UUID of the analysis and an estimated time till the analysis is ready. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.upload_url("https://arxiv.org/pdf/2004.14471.pdf") >>> response InlyseResponse( endpoint='/api/files/url' status=200, rate_limit={ 'remaining': '14988', 'reset': datetime.datetime( 2023, 3, 20, 20, 47, 35, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'id': '1a13ba09-8487-4621-b2a3-b0ff460f7a9e', 'EstimatedAnalysisTime': 5.83675 } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.post(path, json={"url": url})
[docs] @endpoint("/api/files/owa") def upload_owa(self, url: str, token: str, *, path: str) -> InlyseResponse: """upload_owa(url, token) Upload an outlook attachment. :param url: The download URL of the file. :type url: str :param token: The token :type token: str * **API endpoint**: `/api/files/owa` * **HTTP Method**: `POST` * **Response CONTENT-TYPE**: `application/json` **Allowed file types**: * Microsoft Office documents * PDF documents * ZIP files with exactly `1` Microsoft Office document or PDF document .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was successful, it returns the HTTP status code 200, the UUID of the analysis and an estimated time till the analysis is ready. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.upload_owa( ... "https://attachments.office.net/owa/Max.Mustermann%40test.com/...", ... "iMWFWf1EZh8WM27tqXFlIa1QoNDfmjaZT0Xz7IyaDASBCUamUKcMKUSTVYJTOUm5...") >>> response InlyseResponse( endpoint='/api/files/owa', status=200, rate_limit={ 'remaining': '14988', 'reset': datetime.datetime( 2023, 3, 20, 20, 47, 35, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'id': '1a13ba09-8487-4621-b2a3-b0ff460f7a9e', 'EstimatedAnalysisTime': 5.83675 } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.post(path, json={"url": url, "token": token})
[docs] @endpoint("/api/files/disarm") def disarm_file( self, filename: str, content: bytes, *, path: str ) -> InlyseResponse: """disarm_file(filename, content) Disarm a local file. :param filename: The name of the file. :type filename: str :param content: The content of the file. :type content: bytes * **API endpoint**: `/api/files/disarm` * **HTTP Method**: `POST` * **Response CONTENT-TYPE**: `application/json` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was successful, it returns the HTTP status code 200 and the disarmed PDF document. Microsoft Office documents get converted into PDF documents. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> with open("/tmp/javascript.pdf", "rb") as fp: ... response = client.disarm_file(os.path.basename(fp.name), fp.read()) >>> response InlyseResponse( endpoint='/api/files/disarm', status=200, rate_limit={ 'remaining': '14999', 'reset': datetime.datetime( 2023, 3, 21, 19, 44, 12, tzinfo=datetime.timezone.utc ) }, content_type=('application/pdf', {}), content=b'%PDF-1.7...%%EOF' ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ return self.api.post(path, files={"file": (filename, content)})
def disarm(self, filename: str, content: bytes): response = self.disarm_file(filename, content) if response.status == 200: return response elif response.status == 429: raise RateLimitExceeded("Your rate limte exceeded.") else: raise InlyseApiError(response.content)
[docs] @endpoint("/api/analysis") def list_analyses( self, filter_: str = "all", *, path: str ) -> InlyseResponse: """list_analyses(filter) Get all analyses. :param filter_: Filter the analyses by finished, unfinished, error or all. :type filter_: str, optional * **API endpoint**: `/api/analysis` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/json` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was successful, it returns the HTTP status code 200 and a list of analysis UUIDs. * - 404 - Not Found - No analysis found * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. It's possible to filter the analyses by: * finished * unfinished * error * all [DEFAULT] If no filter is applied, you will get all analyses. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.list_analyses() >>> response InlyseResponse( endpoint='/api/analysis', status=200, rate_limit={ 'remaining': '14998', 'reset': datetime.datetime( 2023, 3, 20, 22, 16, 47, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), conent=[ '079211f4-c401-44fc-a846-01f7ff26e47f', '177a9179-c2d2-4d4a-976f-a8e1e4f496a0', '2f02e53e-5722-4fb1-b0a2-5ae053b2f00c', ... '36adec8e-d54f-401b-bc2f-1dd22ea9b099', ] ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ if filter_ not in ["all", "finished", "unfinished", "error"]: logger.warning("Unknown filter %s.", filter_) filter_ = "all" return self.api.get(path, params={"filter": filter_})
[docs] @endpoint("/api/analysis/{id}") def check(self, analysis_id: str, *, path: str) -> InlyseResponse: """check(analysis_id) Get an analysis by ID :param analysis_id: The UUID of the analysis. :type analysis_id: str * **API endpoint**: `/api/analysis/<id>` * **HTTP Method**: `GET` * **Response CONTENT-TYPE**: `application/json` .. list-table:: HTTP Response Codes :header-rows: 1 * - Status Code - Explaination * - 200 - OK - If the request was successful, it returns the HTTP status code 200 and the result of the analysis. * - 202 - Accepted - If the the analysis is not ready yet, it returns the HTTP status code 202. * - 404 - Not Found - If the analysis with the given UUID does not exist the API returns the status code 404 and a short explaination about the error. * - 401 - Unauthorized - If the request was not authorized. In example if the license key is not valid anymore. * - 429 - Too Many Requests - If the quota is exceed, it returns the HTTP status 429. **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.check("1a13ba09-8487-4621-b2a3-b0ff460f7a9e") >>> response InlyseResponse( endpoint='/api/analysis/1a13ba09-8487-4621-b2a3-b0ff460f7a9e', status=200, rate_limit={ 'remaining': '14997', 'reset': datetime.datetime( 2023, 3, 20, 22, 16, 47, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'ID': '1a13ba09-8487-4621-b2a3-b0ff460f7a9e', 'MD5': '7cdba4461284f8e5b5646ee0b502ec55', 'SHA1': 'e7ccdd6706a1ec8ac8d21e5b7d152d3e60acfe7c', 'SHA256': '55e2e5b4752c3c0626c70efa86041c7429a3322beed516bb35d96fa4edd9948b', 'SHA512': 'f068074c5b24133fc97febf5d534961a11cecbf0f17ac46246bdc4cb45d60b84d01ff2df77860d1db69cd37d198331fd9fbc7237e49f74a55af3672e532f6d45', 'Filename': '2004.14471.pdf', 'Size': 1354850, 'FileType': 'application/pdf', 'Label': 'benign', 'ScoreBenign': '0.9507668964520833', 'ScoreMalicious': '0.04923310354791669' } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 return self.api.get(path.format(id=analysis_id))
[docs] def get_analysis( self, analysis_id: str, estimated_time: float = 5.83675, max_retries: int = 3, ) -> InlyseResponse: """Fetch an analysis after uploading a file :param analysis_id: The UUID of the analysis. :type analysis_id: str :param estimated_time: (optional) The estimated time for the analysis. (Default: 5.83675) :type estimated_time: float :param max_retries: (optional) The maxium number of retries to fetch the analysis. (Default: 3) :type max_retries: int :raises RateLimitExceeded: The rate limit for this license key exceeded :raises MaxRetriesExceeded: The maximum retries to get the analysis exceeded :raises InlyseApiError: The INLYSE API returned an error. (e.g. The analysis id could not be found) **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.get_analysis("1a13ba09-8487-4621-b2a3-b0ff460f7a9e", 5.83675, 2) >>> response InlyseResponse( endpoint='/api/analysis/1a13ba09-8487-4621-b2a3-b0ff460f7a9e', status=200, rate_limit={ 'remaining': '14995', 'reset': datetime.datetime( 2023, 3, 20, 22, 16, 47, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'ID': '1a13ba09-8487-4621-b2a3-b0ff460f7a9e', 'MD5': '7cdba4461284f8e5b5646ee0b502ec55', 'SHA1': 'e7ccdd6706a1ec8ac8d21e5b7d152d3e60acfe7c', 'SHA256': '55e2e5b4752c3c0626c70efa86041c7429a3322beed516bb35d96fa4edd9948b', 'SHA512': 'f068074c5b24133fc97febf5d534961a11cecbf0f17ac46246bdc4cb45d60b84d01ff2df77860d1db69cd37d198331fd9fbc7237e49f74a55af3672e532f6d45', 'Filename': '2004.14471.pdf', 'Size': 1354850, 'FileType': 'application/pdf', 'Label': 'benign', 'ScoreBenign': '0.9507668964520833', 'ScoreMalicious': '0.04923310354791669' } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 retries = 0 while retries < max_retries: response = self.check(analysis_id) if response.status == 200: return response elif response.status == 202: self._wait(retries, avg_time=estimated_time) elif response.status == 429: raise RateLimitExceeded("Your rate limte exceeded.") else: raise InlyseApiError(response.content) retries += 1 raise MaxRetriesExceeded( "Max retries exceeded. Please increase the number of retries or the estimated waiting time." )
def _upload(self, upload_type, max_retries, **kwargs): """An upload wrapper method""" upload_methods = { "file": self.upload_file, "url": self.upload_url, "owa": self.upload_owa, } response = upload_methods[upload_type](**kwargs) if response.status == 429: raise RateLimitExceeded("Your rate limte exceeded.") elif response.status > 299 or response.status < 200: raise InlyseApiError(response.content) estimated_time = response.content["EstimatedAnalysisTime"] * 0.2 return self.get_analysis( response.content["id"], estimated_time, max_retries )
[docs] def scan_file( self, filename: str, content, max_retries: int = 15 ) -> InlyseResponse: """Uploads a local file and tries to fetch the result. :param filename: The name of the file. :type filename: str :param content: The content of the file. :type cotent: bytes :param max_retries: (optional) The maxium number of retries to fetch the analysis. (Default: 15) :type max_retries: int :raises RateLimitExceeded: The rate limit for this license key exceeded :raises MaxRetriesExceeded: The maximum retries to get the analysis exceeded :raises InlyseApiError: The INLYSE API returned an error. (e.g. The analysis id could not be found) **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> with open("/tmp/javascript.pdf", "rb") as fp: ... response = client.scan_file(os.path.basename(fp.name), fp.read()) >>> response InlyseResponse( endpoint='/api/analysis/8f238204-8540-4424-9872-822c46e39c05', status=200, rate_limit={ 'remaining': '14992', 'reset': datetime.datetime( 2023, 3, 20, 22, 16, 47, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'ID': '8f238204-8540-4424-9872-822c46e39c05', 'MD5': '55b47515feeeb8dae78763d662923787', 'SHA1': '5af3f43e3169e1e678e06b6372a60d9df22dc6d0', 'SHA256': '1fede472c1e339272f2ea27496ea059e86d6594b1ae93cbb6a486eeb118527e1', 'SHA512': 'a2fff650ba010c56b51ff4e9f3ee77292651428ad41d467f8c471b4c9091060a3dc64acea22ee875ec6f14abd3e018f944a92e87f4567b71fae05b2d80566880', 'Filename': 'javascript.pdf', 'Size': 990, 'FileType': 'application/pdf', 'Label': 'malicious', 'ScoreBenign': '0.0008773440468863303', 'ScoreMalicious': '0.9991226559531137', 'Action': 'DELETE;DISARM' } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 return self._upload( "file", max_retries, filename=filename, content=content )
[docs] def scan_url(self, url: str, max_retries: int = 15) -> InlyseResponse: """Uploads a remote file and tries to fetch the result. :param url: The download URL of the file. :type url: str :param max_retries: (optional) The maxium number of retries to fetch the analysis. (Default: 15) :type max_retries: int :raises RateLimitExceeded: The rate limit for this license key exceeded :raises MaxRetriesExceeded: The maximum retries to get the analysis exceeded :raises InlyseApiError: The INLYSE API returned an error. (e.g. The analysis id could not be found) **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.scan_url("https://arxiv.org/pdf/2004.14471.pdf") >>> response InlyseResponse( endpoint='/api/analysis/98e56af3-0f17-470b-bfcb-5ef7d4c83e07', status=200, rate_limit={ 'remaining': '14988', 'reset': datetime.datetime( 2023, 3, 20, 22, 16, 47, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'ID': '98e56af3-0f17-470b-bfcb-5ef7d4c83e07', 'MD5': '7cdba4461284f8e5b5646ee0b502ec55', 'SHA1': 'e7ccdd6706a1ec8ac8d21e5b7d152d3e60acfe7c', 'SHA256': '55e2e5b4752c3c0626c70efa86041c7429a3322beed516bb35d96fa4edd9948b', 'SHA512': 'f068074c5b24133fc97febf5d534961a11cecbf0f17ac46246bdc4cb45d60b84d01ff2df77860d1db69cd37d198331fd9fbc7237e49f74a55af3672e532f6d45', 'Filename': '2004.14471.pdf', 'Size': 1354850, 'FileType': 'application/pdf', 'Label': 'benign', 'ScoreBenign': '0.9507668964520833', 'ScoreMalicious': '0.04923310354791669' } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 return self._upload("url", max_retries, url=url)
[docs] def scan_owa( self, url: str, token: str, max_retries: int = 15 ) -> InlyseResponse: """Uploads a outlook attachment and tries to fetch the result. :param url: The download URL of the file. :type url: str :param max_retries: (optional) The maxium number of retries to fetch the analysis. (Default: 15) :type max_retries: int :raises RateLimitExceeded: The rate limit for this license key exceeded :raises MaxRetriesExceeded: The maximum retries to get the analysis exceeded :raises InlyseApiError: The INLYSE API returned an error. (e.g. The analysis id could not be found) **Example**: .. sourcecode:: pycon >>> from inlyse import WebClient >>> client = WebClient(<your license key>) >>> response = client.scan_owa( ... "https://attachments.office.net/owa/Max.Mustermann%40test.com/...", ... "iMWFWf1EZh8WM27tqXFlIa1QoNDfmjaZT0Xz7IyaDASBCUamUKcMKUSTVYJTOUm5..." ... ) >>> response InlyseResponse( endpoint='/api/analysis/98e56af3-0f17-470b-bfcb-5ef7d4c83e07', status=200, rate_limit={ 'remaining': '14988', 'reset': datetime.datetime( 2023, 3, 20, 22, 16, 47, tzinfo=datetime.timezone.utc ) }, content_type=('application/json', {'charset': 'UTF-8'}), content={ 'ID': '98e56af3-0f17-470b-bfcb-5ef7d4c83e07', 'MD5': '7cdba4461284f8e5b5646ee0b502ec55', 'SHA1': 'e7ccdd6706a1ec8ac8d21e5b7d152d3e60acfe7c', 'SHA256': '55e2e5b4752c3c0626c70efa86041c7429a3322beed516bb35d96fa4edd9948b', 'SHA512': 'f068074c5b24133fc97febf5d534961a11cecbf0f17ac46246bdc4cb45d60b84d01ff2df77860d1db69cd37d198331fd9fbc7237e49f74a55af3672e532f6d45', 'Filename': '2004.14471.pdf', 'Size': 1354850, 'FileType': 'application/pdf', 'Label': 'benign', 'ScoreBenign': '0.9507668964520833', 'ScoreMalicious': '0.04923310354791669' } ) >>> client.close() :return: Returns an :class:`inlyse.cli.InlyseResponse` object. :rtype: InlyseResponse """ # noqa: E501 return self._upload("owa", max_retries, url=url, token=token)
[docs] def close(self) -> None: """Close the HTTP session""" if isinstance(self._api, sessions.BaseUrlSession): self._api.close() self._api = None