Skip to content

Reference

Meatie is a metaprogramming library for building HTTP REST API clients based on methods annotated with type hints.

AsyncContext

Bases: Generic[ResponseBodyType]

Stores context for processing an asynchronous HTTP request.

Source code in src/meatie/aio/descriptor.py
class AsyncContext(Generic[ResponseBodyType]):
    """Per-request state shared by the chain of asynchronous operators."""

    def __init__(
        self,
        client: BaseAsyncClient,
        operators: list[AsyncOperator[ResponseBodyType]],
        request: Request,
    ) -> None:
        """Sets up the context for a single HTTP request.

        Args:
            client: the HTTP client instance used for sending the request.
            operators: the operators to run, in list order, around the request and its response.
            request: the HTTP request to be sent.
        """
        self.client = client
        self.__operators = operators
        # Position in `operators` of the operator that the next `proceed` call runs.
        self.__next_step = 0

        self.request = request
        # Starts as None; nothing in this class assigns a response.
        self.response: Optional[AsyncResponse] = None

    async def proceed(self) -> ResponseBodyType:
        """Runs the operator at the current position of the operator chain.

        The position is advanced while the operator runs, so a nested `proceed` call made by that operator runs
        the following operator. The position is restored once the operator finishes, whether it returns or raises.

        Returns:
            Whatever the operator returns.

        Raises:
            RuntimeError: if the current position is past the last operator.
        """
        step = self.__next_step
        if step >= len(self.__operators):
            raise RuntimeError("No more step to process request")

        operator = self.__operators[step]
        self.__next_step = step + 1
        try:
            return await operator(self)
        finally:
            # Restore the position that was current when this call started.
            self.__next_step = step

__init__(client, operators, request)

Initializes the context.

Parameters:

Name Type Description Default
client BaseAsyncClient

HTTP client instance used for sending the HTTP request.

required
operators list[AsyncOperator[ResponseBodyType]]

list of operators that should be applied on the HTTP request and its response.

required
request Request

the HTTP request to be sent.

required
Source code in src/meatie/aio/descriptor.py
def __init__(
    self,
    client: BaseAsyncClient,
    operators: list[AsyncOperator[ResponseBodyType]],
    request: Request,
) -> None:
    """Initializes the context.

    Args:
        client: HTTP client instance used for sending the HTTP request.
        operators: list of operators that should be applied to the HTTP request and its response.
        request: the HTTP request to be sent.
    """
    self.client = client
    self.__operators = operators
    # Position in `operators` of the operator that `proceed` runs next.
    self.__next_step = 0

    self.request = request
    # Starts as None; this initializer never sets a response.
    self.response: Optional[AsyncResponse] = None

proceed() async

One method call will apply one operator on the HTTP request.

Operators are applied to HTTP requests in the order defined in the context object. HTTP responses are processed in the opposite order.

Source code in src/meatie/aio/descriptor.py
async def proceed(self) -> ResponseBodyType:
    """Runs the operator at the current position of the operator chain.

    The position is advanced while the operator runs, so a nested `proceed` call made by that operator runs
    the following operator. The position is restored once the operator finishes, whether it returns or raises.

    Returns:
        Whatever the operator returns.

    Raises:
        RuntimeError: if the current position is past the last operator.
    """
    step = self.__next_step
    if step >= len(self.__operators):
        raise RuntimeError("No more step to process request")

    operator = self.__operators[step]
    self.__next_step = step + 1
    try:
        return await operator(self)
    finally:
        # Restore the position that was current when this call started.
        self.__next_step = step

AsyncEndpointDescriptor

Bases: Generic[PT, ResponseBodyType]

Class descriptor for calling HTTP endpoints asynchronously.

Source code in src/meatie/aio/descriptor.py
class AsyncEndpointDescriptor(Generic[PT, ResponseBodyType]):
    """Class descriptor for calling HTTP endpoints asynchronously."""

    def __init__(
        self,
        template: RequestTemplate[Any],
        response_decoder: TypeAdapter[ResponseBodyType],
    ) -> None:
        """Creates an asynchronous endpoint descriptor.

        Args:
            template: the template for building HTTP requests to send to the given endpoint.
            response_decoder: the adapter for decoding the HTTP responses returned from the endpoint.
        """
        self.template = template
        self.response_decoder = response_decoder
        # Optional hooks, unset by default; `__get__` hands them unchanged to the bound descriptor.
        self.get_json: Optional[Callable[[Any], Awaitable[Any]]] = None
        self.get_text: Optional[Callable[[Any], Awaitable[str]]] = None
        self.get_error: Optional[Callable[[AsyncResponse], Awaitable[Optional[Exception]]]] = None
        self.__operator_by_priority: dict[int, AsyncOperator[ResponseBodyType]] = {}

    def __set_name__(self, owner: type[object], name: str) -> None:
        """Fills in the template's HTTP method from `get_method(name)` when the template has none."""
        if self.template.method is not None:
            return

        self.template.method = get_method(name)

    def register_operator(self, priority: int, operator: AsyncOperator[ResponseBodyType]) -> None:
        """Registers an operator to apply on an HTTP request or response.

        Registering an operator under a priority that is already in use replaces the earlier operator.

        Meatie uses the following priorities for the built-in operators:
         * 20 - caching
         * 40 - retry
         * 60 - rate limiting
         * 80 - authentication

        Args:
            priority: the priority of the operator, operators are applied in ascending order of priority.
            operator: the operator to apply.
        """
        self.__operator_by_priority[priority] = operator

    # Class-level access calls `__get__(None, owner_class)`, so `owner` must accept a class here, not only None.
    @overload
    def __get__(self, instance: None, owner: Optional[type[object]] = None) -> Self: ...

    @overload
    def __get__(
        self, instance: BaseAsyncClient, owner: type[BaseAsyncClient]
    ) -> Callable[PT, Awaitable[ResponseBodyType]]: ...

    def __get__(
        self, instance: Optional[BaseAsyncClient], owner: Optional[type[object]] = None
    ) -> Union[Self, Callable[PT, Awaitable[ResponseBodyType]]]:
        """Returns the descriptor itself for class-level access, otherwise a descriptor bound to `instance`.

        Args:
            instance: the client the attribute was accessed through, or None for access through the class.
            owner: the class that owns the descriptor.

        Returns:
            This descriptor when `instance` or `owner` is None, otherwise a `BoundAsyncEndpointDescriptor`
            built with the registered operators in ascending order of priority.
        """
        if instance is None or owner is None:
            return self

        # Priorities are unique dict keys, so sorting the pairs never has to compare two operators.
        operators = [operator for _, operator in sorted(self.__operator_by_priority.items())]

        return BoundAsyncEndpointDescriptor[PT, ResponseBodyType](
            instance,
            operators,
            self.template,
            self.response_decoder,
            self.get_json,
            self.get_text,
            self.get_error,
        )

__init__(template, response_decoder)

Creates an asynchronous endpoint descriptor.

Parameters:

Name Type Description Default
template RequestTemplate[Any]

the template for building HTTP requests to send to the given endpoint.

required
response_decoder TypeAdapter[ResponseBodyType]

the adapter for decoding the HTTP responses returned from the endpoint.

required
Source code in src/meatie/aio/descriptor.py
def __init__(
    self,
    template: RequestTemplate[Any],
    response_decoder: TypeAdapter[ResponseBodyType],
) -> None:
    """Creates an asynchronous endpoint descriptor.

    Args:
        template: the template for building HTTP requests to send to the given endpoint.
        response_decoder: the adapter for decoding the HTTP responses returned from the endpoint.
    """
    self.template = template
    self.response_decoder = response_decoder
    # Optional hooks; all three are unset until assigned after construction.
    self.get_json: Optional[Callable[[Any], Awaitable[Any]]] = None
    self.get_text: Optional[Callable[[Any], Awaitable[str]]] = None
    self.get_error: Optional[Callable[[AsyncResponse], Awaitable[Optional[Exception]]]] = None
    # Registered operators keyed by priority; starts empty.
    self.__operator_by_priority: dict[int, AsyncOperator[ResponseBodyType]] = {}

register_operator(priority, operator)

Registers an operator to apply on an HTTP request or response.

Meatie uses the following priorities for the built-in operators:
  • 20 - caching
  • 40 - retry
  • 60 - rate limiting
  • 80 - authentication

Parameters:

Name Type Description Default
priority int

the priority of the operator, operators are applied in ascending order of priority.

required
operator AsyncOperator[ResponseBodyType]

the operator to apply.

required
Source code in src/meatie/aio/descriptor.py
def register_operator(self, priority: int, operator: AsyncOperator[ResponseBodyType]) -> None:
    """Stores an operator under the given priority.

    An operator already stored under the same priority is replaced.

    Priorities used by Meatie's built-in operators:
     * 20 - caching
     * 40 - retry
     * 60 - rate limiting
     * 80 - authentication

    Args:
        priority: position of the operator; operators are applied in ascending order of priority.
        operator: the operator to apply on an HTTP request or response.
    """
    registry = self.__operator_by_priority
    registry[priority] = operator

AsyncResponse

Bases: Protocol

Interface for an asynchronous HTTP response.

Source code in src/meatie/types.py
# `runtime_checkable` allows isinstance() checks against this protocol; such checks only test that the
# members exist, not their signatures.
@runtime_checkable
class AsyncResponse(Protocol):
    """Interface for an asynchronous HTTP response.

    All members are interface stubs; this protocol provides no implementation.
    """

    @property
    def status(self) -> int:
        """Gets the HTTP status code.

        Returns:
            HTTP status code.
        """
        ...

    async def read(self) -> bytes:
        """Reads the response body and returns it as bytes without decoding.

        Returns:
            Response body as bytes.

        Raises:
            ResponseError: If an error occurs while reading the response body.
        """
        ...

    async def text(self) -> str:
        """Reads the response body and decodes it to a string using the encoding declared in the Content-Type response header.

        Returns:
            Response body as string.

        Raises:
            ResponseError: If an error occurs while reading the response body.
        """
        ...

    async def json(self) -> Any:
        """Reads the response body and parses it as JSON.

        Returns:
            The parsed JSON value.

        Raises:
            ResponseError: If an error occurs while reading the response body.
            ParseResponseError: If the response body cannot be parsed as JSON.
        """
        ...

status property

Get HTTP status code.

Returns:

Type Description
int

HTTP status code.

json() async

Reads the response body and decodes it to JSON.

Returns:

Type Description
Any

