# -*- coding: utf-8 -*-
from enum import Enum
import logging
import logging.config
import traceback
import ujson
from json.decoder import JSONDecodeError
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import Response
from marshmallow.exceptions import ValidationError
from aioli.exceptions import HTTPException, AioliException
from aioli.log import LOGGING_CONFIG_DEFAULTS
from aioli.package import Package
from .config import ApplicationConfigSchema
def jsonify(content, status=200):
return Response(
content=ujson.dumps(content, ensure_ascii=False).encode("utf8"),
status_code=status,
headers={"content-type": "application/json"},
)
async def server_error(_, exc):
if isinstance(exc, NotImplementedError):
message = "Not implemented"
else:
message = "Internal server error"
return jsonify({"message": message}, status=500)
async def validation_error(_, exc):
return jsonify({"message": exc.messages}, status=422)
async def decode_error(*_):
return jsonify({"message": "Error decoding JSON"}, status=400)
async def http_error(_, exc):
return jsonify({"message": exc.detail}, status=exc.status_code)
class ComponentType(Enum):
services = "service"
controllers = "controller"
class ImportRegistry:
imported = {}
def __init__(self, modules, conf_full):
self._conf_full = conf_full
self._modules = set(modules)
def _get_components(self, comp_type, pkg_name=None):
comp_type = ComponentType(comp_type).name
if pkg_name:
return getattr(self.imported[pkg_name], comp_type)
comps = []
for pkg, _ in self.imported.values():
comps += getattr(pkg, comp_type)
return comps
def get_services(self, pkg_name=None):
return [(svc.__class__, svc) for svc in self._get_components("service", pkg_name)]
async def attach_to(self, app):
for module in self._modules:
if not hasattr(module, "export"):
raise Exception(f"Missing export member of class {Package} in {module}")
package = module.export
app.log.info(f"Attaching {package.name}/{package.version}")
if not isinstance(package, Package):
raise Exception(f"Invalid package type {package}: must be of type {Package}")
config = self._conf_full.get(package.name, {})
await package.register(app, config)
self.imported.update({package.name: (package, module)})
for pkg, _ in self.imported.values():
await pkg.attach_controllers()
await pkg.attach_services()
[docs]class Application(Starlette):
"""Creates an Aioli application
:param config: Configuration dictionary
:param packages: List of package tuples [(<mount path>, <module>), ...]
:param kwargs: Keyword arguments to pass along to Starlette
:var log: Aioli Application logger
:var packages: Packages registered with the Application
"""
log = logging.getLogger("aioli")
packages = None
__state = {}
def __init__(self, packages, **kwargs):
if not isinstance(packages, list):
raise Exception(
f"aioli.Application expects an iterable of Packages, got: {type(packages)}"
)
config = kwargs.pop("config", {})
self.registry = ImportRegistry(packages, config)
try:
self.conf = ApplicationConfigSchema().load(config.get("aioli", {}))
except ValueError:
raise Exception("Application `config` must be a collection")
except ValidationError as e:
raise Exception(f"Configuration validation error: {e.messages}")
for name, logger in LOGGING_CONFIG_DEFAULTS['loggers'].items():
self.log_level = logger['level'] = 'DEBUG' if self.conf.get('debug') else 'INFO'
logging.config.dictConfig(LOGGING_CONFIG_DEFAULTS)
# Apply known settings from environment or provided `config`
super(Application, self).__init__(**kwargs)
# Lifespan handlers
self.router.lifespan.add_event_handler("startup", self._startup)
self.router.lifespan.add_event_handler("shutdown", self._shutdown)
# Error handlers
self.add_exception_handler(AioliException, http_error)
self.add_exception_handler(HTTPException, http_error)
self.add_exception_handler(ValidationError, validation_error)
self.add_exception_handler(JSONDecodeError, decode_error)
self.add_exception_handler(Exception, server_error)
# Middleware
self.add_middleware(CORSMiddleware, allow_origins=["*"])
[docs] def add_exception_handler(self, exception, handler):
"""Add a new exception handler
:param exception: Exception class
:param handler: Exception handler
"""
return super(Application, self).add_exception_handler(exception, handler)
async def _startup(self):
try:
self.log.info("Commencing countdown, engines on")
await self.registry.attach_to(self)
self.log.info(f"Loaded {len(self.registry.imported)} packages ~ Ready for action!")
except Exception as e:
self.log.critical(traceback.format_exc())
raise e
async def _shutdown(self):
for mod, pkg in self.packages.attached:
await pkg.detach_services()