Source code for aioli.app
import logging
from json.decoder import JSONDecodeError
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from marshmallow.exceptions import ValidationError
from aioli.exceptions import HTTPException, AioliException, BootstrapError
from .config import ApplicationConfigSchema
from .registry import ImportRegistry
from .errors import http_error, validation_error, decode_error
from .datastores import MemoryStore
class Application(Starlette):
    """Aioli application core

    Loads the ``aioli-core`` section of the supplied configuration through
    ``ApplicationConfigSchema``, creates the import registry, and registers
    exception handlers, CORS middleware and lifespan handlers on the
    underlying Starlette application.

    :param units: List of units
    :param config: Configuration dictionary (keyword argument, defaults to ``{}``)
    :param kwargs: Remaining keyword arguments, passed on to ``Starlette``
    :raises BootstrapError: If `units` is not a list, if `config` is not a
        collection, or if the ``aioli-core`` section fails schema validation

    :var log: Aioli Application logger
    :var registry: ImportRegistry instance
    :var config: Application config
    """

    log = logging.getLogger("aioli.core")
    # Class-level store, i.e. the same MemoryStore object is visible from every instance.
    state = MemoryStore("app")

    def __init__(self, units, **kwargs):
        # NOTE(review): only `list` is accepted here although the message says
        # "iterable" -- the message text is kept as-is for compatibility.
        if not isinstance(units, list):
            raise BootstrapError(
                f"aioli.Application expects an iterable of Units, got: {type(units)}"
            )

        config = kwargs.pop("config", {})
        self.__units = units

        # A non-mapping `config` (e.g. None or a list) has no `.get`; report it
        # as a bootstrap problem instead of leaking a bare AttributeError.
        try:
            core_config = config.get("aioli-core", {})
        except AttributeError as e:
            raise BootstrapError("Application `config` must be a collection") from e

        try:
            self.config = ApplicationConfigSchema().load(core_config)
        except ValueError as e:
            raise BootstrapError("Application `config` must be a collection") from e
        except ValidationError as e:
            raise BootstrapError(
                f"Configuration validation error: {e.messages}"
            ) from e

        # The registry receives the full (unvalidated) config, not just the core section.
        self.registry = ImportRegistry(self, config)

        # Apply known settings from environment or provided `config`
        super().__init__(debug=self.config["debug"], **kwargs)

        # Error handlers
        self.add_exception_handler(AioliException, http_error)
        self.add_exception_handler(HTTPException, http_error)
        self.add_exception_handler(ValidationError, validation_error)
        self.add_exception_handler(JSONDecodeError, decode_error)

        # Middleware
        self.add_middleware(CORSMiddleware, allow_origins=self.config["allow_origins"])

        # Lifespan handlers
        self.router.lifespan.add_event_handler("startup", self._startup)
        self.router.lifespan.add_event_handler("shutdown", self._shutdown)

    def load_units(self):
        """Register the units passed to the constructor with the import registry."""
        self.log.info("Commencing countdown, engines on")
        self.registry.register_units(self.__units)

    async def _startup(self):
        """Lifespan "startup" handler.

        Does nothing but log a warning when the application was created
        without units; otherwise runs the registry's startup handlers and logs
        whether any of them failed.
        """
        if not self.__units:
            self.log.warning("No Units loaded")
            return

        total, failed = await self.registry.call_startup_handlers()

        if failed > 0:
            self.log.warning("Application degraded")
        else:
            self.log.info(f"Application ready: {total} Units loaded")

    async def _shutdown(self):
        """Lifespan "shutdown" handler: detach the services of every imported unit."""
        for unit in self.registry.imported:
            await unit.detach_services()