Response body as JSON.

Raises:

Type Description
ResponseError

If an error occurs while reading the response body.

ParseResponseError

If an error occurs while parsing the response body.

Source code in src/meatie/types.py
async def json(self) -> Any:
    """Reads the response body and parses it as JSON.

    Returns:
        The parsed JSON value.

    Raises:
        ResponseError: If an error occurs while reading the response body.
        ParseResponseError: If the response body cannot be parsed as JSON.
    """
    # Interface stub: no implementation is provided here.
    ...

read() async

Reads the response body and returns it as bytes without decoding.

Returns:

Type Description
bytes

Response body as bytes.

Raises:

Type Description
ResponseError

If an error occurs while reading the response body.

Source code in src/meatie/types.py
async def read(self) -> bytes:
    """Reads the response body and returns it as raw bytes, without decoding.

    Returns:
        Response body as bytes.

    Raises:
        ResponseError: If an error occurs while reading the response body.
    """
    # Interface stub: no implementation is provided here.
    ...

text() async

Reads the response body and decodes it to a string using encoding declared in the Content-Type response header.

Returns:

Type Description
str

Response body as string.

Raises:

Type Description
ResponseError

If an error occurs while reading the response body.

Source code in src/meatie/types.py
async def text(self) -> str:
    """Reads the response body and decodes it to a string using the encoding declared in the Content-Type response header.

    Returns:
        Response body as string.

    Raises:
        ResponseError: If an error occurs while reading the response body.
    """
    # Interface stub: no implementation is provided here.
    ...

BaseAsyncClient

Base class for the integration with asynchronous HTTP client libraries.

Source code in src/meatie/aio/client.py
class BaseAsyncClient:
    """Base class for the integration with asynchronous HTTP client libraries."""

    # Capacity passed to the cache that `__init_subclass__` creates for every subclass.
    SHARED_CACHE_MAX_SIZE: int = 1000
    # Assigned on each subclass when it is created; never assigned on this base class itself.
    shared_cache: Cache

    def __init__(
        self,
        local_cache: Optional[Cache] = None,
        limiter: Optional[Limiter] = None,
    ):
        """Creates a BaseAsyncClient.

        Args:
            local_cache: Cache implementation for storing the HTTP responses. Defaults to a new `Cache()`.
            limiter: Rate limiter used for throttling the rate of sending the HTTP requests. Defaults to
                `Limiter(Rate.max, INF)`.
        """
        self.local_cache = local_cache if local_cache is not None else Cache()
        self.limiter = limiter if limiter is not None else Limiter(Rate.max, INF)

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Gives every subclass its own `shared_cache` class attribute.

        Class keyword arguments are forwarded to the next `__init_subclass__` in the MRO so that cooperating
        base classes and mixins still receive their hook.
        """
        super().__init_subclass__(**kwargs)
        cls.shared_cache = Cache(max_size=cls.SHARED_CACHE_MAX_SIZE)

    async def __aenter__(self) -> Self:
        """Enters the asynchronous context manager and returns the client itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> None:
        """Exits the asynchronous context manager; does nothing and does not suppress exceptions."""
        return None

    async def authenticate(self, request: Request) -> None:
        """Method called by Meatie before sending an HTTP request for an endpoint marked as private.

        The method could be used to set the Authorization header or to sign the HTTP request using API keys.
        The default implementation does nothing.

        Args:
            request: HTTP request object.
        """
        pass

    # NOTE(review): this class declares no ABC base or ABCMeta metaclass, so `@abstractmethod` alone does
    # not prevent instantiation of a subclass that fails to override `send`.
    @abstractmethod
    async def send(self, request: Request) -> Any:
        """Sends an HTTP request using the underlying HTTP client library.

        Args:
            request: HTTP request object.

        Returns:
            HTTP response object from the underlying HTTP client library.
        """
        ...

__init__(local_cache=None, limiter=None)

Creates a BaseAsyncClient.

Parameters:

Name Type Description Default
local_cache Optional[Cache]

Cache implementation for storing the HTTP responses.

None
limiter Optional[Limiter]

Rate limiter used for throttling the rate of sending the HTTP requests.

None
Source code in src/meatie/aio/client.py
def __init__(
    self,
    local_cache: Optional[Cache] = None,
    limiter: Optional[Limiter] = None,
):
    """Creates a BaseAsyncClient.

    Args:
        local_cache: Cache implementation for storing the HTTP responses. Defaults to a new `Cache()`.
        limiter: Rate limiter used for throttling the rate of sending the HTTP requests. Defaults to
            `Limiter(Rate.max, INF)`.
    """
    self.local_cache = local_cache if local_cache is not None else Cache()
    # NOTE(review): `Limiter(Rate.max, INF)` presumably means "no effective throttling" - confirm.
    self.limiter = limiter if limiter is not None else Limiter(Rate.max, INF)

authenticate(request) async

Method called by Meatie before sending an HTTP request for an endpoint marked as private.

The method could be used to set the Authorization header or to sign the HTTP request using API keys.

Parameters:

Name Type Description Default
request Request

HTTP request object.

required
Source code in src/meatie/aio/client.py
async def authenticate(self, request: Request) -> None:
    """Method called by Meatie before sending an HTTP request for an endpoint marked as private.

    The method could be used to set the Authorization header or to sign the HTTP request using API keys.
    This implementation does nothing.

    Args:
        request: HTTP request object.
    """
    pass

send(request) abstractmethod async

Sends an HTTP request using the underlying HTTP client library.

Returns:

Type Description
Any

HTTP response object from the underlying HTTP client library.

Source code in src/meatie/aio/client.py
@abstractmethod
async def send(self, request: Request) -> Any:
    """Sends an HTTP request using the underlying HTTP client library.

    Args:
        request: HTTP request object.

    Returns:
        HTTP response object from the underlying HTTP client library.
    """
    # No implementation here; the body is a placeholder.
    ...

BaseClient

Base class for the integration with HTTP client libraries.

Source code in src/meatie/client.py
class BaseClient:
    """Base class for the integration with HTTP client libraries."""

    # Capacity passed to the cache that `__init_subclass__` creates for every subclass.
    SHARED_CACHE_MAX_SIZE: int = 1000
    # Assigned on each subclass when it is created; never assigned on this base class itself.
    shared_cache: Cache

    def __init__(
        self,
        local_cache: Optional[Cache] = None,
        limiter: Optional[Limiter] = None,
    ):
        """Creates a BaseClient.

        Args:
            local_cache: Cache implementation for storing the HTTP responses. Defaults to a new `Cache()`.
            limiter: Rate limiter used for throttling the rate of sending the HTTP requests. Defaults to
                `Limiter(Rate.max, INF)`.
        """
        self.local_cache = local_cache if local_cache is not None else Cache()
        self.limiter = limiter if limiter is not None else Limiter(Rate.max, INF)

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Gives every subclass its own `shared_cache` class attribute.

        Class keyword arguments are forwarded to the next `__init_subclass__` in the MRO so that cooperating
        base classes and mixins still receive their hook.
        """
        super().__init_subclass__(**kwargs)
        cls.shared_cache = Cache(max_size=cls.SHARED_CACHE_MAX_SIZE)

    def __enter__(self) -> Self:
        """Enters the context manager and returns the client itself."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> None:
        """Exits the context manager; does nothing and does not suppress exceptions."""
        return None

    def authenticate(self, request: Request) -> None:
        """Method called by Meatie before sending an HTTP request for an endpoint marked as private.

        The method could be used to set the Authorization header or to sign the HTTP request using API keys.
        The default implementation does nothing.

        Args:
            request: HTTP request object.
        """
        pass

    # NOTE(review): this class declares no ABC base or ABCMeta metaclass, so `@abstractmethod` alone does
    # not prevent instantiation of a subclass that fails to override `send`.
    @abstractmethod
    def send(self, request: Request) -> Any:
        """Sends an HTTP request using the underlying HTTP client library.

        Args:
            request: HTTP request object.

        Returns:
            HTTP response object from the underlying HTTP client library.
        """
        ...

__init__(local_cache=None, limiter=None)

Creates a BaseClient.

Parameters:

Name Type Description Default
local_cache Optional[Cache]

Cache implementation for storing the HTTP responses.

None
limiter Optional[Limiter]

Rate limiter used for throttling the rate of sending the HTTP requests.

None
Source code in src/meatie/client.py
def __init__(
    self,
    local_cache: Optional[Cache] = None,
    limiter: Optional[Limiter] = None,
):
    """Creates a BaseClient.

    Args:
        local_cache: Cache implementation for storing the HTTP responses. Defaults to a new `Cache()`.
        limiter: Rate limiter used for throttling the rate of sending the HTTP requests. Defaults to
            `Limiter(Rate.max, INF)`.
    """
    self.local_cache = local_cache if local_cache is not None else Cache()
    # NOTE(review): `Limiter(Rate.max, INF)` presumably means "no effective throttling" - confirm.
    self.limiter = limiter if limiter is not None else Limiter(Rate.max, INF)

authenticate(request)

Method called by Meatie before sending an HTTP request for an endpoint marked as private.

The method could be used to set the Authorization header or to sign the HTTP request using API keys.

Parameters:

Name Type Description Default
request Request

HTTP request object.

required
Source code in src/meatie/client.py
def authenticate(self, request: Request) -> None:
    """Method called by Meatie before sending an HTTP request for an endpoint marked as private.

    The method could be used to set the Authorization header or to sign the HTTP request using API keys.
    This implementation does nothing.

    Args:
        request: HTTP request object.
    """
    pass

send(request) abstractmethod

Sends an HTTP request using the underlying HTTP client library.

Returns:

Type Description
Any

HTTP response object from the underlying HTTP client library.

Source code in src/meatie/client.py
@abstractmethod
def send(self, request: Request) -> Any:
    """Sends an HTTP request using the underlying HTTP client library.

    Args:
        request: HTTP request object.

    Returns:
        HTTP response object from the underlying HTTP client library.
    """
    # No implementation here; the body is a placeholder.
    ...

Context

Bases: Generic[ResponseBodyType]

Stores context for processing an HTTP request.

