This document describes the current stable version of Kombu (5.0). For development docs, go here.

Source code for kombu.asynchronous.aws.connection

"""Amazon AWS Connection."""

from vine import promise, transform

from kombu.asynchronous.aws.ext import AWSRequest, get_response

from kombu.asynchronous.http import Headers, Request, get_client

import io

try:  # pragma: no cover
    from email import message_from_bytes
    from email.mime.message import MIMEMessage

    # py3
    def message_from_headers(hdr):  # noqa
        bs = "\r\n".join("{}: {}".format(*h) for h in hdr)
        return message_from_bytes(bs.encode())

except ImportError:  # pragma: no cover
    from mimetools import Message as MIMEMessage  # noqa

    # py2
    def message_from_headers(hdr):  # noqa
        return io.BytesIO(b'\r\n'.join(
            b'{}: {}'.format(*h) for h in hdr
        ))

__all__ = (
    'AsyncHTTPSConnection', 'AsyncConnection',
)


class AsyncHTTPResponse:
    """Async HTTP Response."""

    def __init__(self, response):
        self.response = response
        self._msg = None
        self.version = 10

    def read(self, *args, **kwargs):
        return self.response.body

    def getheader(self, name, default=None):
        return self.response.headers.get(name, default)

    def getheaders(self):
        return list(self.response.headers.items())

    @property
    def msg(self):
        if self._msg is None:
            self._msg = MIMEMessage(message_from_headers(self.getheaders()))
        return self._msg

    @property
    def status(self):
        return self.response.code

    @property
    def reason(self):
        if self.response.error:
            return self.response.error.message
        return ''

    def __repr__(self):
        return repr(self.response)


[docs]class AsyncHTTPSConnection: """Async HTTP Connection.""" Request = Request Response = AsyncHTTPResponse method = 'GET' path = '/' body = None default_ports = {'http': 80, 'https': 443} def __init__(self, strict=None, timeout=20.0, http_client=None): self.headers = [] self.timeout = timeout self.strict = strict self.http_client = http_client or get_client()
[docs] def request(self, method, path, body=None, headers=None): self.path = path self.method = method if body is not None: try: read = body.read except AttributeError: self.body = body else: self.body = read() if headers is not None: self.headers.extend(list(headers.items()))
[docs] def getrequest(self): headers = Headers(self.headers) return self.Request(self.path, method=self.method, headers=headers, body=self.body, connect_timeout=self.timeout, request_timeout=self.timeout, validate_cert=False)
[docs] def getresponse(self, callback=None): request = self.getrequest() request.then(transform(self.Response, callback)) return self.http_client.add_request(request)
[docs] def set_debuglevel(self, level): pass
[docs] def connect(self): pass
[docs] def close(self): pass
[docs] def putrequest(self, method, path): self.method = method self.path = path
[docs] def putheader(self, header, value): self.headers.append((header, value))
[docs] def endheaders(self): pass
[docs] def send(self, data): if self.body: self.body += data else: self.body = data
def __repr__(self): return f'<AsyncHTTPConnection: {self.getrequest()!r}>'
[docs]class AsyncConnection: """Async AWS Connection.""" def __init__(self, sqs_connection, http_client=None, **kwargs): # noqa self.sqs_connection = sqs_connection self._httpclient = http_client or get_client()
[docs] def get_http_connection(self): return AsyncHTTPSConnection(http_client=self._httpclient)
def _mexe(self, request, sender=None, callback=None): callback = callback or promise() conn = self.get_http_connection() if callable(sender): sender(conn, request.method, request.path, request.body, request.headers, callback) else: conn.request(request.method, request.url, request.body, request.headers) conn.getresponse(callback=callback) return callback
class AsyncAWSQueryConnection(AsyncConnection): """Async AWS Query Connection.""" STATUS_CODE_OK = 200 STATUS_CODE_REQUEST_TIMEOUT = 408 STATUS_CODE_NETWORK_CONNECT_TIMEOUT_ERROR = 599 STATUS_CODE_INTERNAL_ERROR = 500 STATUS_CODE_BAD_GATEWAY = 502 STATUS_CODE_SERVICE_UNAVAILABLE_ERROR = 503 STATUS_CODE_GATEWAY_TIMEOUT = 504 STATUS_CODES_SERVER_ERRORS = ( STATUS_CODE_INTERNAL_ERROR, STATUS_CODE_BAD_GATEWAY, STATUS_CODE_SERVICE_UNAVAILABLE_ERROR ) STATUS_CODES_TIMEOUT = ( STATUS_CODE_REQUEST_TIMEOUT, STATUS_CODE_NETWORK_CONNECT_TIMEOUT_ERROR, STATUS_CODE_GATEWAY_TIMEOUT ) def __init__(self, sqs_connection, http_client=None, http_client_params=None, **kwargs): if not http_client_params: http_client_params = {} AsyncConnection.__init__(self, sqs_connection, http_client, **http_client_params) def make_request(self, operation, params_, path, verb, callback=None): # noqa params = params_.copy() if operation: params['Action'] = operation signer = self.sqs_connection._request_signer # noqa # defaults for non-get signing_type = 'standard' param_payload = {'data': params} if verb.lower() == 'get': # query-based opts signing_type = 'presignurl' param_payload = {'params': params} request = AWSRequest(method=verb, url=path, **param_payload) signer.sign(operation, request, signing_type=signing_type) prepared_request = request.prepare() return self._mexe(prepared_request, callback=callback) def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa return self.make_request( operation, params, path, verb, callback=transform( self._on_list_ready, callback, parent or self, markers, operation ), ) def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa return self.make_request( operation, params, path, verb, callback=transform( self._on_obj_ready, callback, parent or self, operation ), ) def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa return self.make_request( operation, params, path, verb, callback=transform( self._on_status_ready, callback, parent or self, operation ), ) def _on_list_ready(self, parent, markers, operation, response): # noqa service_model = self.sqs_connection.meta.service_model if response.status == self.STATUS_CODE_OK: _, parsed = get_response( service_model.operation_model(operation), response.response ) return parsed elif ( response.status in self.STATUS_CODES_TIMEOUT or response.status in self.STATUS_CODES_SERVER_ERRORS ): # When the server returns a timeout or 50X server error, # the response is interpreted as an empty list. # This prevents hanging the Celery worker. return [] else: raise self._for_status(response, response.read()) def _on_obj_ready(self, parent, operation, response): # noqa service_model = self.sqs_connection.meta.service_model if response.status == self.STATUS_CODE_OK: _, parsed = get_response( service_model.operation_model(operation), response.response ) return parsed else: raise self._for_status(response, response.read()) def _on_status_ready(self, parent, operation, response): # noqa service_model = self.sqs_connection.meta.service_model if response.status == self.STATUS_CODE_OK: httpres, _ = get_response( service_model.operation_model(operation), response.response ) return httpres.code else: raise self._for_status(response, response.read()) def _for_status(self, response, body): context = 'Empty body' if not body else 'HTTP Error' return Exception("Request {} HTTP {} {} ({})".format( context, response.status, response.reason, body ))