Source code in src/meatie/descriptor.py
class Context(Generic[ResponseBodyType]):
    """Per-request state shared by the chain of operators."""

    def __init__(
        self,
        client: BaseClient,
        operators: list[Operator[ResponseBodyType]],
        request: Request,
    ) -> None:
        """Sets up the context for a single HTTP request.

        Args:
            client: the HTTP client instance used for sending the request.
            operators: the operators to run, in list order, around the request and its response.
            request: the HTTP request to be sent.
        """
        self.client = client
        self.__operators = operators
        # Position in `operators` of the operator that the next `proceed` call runs.
        self.__next_step = 0

        self.request = request
        # Starts as None; nothing in this class assigns a response.
        self.response: Optional[Response] = None

    def proceed(self) -> ResponseBodyType:
        """Runs the operator at the current position of the operator chain.

        The position is advanced while the operator runs, so a nested `proceed` call made by that operator runs
        the following operator. The position is restored once the operator finishes, whether it returns or raises.

        Returns:
            Whatever the operator returns.

        Raises:
            RuntimeError: if the current position is past the last operator.
        """
        step = self.__next_step
        if step >= len(self.__operators):
            raise RuntimeError("No more step to process request")

        operator = self.__operators[step]
        self.__next_step = step + 1
        try:
            return operator(self)
        finally:
            # Restore the position that was current when this call started.
            self.__next_step = step

__init__(client, operators, request)

Initializes the context.

Parameters:

Name Type Description Default
client BaseClient

HTTP client instance used for sending the HTTP request.

required
operators list[Operator[ResponseBodyType]]

list of operators that should be applied on the HTTP request and its response.

required
request Request

the HTTP request to be sent.

required
Source code in src/meatie/descriptor.py
def __init__(
    self,
    client: BaseClient,
    operators: list[Operator[ResponseBodyType]],
    request: Request,
) -> None:
    """Initializes the context.

    Args:
        client: HTTP client instance used for sending the HTTP request.
        operators: list of operators that should be applied to the HTTP request and its response.
        request: the HTTP request to be sent.
    """
    self.client = client
    self.__operators = operators
    # Position in `operators` of the operator that `proceed` runs next.
    self.__next_step = 0

    self.request = request
    # Starts as None; this initializer never sets a response.
    self.response: Optional[Response] = None

proceed()

One method call will apply one operator on the HTTP request.

Operators are applied to HTTP requests in the order defined in the context object. HTTP responses are processed in the opposite order.

Source code in src/meatie/descriptor.py
def proceed(self) -> ResponseBodyType:
    """Runs the operator at the current position of the operator chain.

    The position is advanced while the operator runs, so a nested `proceed` call made by that operator runs
    the following operator. The position is restored once the operator finishes, whether it returns or raises.

    Returns:
        Whatever the operator returns.

    Raises:
        RuntimeError: if the current position is past the last operator.
    """
    step = self.__next_step
    if step >= len(self.__operators):
        raise RuntimeError("No more step to process request")

    operator = self.__operators[step]
    self.__next_step = step + 1
    try:
        return operator(self)
    finally:
        # Restore the position that was current when this call started.
        self.__next_step = step

EndpointDescriptor

Bases: Generic[PT, ResponseBodyType]

Class descriptor for calling HTTP endpoints.

Source code in src/meatie/descriptor.py
class EndpointDescriptor(Generic[PT, ResponseBodyType]):
    """Class descriptor for calling HTTP endpoints."""

    def __init__(
        self,
        template: RequestTemplate[Any],
        response_decoder: TypeAdapter[ResponseBodyType],
    ) -> None:
        """Creates an endpoint descriptor.

        Args:
            template: the template for building HTTP requests to send to the given endpoint.
            response_decoder: the adapter for decoding the HTTP responses returned from the endpoint.
        """
        self.template = template
        self.response_decoder = response_decoder
        # Optional hooks, unset by default; `__get__` hands them unchanged to the bound descriptor.
        self.get_json: Optional[Callable[[Any], Any]] = None
        self.get_text: Optional[Callable[[Any], str]] = None
        self.get_error: Optional[Callable[[Response], Optional[Exception]]] = None
        self.__operator_by_priority: dict[int, Operator[ResponseBodyType]] = {}

    def __set_name__(self, owner: type[object], name: str) -> None:
        """Fills in the template's HTTP method from `get_method(name)` when the template has none."""
        if self.template.method is not None:
            return

        self.template.method = get_method(name)

    def register_operator(self, priority: int, operator: Operator[ResponseBodyType]) -> None:
        """Registers an operator to apply on an HTTP request or response.

        Registering an operator under a priority that is already in use replaces the earlier operator.

        Meatie uses the following priorities for the built-in operators:
         * 20 - caching
         * 40 - retry
         * 60 - rate limiting
         * 80 - authentication

        Args:
            priority: the priority of the operator, operators are applied in ascending order of priority.
            operator: the operator to apply.
        """
        self.__operator_by_priority[priority] = operator

    # Class-level access calls `__get__(None, owner_class)`, so `owner` must accept a class here, not only None.
    @overload
    def __get__(self, instance: None, owner: Optional[type[object]] = None) -> Self: ...

    @overload
    def __get__(self, instance: BaseClient, owner: type[BaseClient]) -> Callable[PT, ResponseBodyType]: ...

    def __get__(
        self, instance: Optional[BaseClient], owner: Optional[type[object]] = None
    ) -> Union[Self, Callable[PT, ResponseBodyType]]:
        """Returns the descriptor itself for class-level access, otherwise a descriptor bound to `instance`.

        Args:
            instance: the client the attribute was accessed through, or None for access through the class.
            owner: the class that owns the descriptor.

        Returns:
            This descriptor when `instance` or `owner` is None, otherwise a `BoundEndpointDescriptor` built
            with the registered operators in ascending order of priority.
        """
        if instance is None or owner is None:
            return self

        # Priorities are unique dict keys, so sorting the pairs never has to compare two operators.
        operators = [operator for _, operator in sorted(self.__operator_by_priority.items())]

        return BoundEndpointDescriptor[PT, ResponseBodyType](
            instance,
            operators,
            self.template,
            self.response_decoder,
            self.get_json,
            self.get_text,
            self.get_error,
        )

__init__(template, response_decoder)

Creates an endpoint descriptor.

Parameters:

Name Type Description Default
template RequestTemplate[Any]

the template for building HTTP requests to send to the given endpoint.

required
response_decoder TypeAdapter[ResponseBodyType]

the adapter for decoding the HTTP responses returned from the endpoint.

required
Source code in src/meatie/descriptor.py
def __init__(
    self,
    template: RequestTemplate[Any],
    response_decoder: TypeAdapter[ResponseBodyType],
) -> None:
    """Creates an endpoint descriptor.

    Args:
        template: the template for building HTTP requests to send to the given endpoint.
        response_decoder: the adapter for decoding the HTTP responses returned from the endpoint.
    """
    self.template = template
    self.response_decoder = response_decoder
    # Optional hooks; all three are unset until assigned after construction.
    self.get_json: Optional[Callable[[Any], Any]] = None
    self.get_text: Optional[Callable[[Any], str]] = None
    self.get_error: Optional[Callable[[Response], Optional[Exception]]] = None
    # Registered operators keyed by priority; starts empty.
    self.__operator_by_priority: dict[int, Operator[ResponseBodyType]] = {}

register_operator(priority, operator)

Registers an operator to apply on an HTTP request or response.

Meatie uses the following priorities for the built-in operators:
  • 20 - caching
  • 40 - retry
  • 60 - rate limiting
  • 80 - authentication

Parameters:

Name Type Description Default
priority int

the priority of the operator, operators are applied in ascending order of priority.

required
operator Operator[ResponseBodyType]

the operator to apply.

required
Source code in src/meatie/descriptor.py
def register_operator(self, priority: int, operator: Operator[ResponseBodyType]) -> None:
    """Stores an operator under the given priority.

    An operator already stored under the same priority is replaced.

    Priorities used by Meatie's built-in operators:
     * 20 - caching
     * 40 - retry
     * 60 - rate limiting
     * 80 - authentication

    Args:
        priority: position of the operator; operators are applied in ascending order of priority.
        operator: the operator to apply on an HTTP request or response.
    """
    registry = self.__operator_by_priority
    registry[priority] = operator

HttpStatusError

Bases: ResponseError

Dedicated for handling HTTP status errors. Currently, it is not raised by the Meatie library.

Source code in src/meatie/error.py
class HttpStatusError(ResponseError):
    """Error type intended for HTTP status errors.

    Meatie itself does not currently raise it.
    """

    ...

MeatieError

Bases: Exception

Base class for all Meatie exceptions.

Check __cause__ for the original exception raised by the underlying HTTP client library.

Source code in src/meatie/error.py
class MeatieError(Exception):
    """Base class for all Meatie exceptions.

    Check `__cause__` for the original exception raised by the underlying HTTP client library.
    """

    # Adds nothing to `Exception`; the class exists as a common base for the other Meatie errors.
    ...

ParseResponseError

Bases: ResponseError

Raised when the response body cannot be parsed to a JSON object.

Source code in src/meatie/error.py
class ParseResponseError(ResponseError):
    """Raised when the response body cannot be parsed to a JSON object."""

    def __init__(
        self,
        text: str,
        response: Union[Response, AsyncResponse],
    ) -> None:
        """Creates a ParseResponseError.

        Args:
            text: response body in the text format; stored on the exception as `text`.
            response: HTTP response object (synchronous or asynchronous), passed to `ResponseError`.
        """
        super().__init__(response)

        self.text = text

__init__(text, response)

Creates a ParseResponseError.

Parameters:

Name Type Description Default
text str

response body in the text format.

required
response Union[Response, AsyncResponse]

HTTP response object.

required
Source code in src/meatie/error.py
def __init__(
    self,
    text: str,
    response: Union[Response, AsyncResponse],
) -> None:
    """Creates a ParseResponseError.

    Args:
        text: response body in the text format; stored on the exception as `text`.
        response: HTTP response object (synchronous or asynchronous), passed to the parent initializer.
    """
    super().__init__(response)

    self.text = text

ProxyError

Bases: TransportError

Raised when a proxy error occurs.

Request failed due to a proxy error, e.g., the proxy server is refusing connections or does not accept CONNECT requests.

Source code in src/meatie/error.py
class ProxyError(TransportError):
    """Raised when a proxy error occurs.

    Request failed due to a proxy error, e.g., the proxy server is refusing connections or does not accept
    CONNECT requests.
    """

    ...

RateLimitExceeded

Bases: MeatieError

Deprecated: Use HttpStatusError instead.

Scheduled for removal.

Source code in src/meatie/error.py
@deprecated("Use `HttpStatusError` instead.")
class RateLimitExceeded(MeatieError):
    """Deprecated: Use `HttpStatusError` instead.

    Scheduled for removal.
    """

    # Derives directly from MeatieError, so handlers for `HttpStatusError` do not catch this exception.
    ...

Request dataclass

Specification of an HTTP request.

Value types used for headers and query parameters must be supported by the underlying HTTP client library.

Source code in src/meatie/types.py
@dataclass()
class Request:
    """Specification of an HTTP request.

    Value types used for headers and query parameters must be supported by the underlying HTTP client library.
    """

    # HTTP method of the request.
    method: Method
    # Request path. NOTE(review): presumably relative to the client's base URL - confirm.
    path: str
    # Query parameters, keyed by parameter name.
    params: dict[str, Any]
    # HTTP headers, keyed by header name.
    headers: dict[str, Any]
    # NOTE(review): presumably the raw request body as text or bytes - confirm.
    data: Optional[Union[str, bytes]] = None
    # NOTE(review): presumably a body value to be serialized as JSON - confirm.
    json: Any = None

RequestError

Bases: MeatieError

Raised when the request cannot be sent by the underlying HTTP client library, e.g., because of an invalid URL or an unsupported protocol.

There is no point in retrying the operation.

Source code in src/meatie/error.py
class RequestError(MeatieError):
    """Raised when the request cannot be sent by the underlying HTTP client library, e.g., because of an invalid URL or an unsupported protocol.

    There is no point in retrying the operation.
    """

    ...

Response

Bases: Protocol

Interface for an HTTP response.

Source code in src/meatie/types.py
# `runtime_checkable` allows isinstance() checks against this protocol; such checks only test that the
# members exist, not their signatures.
@runtime_checkable
class Response(Protocol):
    """Interface for an HTTP response.

    All members are interface stubs; this protocol provides no implementation.
    """

    @property
    def status(self) -> int:
        """Gets the HTTP status code.

        Returns:
            HTTP status code.
        """
        ...

    def read(self) -> bytes:
        """Reads the response body and returns it as bytes without decoding.

        Returns:
            Response body as bytes.

        Raises:
            ResponseError: If an error occurs while reading the response body.
        """
        ...

    def text(self) -> str:
        """Reads the response body and decodes it to a string using the encoding declared in the Content-Type response header.

        Returns:
            Response body as string.

        Raises:
            ResponseError: If an error occurs while reading the response body.
        """
        ...

    def json(self) -> Any:
        """Reads the response body and parses it as JSON.

        Returns:
            The parsed JSON value.

        Raises:
            ResponseError: If an error occurs while reading the response body.
            ParseResponseError: If the response body cannot be parsed as JSON.
        """
        ...

status property

Get HTTP status code.

Returns:

Type Description
int

HTTP status code.

json()

Reads the response body and decodes it to JSON.

Returns:

Type Description
Any

Response body as JSON.

Raises:

Type Description
ResponseError

If an error occurs while reading the response body.

ParseResponseError

If an error occurs while parsing the response body.

Source code in src/meatie/types.py
def json(self) -> Any:
    """Reads the response body and decodes it to JSON.

    Returns:
        Response body as JSON.

    Raises:
        ResponseError: If an error occurs while reading the response body.
        ParseResponseError: If an error occurs while parsing the response body.
    """
    # Member of the `Response` protocol: the body is intentionally empty, implementations provide the logic.
    ...

read()

Reads the response body and returns it as bytes without decoding.

Returns:

Type Description
bytes

Response body as bytes.

Raises:

Type Description
ResponseError

If an error occurs while reading the response body.

Source code in src/meatie/types.py
def read(self) -> bytes:
    """Reads the response body and returns it as bytes without decoding.

    Returns:
        Response body as bytes.

    Raises:
        ResponseError: If an error occurs while reading the response body.
    """
    # Member of the `Response` protocol: the body is intentionally empty, implementations provide the logic.
    ...

text()

Reads the response body and decodes it to a string using encoding declared in the Content-Type response header.

Returns:

Type Description
str

Response body as string.

Raises:

Type Description
ResponseError

If an error occurs while reading the response body.

Source code in src/meatie/types.py
def text(self) -> str:
    """Reads the response body and decodes it to a string using encoding declared in the Content-Type response header.

    Returns:
        Response body as string.

    Raises:
        ResponseError: If an error occurs while reading the response body.
    """
    # Member of the `Response` protocol: the body is intentionally empty, implementations provide the logic.
    ...

ResponseError

Bases: MeatieError

Raised when the response body cannot be read or decoded to a string.

Source code in src/meatie/error.py
class ResponseError(MeatieError):
    """Raised when the response body cannot be read or decoded to a string.

    The HTTP response object passed to the constructor is kept in the `response` attribute.
    """

    def __init__(self, response: Union[Response, AsyncResponse]) -> None:
        """Creates a ResponseError.

        Args:
            response: HTTP response object.
        """
        # NOTE(review): the base class initializer is not invoked here - confirm MeatieError does not rely on it.
        self.response = response

__init__(response)

Creates a ResponseError.

Parameters:

Name Type Description Default
response Union[Response, AsyncResponse]

HTTP response object.

required
Source code in src/meatie/error.py
def __init__(self, response: Union[Response, AsyncResponse]) -> None:
    """Creates a ResponseError.

    Args:
        response: HTTP response object; stored unchanged in the `response` attribute.
    """
    self.response = response

RetryError

Bases: MeatieError

Raised when all retry attempts have been exhausted.

Source code in src/meatie/error.py
class RetryError(MeatieError):
    """Signals that all retry attempts have been used up."""

ServerError

Bases: TransportError

Raised when a server or a network error occurs.

The HTTP request failed because of a server-side error, e.g., the server violated the HTTP protocol by returning a corrupted response, the server closed the connection prematurely, the server certificate is invalid, or a connection with the server cannot be established.

Source code in src/meatie/error.py
class ServerError(TransportError):
    """Transport error attributed to the server or the network.

    Covers HTTP requests that failed because of a server-side error, e.g., the server violated the HTTP protocol by
    returning a corrupted response, the server closed the connection prematurely, the server certificate is invalid,
    or a connection with the server cannot be established.
    """

Timeout

Bases: ServerError

Raised when a request times out, i.e., connection timeout, read timeout, write timeout.

Source code in src/meatie/error.py
class Timeout(ServerError):
    """Signals that a request timed out, i.e., connection timeout, read timeout, write timeout."""

TransportError

Bases: MeatieError

Raised in response to a transport error, e.g., too many redirects, a protocol error, a network error.

Source code in src/meatie/error.py
class TransportError(MeatieError):
    """Signals a transport-level failure, e.g., too many redirects, a protocol error, a network error."""

endpoint(path, *args, method=None)

Class descriptor for decorating methods that represent API endpoints.

Inspects the method signature to create an endpoint descriptor that can be used to make HTTP requests.

Parameters:

Name Type Description Default
path str

URL path template. It should start with /. Path parameters should be surrounded by curly braces, e.g., /users/{user_id}/todos.

required
args Any

options to customize the endpoint behaviour, such as caching, rate limiting, retries, and authentication.

()
method Optional[Method]

HTTP method for making the request. Inferred from the method name by default.

None

Returns:

Type Description
Callable[[Callable[PT, T]], Callable[PT, T]]

A decorator that preserves the callable signature of the decorated method.

Callable[[Callable[PT, T]], Callable[PT, T]]

For async methods, returns a callable with Awaitable return type.

Callable[[Callable[PT, T]], Callable[PT, T]]

For sync methods, returns a callable with direct return type.

Examples:

from typing import Annotated

from aiohttp import ClientSession
from meatie import api_ref, endpoint
from meatie_aiohttp import Client

class JsonPlaceholderClient(Client):
    def __init__(self) -> None:
        super().__init__(ClientSession(base_url="https://jsonplaceholder.typicode.com"))

    # No explicit `method` is given, so the HTTP method is inferred from the method name.
    # `api_ref("userId")` presumably sends the `user_id` argument under the name `userId`.
    @endpoint("/todos")
    async def get_todos(self, user_id: Annotated[int, api_ref("userId")] = None) -> list[dict]: ...

    # `{user_id}` is a path parameter of the URL path template.
    @endpoint("/users/{user_id}/todos")
    async def get_todos_by_user(self, user_id: int) -> list[dict]: ...
Source code in src/meatie/endpoint.py
def endpoint(
    path: str,
    *args: Any,
    method: Optional[Method] = None,
) -> Callable[[Callable[PT, T]], Callable[PT, T]]:
    """Class descriptor for decorating methods that represent API endpoints.

    Inspects the method signature to create an endpoint descriptor that can be used to make HTTP requests.

    Parameters:
        path: URL path template. It should start with `/`. Path parameters should be surrounded by curly braces,
            e.g., `/users/{user_id}/todos`.
        args: options to customize the endpoint behaviour, such as caching, rate limiting, retries, and authentication.
            Each option is called once with the created descriptor.
        method: HTTP method for making the request. Inferred from the method name by default.

    Returns:
        A decorator that preserves the callable signature of the decorated method.
        For async methods, returns a callable with Awaitable return type.
        For sync methods, returns a callable with direct return type.

    Examples:
        ```python
        from typing import Annotated

        from aiohttp import ClientSession
        from meatie import api_ref, endpoint
        from meatie_aiohttp import Client

        class JsonPlaceholderClient(Client):
            def __init__(self) -> None:
                super().__init__(ClientSession(base_url="https://jsonplaceholder.typicode.com"))

            @endpoint("/todos")
            async def get_todos(self, user_id: Annotated[int, api_ref("userId")] = None) -> list[dict]: ...

            @endpoint("/users/{user_id}/todos")
            async def get_todos_by_user(self, user_id: int) -> list[dict]: ...
        ```
    """

    def class_descriptor(func: Callable[PT, T]) -> Callable[PT, T]:
        path_template = PathTemplate.from_string(path)

        signature = inspect.signature(func)
        # include_extras=True keeps the `Annotated[...]` metadata in the resolved type hints.
        type_hints = get_type_hints(func, include_extras=True)
        request_template: RequestTemplate[Any] = RequestTemplate.from_signature(
            signature, type_hints, path_template, method
        )

        # The decorated method must declare a return type: the lookup fails with KeyError otherwise.
        return_type = type_hints["return"]
        response_decoder: TypeAdapter[T] = get_adapter(return_type)

        # Coroutine functions get the asynchronous descriptor, everything else the synchronous one.
        is_coroutine = inspect.iscoroutinefunction(func)
        descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]]
        if is_coroutine:
            descriptor = AsyncEndpointDescriptor[PT, T](request_template, response_decoder)
        else:
            descriptor = EndpointDescriptor[PT, T](request_template, response_decoder)

        # Options are applied in the order in which they were passed to `endpoint`.
        for option in args:
            option(descriptor)

        # Cast the descriptor to the expected callable type.
        # The descriptor protocol ensures this works at runtime: when accessed on an instance,
        # the descriptor's __get__ method returns a bound callable with the correct signature.
        return cast(Callable[PT, T], descriptor)

    return class_descriptor

ApiReference

ApiReference stores additional settings that customize handling of a parameter in the HTTP request.

Source code in src/meatie/api_reference.py
class ApiReference:
    """ApiReference stores additional settings that customize handling of a parameter in the HTTP request."""

    __slots__ = ("name", "fmt", "unwrap")

    def __init__(
        self,
        name: Optional[str] = None,
        fmt: Optional[Callable[[Any], Any]] = None,
        unwrap: Optional[Callable[[Any], dict[str, Any]]] = None,
    ) -> None:
        """Create an ApiReference.

        Args:
            name: the name of the query parameter in the HTTP request, the name of the Python parameter is used as default.
            fmt: conversion function to apply on the parameter value before sending the HTTP request
            unwrap: conversion function to apply on the parameter value before sending the HTTP request. In contrast to the fmt function, which produces a single value, the unwrap function returns a dictionary of key value pairs.
        """
        self.name = name
        self.fmt = fmt
        self.unwrap = unwrap

    def __hash__(self) -> int:
        return hash(self.name)

    def __eq__(self, other: Any) -> bool:
        # Python interpreter seems to reuse annotations, for that reason we also need to include the formatter
        if isinstance(other, ApiReference):
            return self.name == other.name and self.fmt == other.fmt and self.unwrap == other.unwrap
        return False

    @classmethod
    def from_signature(cls, parameter: inspect.Parameter, resolved_annotation: Optional[Any] = None) -> Self:
        """Create an ApiReference from the Python method parameter.

        Args:
            parameter: The parameter from the function signature.
            resolved_annotation: The resolved type hint (from get_type_hints with include_extras=True).
                If provided, this is tried first, then falls back to parameter.annotation.
        """
        # Try resolved annotation first (handles stringified annotations from future imports)
        # If not found, fall back to parameter.annotation (handles local functions in tests)
        for annotation in [resolved_annotation, parameter.annotation]:
            if annotation is None:
                continue
            for arg in get_args(annotation):
                if isinstance(arg, cls):
                    if arg.name is None:
                        return cls(name=parameter.name, fmt=arg.fmt, unwrap=arg.unwrap)
                    return arg
        return cls(name=parameter.name, fmt=None, unwrap=None)

__init__(name=None, fmt=None, unwrap=None)

Create an ApiReference.

Parameters:

Name Type Description Default
name Optional[str]

the name of the query parameter in the HTTP request, the name of the Python parameter is used as default.

None
fmt Optional[Callable[[Any], Any]]

conversion function to apply on the parameter value before sending the HTTP request

None
unwrap Optional[Callable[[Any], dict[str, Any]]]

conversion function to apply on the parameter value before sending the HTTP request. In contrast to the fmt function, which produces a single value, the unwrap function returns a dictionary of key value pairs.

None
Source code in src/meatie/api_reference.py
def __init__(
    self,
    name: Optional[str] = None,
    fmt: Optional[Callable[[Any], Any]] = None,
    unwrap: Optional[Callable[[Any], dict[str, Any]]] = None,
) -> None:
    """Create an ApiReference.

    Args:
        name: the name of the query parameter in the HTTP request, the name of the Python parameter is used as default.
        fmt: conversion function to apply on the parameter value before sending the HTTP request.
        unwrap: conversion function to apply on the parameter value before sending the HTTP request. In contrast to the fmt function, which produces a single value, the unwrap function returns a dictionary of key value pairs.
    """
    # The arguments are stored unchanged; no validation or conversion happens here.
    self.name = name
    self.fmt = fmt
    self.unwrap = unwrap

from_signature(parameter, resolved_annotation=None) classmethod

Create an ApiReference from the Python method parameter.

Parameters:

Name Type Description Default
parameter Parameter

The parameter from the function signature.

required
resolved_annotation Optional[Any]

The resolved type hint (from get_type_hints with include_extras=True). If provided, this is tried first, then falls back to parameter.annotation.

None
Source code in src/meatie/api_reference.py
@classmethod
def from_signature(cls, parameter: inspect.Parameter, resolved_annotation: Optional[Any] = None) -> Self:
    """Create an ApiReference from the Python method parameter.

    Args:
        parameter: The parameter from the function signature.
        resolved_annotation: The resolved type hint (from get_type_hints with include_extras=True).
            If provided, this is tried first, then falls back to parameter.annotation.

    Returns:
        The first ApiReference found among the type arguments of the annotation. A reference without a name is
        replaced by a copy named after the parameter. When no reference is found, a new reference carrying only
        the parameter name is returned.
    """
    # Try resolved annotation first (handles stringified annotations from future imports)
    # If not found, fall back to parameter.annotation (handles local functions in tests)
    for annotation in [resolved_annotation, parameter.annotation]:
        if annotation is None:
            continue
        for arg in get_args(annotation):
            if isinstance(arg, cls):
                if arg.name is None:
                    # Unnamed reference: keep its converters and take the name from the parameter.
                    return cls(name=parameter.name, fmt=arg.fmt, unwrap=arg.unwrap)
                return arg
    return cls(name=parameter.name, fmt=None, unwrap=None)

BodyOption

Customize handling of HTTP response body, such as text decoding, parsing JSON and detecting errors.

Source code in src/meatie/option/body_option.py
class BodyOption:
    """Option overriding how the HTTP response body is handled: text decoding, JSON parsing and error detection."""

    __slots__ = ("json", "text", "error")

    def __init__(
        self,
        json: Optional[Union[Callable[[Any], Any], Callable[[Any], Awaitable[Any]]]] = None,
        text: Optional[Union[Callable[[Any], str], Callable[[Any], Awaitable[str]]]] = None,
        error: Optional[
            Union[
                Callable[[Response], Optional[Exception]],
                Callable[[AsyncResponse], Awaitable[Optional[Exception]]],
            ]
        ] = None,
    ) -> None:
        """Store the callables that customize body handling.

        Parameters:
            json: function to parse JSON from the HTTP response body. The default is to rely on the behaviour of the HTTP client library.
            text: function to decode text from the HTTP response body. The default is to rely on the behaviour of the HTTP client library.
            error: function to detect an error in the HTTP response. The default is to do nothing.
        """
        self.json, self.text, self.error = json, text, error

    def __call__(
        self,
        descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
    ) -> None:
        """Install the stored callables on the endpoint descriptor.

        All three descriptor attributes are assigned on every call, including those whose callable is None.
        """
        text_hook, json_hook, error_hook = self.text, self.json, self.error
        descriptor.get_text = text_hook
        descriptor.get_json = json_hook
        descriptor.get_error = error_hook

__call__(descriptor)

Apply the body option to the endpoint descriptor.

Source code in src/meatie/option/body_option.py
def __call__(
    self,
    descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
) -> None:
    """Apply the body option to the endpoint descriptor.

    All three descriptor attributes are assigned on every call, including those whose callable is None.
    """
    descriptor.get_text = self.text
    descriptor.get_json = self.json
    descriptor.get_error = self.error

__init__(json=None, text=None, error=None)

Creates a new body option.

Parameters:

Name Type Description Default
json Optional[Union[Callable[[Any], Any], Callable[[Any], Awaitable[Any]]]]

function to parse JSON from the HTTP response body. The default is to rely on the behaviour of the HTTP client library.

None
text Optional[Union[Callable[[Any], str], Callable[[Any], Awaitable[str]]]]

function to decode text from the HTTP response body. The default is to rely on the behaviour of the HTTP client library.

None
error Optional[Union[Callable[[Response], Optional[Exception]], Callable[[AsyncResponse], Awaitable[Optional[Exception]]]]]

function to detect an error in the HTTP response. The default is to do nothing.

None
Source code in src/meatie/option/body_option.py
def __init__(
    self,
    json: Optional[Union[Callable[[Any], Any], Callable[[Any], Awaitable[Any]]]] = None,
    text: Optional[Union[Callable[[Any], str], Callable[[Any], Awaitable[str]]]] = None,
    error: Optional[
        Union[
            Callable[[Response], Optional[Exception]],
            Callable[[AsyncResponse], Awaitable[Optional[Exception]]],
        ]
    ] = None,
) -> None:
    """Creates a new body option.

    Per the type hints, every function may either return its result directly or return an awaitable.

    Parameters:
        json: function to parse JSON from the HTTP response body. The default is to rely on the behaviour of the HTTP client library.
        text: function to decode text from the HTTP response body. The default is to rely on the behaviour of the HTTP client library.
        error: function to detect an error in the HTTP response. The default is to do nothing.
    """
    self.json = json
    self.text = text
    self.error = error

BaseAsyncOperator

Bases: Generic[T]

Base class for asynchronous cache operators. Saves the value returned from the endpoint in cache.

Source code in src/meatie/option/cache_option.py
class BaseAsyncOperator(abc.ABC, Generic[T]):
    """Base class for asynchronous cache operators. Saves the value returned from the endpoint in cache.

    Subclasses choose the cache by implementing `_storage`. The class derives from `abc.ABC` because
    `abc.abstractmethod` is only enforced on classes whose metaclass is `ABCMeta`; without it an operator lacking
    `_storage` could be instantiated and would fail later, when the cache is first used.
    """

    def __init__(self, ttl: Duration) -> None:
        """Creates the operator.

        Args:
            ttl: time-to-live passed to the cache whenever a value is stored.
        """
        self.ttl = ttl

    async def __call__(self, ctx: AsyncContext[T]) -> T:
        """Returns the cached value for the request, or calls the endpoint and caches its result.

        Args:
            ctx: context of the HTTP request being processed.

        Returns:
            The value loaded from the cache, otherwise the value returned by `ctx.proceed()`.
        """
        storage = self._storage(ctx)
        key = get_key(ctx.request)
        value_opt = storage.load(key)
        # `None` coming back from the cache is treated as a miss, so the endpoint is called.
        if value_opt is not None:
            return value_opt

        value = await ctx.proceed()
        storage.store(key, value, self.ttl)
        return value

    @abc.abstractmethod
    def _storage(self, ctx: AsyncContext[T]) -> Cache:
        """Returns: the cache storage to use."""
        ...

BaseOperator

Bases: Generic[T]

Base class for cache operators. Saves the value returned from the endpoint in cache.

Source code in src/meatie/option/cache_option.py
class BaseOperator(abc.ABC, Generic[T]):
    """Base class for cache operators. Saves the value returned from the endpoint in cache.

    Subclasses choose the cache by implementing `_storage`. The class derives from `abc.ABC` because
    `abc.abstractmethod` is only enforced on classes whose metaclass is `ABCMeta`; without it an operator lacking
    `_storage` could be instantiated and would fail later, when the cache is first used.
    """

    def __init__(self, ttl: Duration) -> None:
        """Creates the operator.

        Args:
            ttl: time-to-live passed to the cache whenever a value is stored.
        """
        self.ttl = ttl

    def __call__(self, ctx: Context[T]) -> T:
        """Returns the cached value for the request, or calls the endpoint and caches its result.

        Args:
            ctx: context of the HTTP request being processed.

        Returns:
            The value loaded from the cache, otherwise the value returned by `ctx.proceed()`.
        """
        storage = self._storage(ctx)
        key = get_key(ctx.request)
        value_opt = storage.load(key)
        # `None` coming back from the cache is treated as a miss, so the endpoint is called.
        if value_opt is not None:
            return value_opt

        value = ctx.proceed()
        storage.store(key, value, self.ttl)
        return value

    @abc.abstractmethod
    def _storage(self, ctx: Context[T]) -> Cache:
        """Returns: the cache storage to use."""
        ...

CacheOption

Configure caching of endpoint call results.

Source code in src/meatie/option/cache_option.py
class CacheOption:
    """Option that enables caching of the results returned by an endpoint."""

    def __init__(self, ttl: Duration, shared: bool = False) -> None:
        """Store the cache settings.

        Parameters:
            ttl: the time-to-live of the cache entry in seconds
            shared: if set to False (default) the cache entry will be stored in the local cache owned by the client instance. Records cached by another client instance will not be visible.
                Otherwise, if set to True, all clients that are instances of the same Python class will share the same cache.
        """
        self.shared = shared
        self.ttl = ttl

    def __call__(
        self,
        descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
    ) -> None:
        """Register a cache operator on the endpoint descriptor, matching its sync or async kind."""
        if isinstance(descriptor, EndpointDescriptor):
            self.__sync_descriptor(descriptor)
        else:
            self.__async_descriptor(descriptor)

    @property
    def priority(self) -> int:
        """Returns: the priority of the cache operator."""
        return 20

    def __sync_descriptor(self, descriptor: EndpointDescriptor[PT, T]) -> None:
        # Shared cache when requested, otherwise the local cache of the client instance.
        operator: BaseOperator[T] = SharedOperator[T](self.ttl) if self.shared else LocalOperator[T](self.ttl)
        descriptor.register_operator(self.priority, operator)

    def __async_descriptor(self, descriptor: AsyncEndpointDescriptor[PT, T]) -> None:
        # Shared cache when requested, otherwise the local cache of the client instance.
        operator: BaseAsyncOperator[T] = (
            SharedAsyncOperator[T](self.ttl) if self.shared else LocalAsyncOperator[T](self.ttl)
        )
        descriptor.register_operator(self.priority, operator)

priority property

Returns: the priority of the cache operator.

__call__(descriptor)

Apply the cache option to the endpoint descriptor.

Source code in src/meatie/option/cache_option.py
def __call__(
    self,
    descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
) -> None:
    """Apply the cache option to the endpoint descriptor.

    `EndpointDescriptor` instances are handled by the synchronous variant, every other descriptor by the asynchronous one.
    """
    if isinstance(descriptor, EndpointDescriptor):
        return self.__sync_descriptor(descriptor)
    return self.__async_descriptor(descriptor)

__init__(ttl, shared=False)

Creates a new cache option.

Parameters:

Name Type Description Default
ttl Duration

the time-to-live of the cache entry in seconds

required
shared bool

if set to False (default) the cache entry will be stored in the local cache owned by the client instance. Records cached by another client instance will not be visible. Otherwise, if set to True, all clients that are instances of the same Python class will share the same cache.

False
Source code in src/meatie/option/cache_option.py
def __init__(self, ttl: Duration, shared: bool = False) -> None:
    """Creates a new cache option.

    Parameters:
        ttl: the time-to-live of the cache entry in seconds
        shared: if set to False (default) the cache entry will be stored in the local cache owned by the client instance. Records cached by another client instance will not be visible.
            Otherwise, if set to True, all clients that are instances of the same Python class will share the same cache.
    """
    self.ttl = ttl
    self.shared = shared

LocalAsyncOperator

Bases: BaseAsyncOperator[T]

Asynchronous cache operator that stores the value returned from the endpoint in the local cache owned by the client instance.

Source code in src/meatie/option/cache_option.py
class LocalAsyncOperator(BaseAsyncOperator[T]):
    """Asynchronous cache operator backed by the local cache owned by the client instance."""

    def _storage(self, ctx: AsyncContext[T]) -> Cache:
        """Returns: the local cache of the client stored in the context."""
        client = ctx.client
        return client.local_cache

LocalOperator

Bases: BaseOperator[T]

Cache operator that stores the value returned from the endpoint in the local cache owned by the client instance.

Source code in src/meatie/option/cache_option.py
class LocalOperator(BaseOperator[T]):
    """Cache operator backed by the local cache owned by the client instance."""

    def _storage(self, ctx: Context[T]) -> Cache:
        """Returns: the local cache of the client stored in the context."""
        client = ctx.client
        return client.local_cache

SharedAsyncOperator

Bases: BaseAsyncOperator[T]

Asynchronous cache operator that stores the value returned from the endpoint in the cache shared by all client instances of the same Python class.

Source code in src/meatie/option/cache_option.py
class SharedAsyncOperator(BaseAsyncOperator[T]):
    """Asynchronous cache operator backed by the cache shared by all client instances of the same Python class."""

    def _storage(self, ctx: AsyncContext[T]) -> Cache:
        """Returns: the shared cache of the client stored in the context."""
        client = ctx.client
        return client.shared_cache

SharedOperator

Bases: BaseOperator[T]

Cache operator that stores the value returned from the endpoint in the cache shared by all client instances of the same Python class.

Source code in src/meatie/option/cache_option.py
class SharedOperator(BaseOperator[T]):
    """Cache operator backed by the cache shared by all client instances of the same Python class."""

    def _storage(self, ctx: Context[T]) -> Cache:
        """Returns: the shared cache of the client stored in the context."""
        client = ctx.client
        return client.shared_cache

AsyncLimitOperator

Bases: Generic[T]

Delays the endpoint calls that exceed the rate limit.

Source code in src/meatie/option/limit_option.py
class AsyncLimitOperator(Generic[T]):
    """Postpones endpoint calls that would exceed the rate limit."""

    def __init__(self, tokens: Tokens, sleep_func: Callable[[Duration], Awaitable[None]]) -> None:
        """Store the operator settings.

        Args:
            tokens: number of tokens consumed by the endpoint call
            sleep_func: awaitable sleep function that is called with the delay to wait out.
        """
        self.sleep_func = sleep_func
        self.tokens = tokens

    async def __call__(self, ctx: AsyncContext[T]) -> T:
        """Reserve the tokens with the client's limiter, wait if the reservation is not ready yet, then proceed."""
        now = time.monotonic()
        ready_at = ctx.client.limiter.reserve_at(now, self.tokens).ready_at
        wait = ready_at - now
        # Only a reservation that becomes ready in the future requires sleeping.
        if wait > 0:
            await self.sleep_func(wait)
        return await ctx.proceed()

__init__(tokens, sleep_func)

Creates a new limit operator.

Parameters:

Name Type Description Default
tokens Tokens

number of tokens consumed by the endpoint call

required
sleep_func Callable[[Duration], Awaitable[None]]

the sleep function to use (LimitOption passes asyncio.sleep unless a custom sleep function is configured).

required
Source code in src/meatie/option/limit_option.py
def __init__(self, tokens: Tokens, sleep_func: Callable[[Duration], Awaitable[None]]) -> None:
    """Creates a new limit operator.

    Args:
        tokens: number of tokens consumed by the endpoint call
        sleep_func: the awaitable sleep function to use. The parameter is required and has no default here.
    """
    self.tokens = tokens
    self.sleep_func = sleep_func

LimitOperator

Bases: Generic[T]

Delays the endpoint calls that exceed the rate limit.

Source code in src/meatie/option/limit_option.py
class LimitOperator(Generic[T]):
    """Postpones endpoint calls that would exceed the rate limit."""

    def __init__(self, tokens: Tokens, sleep_func: Callable[[Duration], None]) -> None:
        """Store the operator settings.

        Args:
            tokens: number of tokens consumed by the endpoint call
            sleep_func: blocking sleep function that is called with the delay to wait out.
        """
        self.sleep_func = sleep_func
        self.tokens = tokens

    def __call__(self, ctx: Context[T]) -> T:
        """Reserve the tokens with the client's limiter, wait if the reservation is not ready yet, then proceed."""
        now = time.monotonic()
        ready_at = ctx.client.limiter.reserve_at(now, self.tokens).ready_at
        wait = ready_at - now
        # Only a reservation that becomes ready in the future requires sleeping.
        if wait > 0:
            self.sleep_func(wait)
        return ctx.proceed()

__init__(tokens, sleep_func)

Creates a new limit operator.

Parameters:

Name Type Description Default
tokens Tokens

number of tokens consumed by the endpoint call

required
sleep_func Callable[[Duration], None]

the sleep function to use (LimitOption passes time.sleep unless a custom sleep function is configured).

required
Source code in src/meatie/option/limit_option.py
def __init__(self, tokens: Tokens, sleep_func: Callable[[Duration], None]) -> None:
    """Creates a new limit operator.

    Args:
        tokens: number of tokens consumed by the endpoint call
        sleep_func: the blocking sleep function to use. The parameter is required and has no default here.
    """
    self.tokens = tokens
    self.sleep_func = sleep_func

LimitOption

Configure the rate limit for the endpoint calls.

Source code in src/meatie/option/limit_option.py
class LimitOption:
    """Option that sets the rate limit applied to the endpoint calls."""

    def __init__(
        self,
        tokens: Tokens,
        sleep_func: Union[Callable[[float], Union[None, Awaitable[None]]], None] = None,
    ) -> None:
        """Store the rate limit settings.

        The number of tokens available at a given time is controlled by the rate limiter instance used by the client.
        Meatie provides a leaky bucket rate limiter implementation with a constant replenishment rate and burst size. See meatie.Limiter.

        Parameters:
            tokens: number of tokens consumed by the endpoint call
            sleep_func: the sleep function to use. Default behaviour is to rely on the Python standard library functions: time.sleep and asyncio.sleep for async functions.
        """
        self.sleep_func = sleep_func
        self.tokens = tokens

    def __call__(
        self,
        descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
    ) -> None:
        """Register a limit operator on the endpoint descriptor, matching its sync or async kind."""
        if isinstance(descriptor, EndpointDescriptor):
            self.__sync_descriptor(descriptor)
        else:
            self.__async_descriptor(descriptor)

    @property
    def priority(self) -> int:
        """Returns: the priority of the limit operator."""
        return 60

    def __sync_descriptor(self, descriptor: EndpointDescriptor[PT, T]) -> None:
        # An endpoint that consumes no tokens is never delayed, so no operator is registered.
        if self.tokens <= 0.0:
            return

        custom_sleep = self.sleep_func
        sleep_func: Callable[[float], None] = (
            time.sleep if custom_sleep is None else custom_sleep  # type: ignore[assignment]
        )
        operator = LimitOperator[T](self.tokens, sleep_func)
        descriptor.register_operator(self.priority, operator)

    def __async_descriptor(self, descriptor: AsyncEndpointDescriptor[PT, T]) -> None:
        # An endpoint that consumes no tokens is never delayed, so no operator is registered.
        if self.tokens <= 0.0:
            return

        custom_sleep = self.sleep_func
        sleep_func: Callable[[float], Awaitable[None]] = (
            asyncio.sleep if custom_sleep is None else custom_sleep  # type: ignore[assignment]
        )
        operator = AsyncLimitOperator[T](self.tokens, sleep_func)
        descriptor.register_operator(self.priority, operator)

priority property

Returns: the priority of the limit operator.

__call__(descriptor)

Apply the rate limit option to the endpoint descriptor.

Source code in src/meatie/option/limit_option.py
def __call__(
    self,
    descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
) -> None:
    """Register the rate limit operator on the given endpoint descriptor.

    Args:
        descriptor: synchronous or asynchronous endpoint descriptor to configure.
    """
    # Anything that is not a synchronous descriptor takes the asynchronous path.
    if not isinstance(descriptor, EndpointDescriptor):
        return self.__async_descriptor(descriptor)
    return self.__sync_descriptor(descriptor)

__init__(tokens, sleep_func=None)

Creates a new rate limit option.

The number of tokens available at a given time is controlled by the rate limiter instance used by the client. Meatie provides a leaky bucket rate limiter implementation with a constant replenishment rate and burst size. See meatie.Limiter.

Parameters:

Name Type Description Default
tokens Tokens

number of tokens consumed by the endpoint call

required
sleep_func Union[Callable[[float], Union[None, Awaitable[None]]], None]

the sleep function to use. Default behaviour is to rely on the Python standard library functions: time.sleep and asyncio.sleep for async functions.

None
Source code in src/meatie/option/limit_option.py
def __init__(
    self,
    tokens: Tokens,
    sleep_func: Union[Callable[[float], Union[None, Awaitable[None]]], None] = None,
) -> None:
    """Creates a new rate limit option.

    The number of tokens available at a given time is controlled by the rate limiter instance used by the client.
    Meatie provides a leaky bucket rate limiter implementation with a constant replenishment rate and burst size.
    See meatie.Limiter.

    Parameters:
        tokens: number of tokens consumed by the endpoint call
        sleep_func: the sleep function to use. Default behaviour is to rely on the Python standard library
            functions: time.sleep and asyncio.sleep for async functions.
    """
    # Both values are stored as given; the sleep default is resolved later, when the option is applied.
    self.tokens, self.sleep_func = tokens, sleep_func

PrivateOption

Include additional information in the HTTP request before calling the API endpoint.

Instructs Meatie library to call the authenticate method of the client instance with the HTTP request as the argument.

Popular use cases are to set the Authorization header or sign the HTTP request using API keys before calling the API endpoint.

Source code in src/meatie/option/private_option.py
class PrivateOption:
    """Marks an endpoint as private so that its HTTP requests are authenticated before being sent.

    Instructs Meatie library to call the `authenticate` method of the client instance with the HTTP request as the argument.

    Popular use cases are to set the `Authorization` header or sign the HTTP request using API keys before calling the API endpoint.
    """

    def __call__(
        self,
        descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
    ) -> None:
        """Register the private operator on the given endpoint descriptor.

        Args:
            descriptor: synchronous or asynchronous endpoint descriptor to configure.
        """
        # Anything that is not a synchronous descriptor takes the asynchronous path.
        if not isinstance(descriptor, EndpointDescriptor):
            return self.__async_descriptor(descriptor)
        return self.__sync_descriptor(descriptor)

    @property
    def priority(self) -> int:
        """Returns: the priority of the private operator."""
        return 80

    def __sync_descriptor(self, descriptor: EndpointDescriptor[PT, T]) -> None:
        """Register the synchronous private operator under this option's priority."""
        register = descriptor.register_operator
        register(self.priority, _operator)

    def __async_descriptor(self, descriptor: AsyncEndpointDescriptor[PT, T]) -> None:
        """Register the asynchronous private operator under this option's priority."""
        register = descriptor.register_operator
        register(self.priority, _async_operator)

priority property

Returns: the priority of the private operator.

__call__(descriptor)

Apply the private option to the endpoint descriptor.

Source code in src/meatie/option/private_option.py
def __call__(
    self,
    descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
) -> None:
    """Register the private operator on the given endpoint descriptor.

    Args:
        descriptor: synchronous or asynchronous endpoint descriptor to configure.
    """
    # Anything that is not a synchronous descriptor takes the asynchronous path.
    if not isinstance(descriptor, EndpointDescriptor):
        return self.__async_descriptor(descriptor)
    return self.__sync_descriptor(descriptor)

AsyncRetryOperator

Bases: Generic[T]

Executes the retry strategy for asynchronous endpoint calls.

Source code in src/meatie/option/retry_option.py
class AsyncRetryOperator(Generic[T]):
    """Executes the retry strategy for asynchronous endpoint calls."""

    def __init__(
        self,
        on: Condition,
        wait: WaitFunc,
        stop: Condition,
        sleep_func: Callable[[Duration], Awaitable[None]],
    ) -> None:
        """Creates a new retry operator.

        Args:
            on: function that returns True if the operation should be retried.
            wait: function that returns the duration to wait before the next retry attempt.
            stop: function that returns True if the operation should be aborted.
            sleep_func: the sleep function to use.
        """
        self.__condition = on
        self.__wait = wait
        self.__stop = stop
        self.__sleep_func = sleep_func

    async def __call__(self, operation_ctx: AsyncContext[T]) -> T:
        """Runs the remaining operators, repeating the call as directed by the retry strategy.

        Every attempt calls `operation_ctx.proceed()`. Its outcome (the response or the raised error) is stored
        in a `RetryContext` that is handed to the `on`, `wait` and `stop` functions.

        Args:
            operation_ctx: context of the endpoint call being processed.

        Returns:
            The result of the first attempt that `on` does not ask to retry. This may be None when the endpoint
            produces no value.

        Raises:
            BaseException: the error raised by the last attempt, when `on` does not ask to retry it or when
                `stop` aborts the retries.
            RetryError: when `stop` aborts the retries and the last attempt did not raise.
        """
        retry_ctx = RetryContext(attempt_number=1, started_at=time.monotonic())
        stopped = False
        while not stopped:
            # Only wait between attempts, never before the first one.
            if retry_ctx.attempt_number > 1:
                wait_time = self.__wait(retry_ctx)
                if wait_time > 0.0:
                    await self.__sleep_func(wait_time)

            # Each attempt starts from a clean slate so that an earlier outcome cannot leak into this one.
            retry_ctx.error = None
            retry_ctx.response = None
            result: Optional[T] = None
            try:
                result = await operation_ctx.proceed()
                retry_ctx.response = operation_ctx.response
            except BaseException as exc:
                # NOTE(review): BaseException also covers KeyboardInterrupt, SystemExit and
                # asyncio.CancelledError; they are re-raised below unless `on` asks for a retry.
                retry_ctx.error = exc

            if not self.__condition(retry_ctx):
                # The outcome of this attempt is final: surface its error, otherwise return its result
                # (even if that result is None) instead of a value left over from a previous attempt.
                if retry_ctx.error is not None:
                    raise retry_ctx.error
                return result  # type: ignore[return-value]

            retry_ctx.attempt_number += 1
            stopped = self.__stop(retry_ctx)

        # Retries were aborted by `stop`: report the last error if there was one.
        if retry_ctx.error is not None:
            raise retry_ctx.error
        raise RetryError()

__init__(on, wait, stop, sleep_func)

Creates a new retry operator.

Parameters:

Name Type Description Default
on Condition

function that returns True if the operation should be retried.

required
wait WaitFunc

function that returns the duration to wait before the next retry attempt.

required
stop Condition

function that returns True if the operation should be aborted.

required
sleep_func Callable[[Duration], Awaitable[None]]

the sleep function to use.

required
Source code in src/meatie/option/retry_option.py
def __init__(
    self,
    on: Condition,
    wait: WaitFunc,
    stop: Condition,
    sleep_func: Callable[[Duration], Awaitable[None]],
) -> None:
    """Creates a new retry operator for asynchronous endpoint calls.

    Args:
        on: function that returns True if the operation should be retried.
        wait: function that returns the duration to wait before the next retry attempt.
        stop: function that returns True if the operation should be aborted.
        sleep_func: the awaitable sleep function to use.
    """
    # The two decision functions and the two timing functions are stored as given.
    self.__condition, self.__stop = on, stop
    self.__wait, self.__sleep_func = wait, sleep_func

RetryOperator

Bases: Generic[T]

Executes the retry strategy for synchronous endpoint calls.

Source code in src/meatie/option/retry_option.py
class RetryOperator(Generic[T]):
    """Executes the retry strategy for synchronous endpoint calls."""

    def __init__(
        self,
        on: Condition,
        wait: WaitFunc,
        stop: Condition,
        sleep_func: Callable[[Duration], None],
    ) -> None:
        """Creates a new retry operator.

        Args:
            on: function that returns True if the operation should be retried.
            wait: function that returns the duration to wait before the next retry attempt.
            stop: function that returns True if the operation should be aborted.
            sleep_func: the sleep function to use.
        """
        self.__condition = on
        self.__wait = wait
        self.__stop = stop
        self.__sleep_func = sleep_func

    def __call__(self, operation_ctx: Context[T]) -> T:
        """Runs the remaining operators, repeating the call as directed by the retry strategy.

        Every attempt calls `operation_ctx.proceed()`. Its outcome (the response or the raised error) is stored
        in a `RetryContext` that is handed to the `on`, `wait` and `stop` functions.

        Args:
            operation_ctx: context of the endpoint call being processed.

        Returns:
            The result of the first attempt that `on` does not ask to retry. This may be None when the endpoint
            produces no value.

        Raises:
            BaseException: the error raised by the last attempt, when `on` does not ask to retry it or when
                `stop` aborts the retries.
            RetryError: when `stop` aborts the retries and the last attempt did not raise.
        """
        retry_ctx = RetryContext(attempt_number=1, started_at=time.monotonic())
        stopped = False
        while not stopped:
            # Only wait between attempts, never before the first one.
            if retry_ctx.attempt_number > 1:
                wait_time = self.__wait(retry_ctx)
                if wait_time > 0.0:
                    self.__sleep_func(wait_time)

            # Each attempt starts from a clean slate so that an earlier outcome cannot leak into this one.
            retry_ctx.error = None
            retry_ctx.response = None
            result: Optional[T] = None
            try:
                result = operation_ctx.proceed()
                retry_ctx.response = operation_ctx.response
            except BaseException as exc:
                # NOTE(review): BaseException also covers KeyboardInterrupt and SystemExit;
                # they are re-raised below unless `on` asks for a retry.
                retry_ctx.error = exc

            if not self.__condition(retry_ctx):
                # The outcome of this attempt is final: surface its error, otherwise return its result
                # (even if that result is None) instead of a value left over from a previous attempt.
                if retry_ctx.error is not None:
                    raise retry_ctx.error
                return result  # type: ignore[return-value]

            retry_ctx.attempt_number += 1
            stopped = self.__stop(retry_ctx)

        # Retries were aborted by `stop`: report the last error if there was one.
        if retry_ctx.error is not None:
            raise retry_ctx.error
        raise RetryError()

__init__(on, wait, stop, sleep_func)

Creates a new retry operator.

Parameters:

Name Type Description Default
on Condition

function that returns True if the operation should be retried.

required
wait WaitFunc

function that returns the duration to wait before the next retry attempt.

required
stop Condition

function that returns True if the operation should be aborted.

required
sleep_func Callable[[Duration], None]

the sleep function to use.

required
Source code in src/meatie/option/retry_option.py
def __init__(
    self,
    on: Condition,
    wait: WaitFunc,
    stop: Condition,
    sleep_func: Callable[[Duration], None],
) -> None:
    """Creates a new retry operator for synchronous endpoint calls.

    Args:
        on: function that returns True if the operation should be retried.
        wait: function that returns the duration to wait before the next retry attempt.
        stop: function that returns True if the operation should be aborted.
        sleep_func: the blocking sleep function to use.
    """
    # The two decision functions and the two timing functions are stored as given.
    self.__condition, self.__stop = on, stop
    self.__wait, self.__sleep_func = wait, sleep_func

RetryOption

Configure the strategy for retrying the endpoint calls that failed.

Source code in src/meatie/option/retry_option.py
class RetryOption:
    """Configure the strategy for retrying the endpoint calls that failed."""

    def __init__(
        self,
        on: Condition = has_status(HTTPStatus.TOO_MANY_REQUESTS),
        wait: WaitFunc = zero,
        stop: Condition = never,
        sleep_func: Union[Callable[[Duration], None], Callable[[Duration], Awaitable[None]], None] = None,
    ) -> None:
        """Creates a new retry option.

        Parameters:
            on: function that returns True if the operation should be retried. Default behaviour is to retry on
                the HTTP Too Many Requests (429) status code.
            wait: function that returns the duration to wait before the next retry attempt. Default behaviour is
                not to wait.
            stop: function that returns True if the operation should be aborted. Default behaviour is never to stop.
            sleep_func: the sleep function to use. Default behaviour is to rely on the Python standard library
                functions: time.sleep and asyncio.sleep for async functions.

        See Also:
            meatie.has_status: retry on a specific HTTP status code
            meatie.has_exception_type: retry on a specific exception type
            meatie.has_exception_cause_type: retry if a specific exception type is present in the exception chain
            meatie.zero: do not wait
            meatie.fixed: wait a fixed amount of seconds
            meatie.uniform: wait a random amount of seconds in the given time range
            meatie.exponential: keep increasing the wait time exponentially
            meatie.jitter: add a random jitter to the wait time
            meatie.never: never stop
            meatie.after: stop after deadline in seconds
            meatie.after_attempt: stop after a number of attempts
        """
        self.__on, self.__stop = on, stop
        self.__wait, self.__sleep_func = wait, sleep_func

    def __call__(
        self,
        descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
    ) -> None:
        """Register the retry operator on the given endpoint descriptor.

        Args:
            descriptor: synchronous or asynchronous endpoint descriptor to configure.
        """
        # Anything that is not a synchronous descriptor takes the asynchronous path.
        if not isinstance(descriptor, EndpointDescriptor):
            return self.__async_descriptor(descriptor)
        return self.__sync_descriptor(descriptor)

    @property
    def priority(self) -> int:
        """Returns: the priority of the retry operator."""
        return 40

    def __sync_descriptor(self, descriptor: EndpointDescriptor[PT, T]) -> None:
        """Register a `RetryOperator` that sleeps with the configured function or time.sleep."""
        custom_sleep = self.__sleep_func
        sleep: Callable[[float], None] = time.sleep if custom_sleep is None else custom_sleep  # type: ignore[assignment]

        retrier = RetryOperator[T](self.__on, self.__wait, self.__stop, sleep)
        descriptor.register_operator(self.priority, retrier)

    def __async_descriptor(self, descriptor: AsyncEndpointDescriptor[PT, T]) -> None:
        """Register an `AsyncRetryOperator` that sleeps with the configured function or asyncio.sleep."""
        custom_sleep = self.__sleep_func
        sleep: Callable[[float], Awaitable[None]] = asyncio.sleep if custom_sleep is None else custom_sleep  # type: ignore[assignment]

        retrier = AsyncRetryOperator[T](self.__on, self.__wait, self.__stop, sleep)
        descriptor.register_operator(self.priority, retrier)

priority property

Returns: the priority of the retry operator.

__call__(descriptor)

Apply the retry option to the endpoint descriptor.

Source code in src/meatie/option/retry_option.py
def __call__(
    self,
    descriptor: Union[EndpointDescriptor[PT, T], AsyncEndpointDescriptor[PT, T]],
) -> None:
    """Register the retry operator on the given endpoint descriptor.

    Args:
        descriptor: synchronous or asynchronous endpoint descriptor to configure.
    """
    # Anything that is not a synchronous descriptor takes the asynchronous path.
    if not isinstance(descriptor, EndpointDescriptor):
        return self.__async_descriptor(descriptor)
    return self.__sync_descriptor(descriptor)

__init__(on=has_status(HTTPStatus.TOO_MANY_REQUESTS), wait=zero, stop=never, sleep_func=None)

Creates a new retry option.

Parameters:

Name Type Description Default
on Condition

function that returns True if the operation should be retried. Default behaviour is to retry on the HTTP Too Many Requests (429) status code.

has_status(TOO_MANY_REQUESTS)
wait WaitFunc

function that returns the duration to wait before the next retry attempt. Default behaviour is not to wait.

zero
stop Condition

function that returns True if the operation should be aborted. Default behaviour is never to stop.

never
sleep_func Union[Callable[[Duration], None], Callable[[Duration], Awaitable[None]], None]

the sleep function to use. Default behaviour is to rely on the Python standard library functions: time.sleep and asyncio.sleep for async functions.

None
See Also

- meatie.has_status: retry on a specific HTTP status code
- meatie.has_exception_type: retry on a specific exception type
- meatie.has_exception_cause_type: retry if a specific exception type is present in the exception chain
- meatie.zero: do not wait
- meatie.fixed: wait a fixed amount of seconds
- meatie.uniform: wait a random amount of seconds in the given time range
- meatie.exponential: keep increasing the wait time exponentially
- meatie.jitter: add a random jitter to the wait time
- meatie.never: never stop
- meatie.after: stop after deadline in seconds
- meatie.after_attempt: stop after a number of attempts

Source code in src/meatie/option/retry_option.py
def __init__(
    self,
    on: Condition = has_status(HTTPStatus.TOO_MANY_REQUESTS),
    wait: WaitFunc = zero,
    stop: Condition = never,
    sleep_func: Union[Callable[[Duration], None], Callable[[Duration], Awaitable[None]], None] = None,
) -> None:
    """Creates a new retry option.

    Parameters:
        on: function that returns True if the operation should be retried. Default behaviour is to retry on
            the HTTP Too Many Requests (429) status code.
        wait: function that returns the duration to wait before the next retry attempt. Default behaviour is
            not to wait.
        stop: function that returns True if the operation should be aborted. Default behaviour is never to stop.
        sleep_func: the sleep function to use. Default behaviour is to rely on the Python standard library
            functions: time.sleep and asyncio.sleep for async functions.

    See Also:
        meatie.has_status: retry on a specific HTTP status code
        meatie.has_exception_type: retry on a specific exception type
        meatie.has_exception_cause_type: retry if a specific exception type is present in the exception chain
        meatie.zero: do not wait
        meatie.fixed: wait a fixed amount of seconds
        meatie.uniform: wait a random amount of seconds in the given time range
        meatie.exponential: keep increasing the wait time exponentially
        meatie.jitter: add a random jitter to the wait time
        meatie.never: never stop
        meatie.after: stop after deadline in seconds
        meatie.after_attempt: stop after a number of attempts
    """
    # The two decision functions and the two timing functions are stored as given.
    self.__on, self.__stop = on, stop
    self.__wait, self.__sleep_func = wait, sleep_func