Skip to content

Module config.spring

Module for retrieve application's config from Spring Cloud Config.

None

View Source
"""Module for retrieve application's config from Spring Cloud Config."""

import asyncio

import os

from distutils.util import strtobool

from functools import partial, wraps

from typing import Any, Callable, Dict, KeysView, Optional, Tuple

from attrs import field, fields_dict, mutable, validators

from glom import glom

from . import http

from ._config import merge_dict, to_dict

from .auth import OAuth2

from .core import singleton

from .exceptions import RequestFailedException

from .logger import logger

@mutable

class ConfigClient:

    """Spring Cloud Config Client."""

    address: str = field(

        default=os.getenv("CONFIGSERVER_ADDRESS", "http://localhost:8888"),

        validator=validators.instance_of(str),

    )

    label: str = field(

        default=os.getenv("LABEL", "master"),

        validator=validators.instance_of(str),

    )

    app_name: str = field(

        default=os.getenv("APP_NAME", ""),

        validator=validators.instance_of(str),

    )

    profile: str = field(

        default=os.getenv("PROFILE", "development"),

        validator=validators.instance_of(str),

    )

    fail_fast: bool = field(

        default=bool(strtobool(str(os.getenv("CONFIG_FAIL_FAST", True)))),

        validator=validators.instance_of(bool),

        converter=bool,

    )

    oauth2: Optional[OAuth2] = field(

        default=None,

        validator=validators.optional(validators.instance_of(OAuth2)),

    )

    _config: dict = field(

        factory=dict,

        init=False,

        validator=validators.instance_of(dict),

        repr=False,

    )

    @property

    def url(self) -> str:

        return f"{self.address}/{self.app_name}/{self.profile}/{self.label}"

    def get_config(self, **kwargs) -> None:

        """Request the configuration from the config server."""

        kwargs = self._configure_oauth2(**kwargs)

        try:

            response = http.get(self.url, **kwargs)

        except Exception as err:

            logger.error(f"Failed to request: {self.url}")

            logger.error(err)

            if self.fail_fast:

                logger.info("fail_fast enabled. Terminating process.")

                raise SystemExit(1)

            raise ConnectionError("fail_fast disabled.")

        fconfig = [

            to_dict(config)

            for config in reversed(

                glom(response.json(), ("propertySources", ["source"]))

            )

        ]

        server_config: dict = {}

        [merge_dict(server_config, c) for c in fconfig]

        merge_dict(self._config, server_config)

    async def get_config_async(self, **kwargs) -> None:

        loop = asyncio.get_running_loop()

        await loop.run_in_executor(None, partial(self.get_config, **kwargs))

    def _configure_oauth2(self, **kwargs) -> dict:

        if self.oauth2:

            self.oauth2.configure(**kwargs)

            try:

                kwargs["headers"].update(self.oauth2.authorization_header)

            except KeyError:

                kwargs.update(dict(headers=self.oauth2.authorization_header))

        return kwargs

    def get_file(self, filename: str, **kwargs: dict) -> str:

        """Request a file from the config server."""

        uri = f"{self.address}/{self.app_name}/{self.profile}/{self.label}/{filename}"

        try:

            response = http.get(uri, **kwargs)

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {uri}")

        return response.text

    def encrypt(

        self,

        value: str,

        path: str = "/encrypt",

        headers: dict = {"Content-Type": "text/plain"},

        **kwargs: dict,

    ) -> str:

        """Request a encryption from a value to the config server."""

        try:

            response = http.post(

                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs

            )

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {self.address}{path}")

        return response.text

    def decrypt(

        self,

        value: str,

        path: str = "/decrypt",

        headers: dict = {"Content-Type": "text/plain"},

        **kwargs: dict,

    ) -> str:

        """Request a decryption from a value to the config server.."""

        try:

            response = http.post(

                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs

            )

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {self.address}{path}")

        return response.text

    @property

    def config(self) -> Dict:

        """Getter from configurations retrieved from ConfigClient."""

        return self._config

    def get(self, key: str, default: Any = "") -> Any:

        return glom(self._config, key, default=default)

    def keys(self) -> KeysView:

        return self._config.keys()

@singleton

def create_config_client(**kwargs) -> ConfigClient:

    """

    Create ConfigClient singleton instance.

    :param address:

    :param app_name:

    :param branch:

    :param fail_fast:

    :param profile:

    :param url:

    :return: ConfigClient instance.

    """

    instance_params, get_config_params = __get_params(**kwargs)

    obj = ConfigClient(**instance_params)

    obj.get_config(**get_config_params)

    return obj

def config_client(**kwargs) -> Callable[[Dict[str, str]], ConfigClient]:

    """ConfigClient decorator.

    Usage:

    @config_client(app_name='test')

    def get_config(config):

        db_user = config.get_attribute('database.user')

    :raises: ConnectionError: If fail_fast enabled.

    :return: ConfigClient instance.

    """

    instance_params, get_config_params = __get_params(**kwargs)

    def wrap_function(function):

        logger.debug(f"caller: [name='{function.__name__}']")

        @wraps(function)

        def enable_config():

            obj = ConfigClient(**instance_params)

            obj.get_config(**get_config_params)

            return function(obj)

        return enable_config

    return wrap_function

def __get_params(**kwargs) -> Tuple[Dict, Dict]:

    instance_params = {}

    get_config_params = {}

    for key, value in kwargs.items():

        if key in fields_dict(ConfigClient).keys():

            instance_params.update({key: value})

        else:

            get_config_params.update({key: value})

    logger.debug(

        f"params: [kwargs={kwargs}, instance={instance_params}, get_config={get_config_params}]"

    )

    return instance_params, get_config_params

Functions

config_client

def config_client(
    **kwargs
) -> Callable[[Dict[str, str]], config.spring.ConfigClient]

ConfigClient decorator.

Usage:

@config_client(app_name='test') def get_config(config): db_user = config.get_attribute('database.user')

Returns:

Type Description
None ConfigClient instance.

Raises:

Type Description
None ConnectionError: If fail_fast enabled.
View Source
def config_client(**kwargs) -> Callable[[Dict[str, str]], ConfigClient]:

    """ConfigClient decorator.

    Usage:

    @config_client(app_name='test')

    def get_config(config):

        db_user = config.get_attribute('database.user')

    :raises: ConnectionError: If fail_fast enabled.

    :return: ConfigClient instance.

    """

    instance_params, get_config_params = __get_params(**kwargs)

    def wrap_function(function):

        logger.debug(f"caller: [name='{function.__name__}']")

        @wraps(function)

        def enable_config():

            obj = ConfigClient(**instance_params)

            obj.get_config(**get_config_params)

            return function(obj)

        return enable_config

    return wrap_function

create_config_client

def create_config_client(
    **kwargs
) -> config.spring.ConfigClient

Create ConfigClient singleton instance.

Parameters:

Name Type Description Default
address None None
app_name None None
branch None None
fail_fast None None
profile None None
url None None

Returns:

Type Description
None ConfigClient instance.
View Source
@singleton

def create_config_client(**kwargs) -> ConfigClient:

    """

    Create ConfigClient singleton instance.

    :param address:

    :param app_name:

    :param branch:

    :param fail_fast:

    :param profile:

    :param url:

    :return: ConfigClient instance.

    """

    instance_params, get_config_params = __get_params(**kwargs)

    obj = ConfigClient(**instance_params)

    obj.get_config(**get_config_params)

    return obj

Classes

ConfigClient

class ConfigClient(
    address: str = 'http://localhost:8888',
    label: str = 'master',
    app_name: str = '',
    profile: str = 'development',
    fail_fast=True,
    oauth2: Optional[config.auth.OAuth2] = None
)
View Source
@mutable

class ConfigClient:

    """Spring Cloud Config Client."""

    address: str = field(

        default=os.getenv("CONFIGSERVER_ADDRESS", "http://localhost:8888"),

        validator=validators.instance_of(str),

    )

    label: str = field(

        default=os.getenv("LABEL", "master"),

        validator=validators.instance_of(str),

    )

    app_name: str = field(

        default=os.getenv("APP_NAME", ""),

        validator=validators.instance_of(str),

    )

    profile: str = field(

        default=os.getenv("PROFILE", "development"),

        validator=validators.instance_of(str),

    )

    fail_fast: bool = field(

        default=bool(strtobool(str(os.getenv("CONFIG_FAIL_FAST", True)))),

        validator=validators.instance_of(bool),

        converter=bool,

    )

    oauth2: Optional[OAuth2] = field(

        default=None,

        validator=validators.optional(validators.instance_of(OAuth2)),

    )

    _config: dict = field(

        factory=dict,

        init=False,

        validator=validators.instance_of(dict),

        repr=False,

    )

    @property

    def url(self) -> str:

        return f"{self.address}/{self.app_name}/{self.profile}/{self.label}"

    def get_config(self, **kwargs) -> None:

        """Request the configuration from the config server."""

        kwargs = self._configure_oauth2(**kwargs)

        try:

            response = http.get(self.url, **kwargs)

        except Exception as err:

            logger.error(f"Failed to request: {self.url}")

            logger.error(err)

            if self.fail_fast:

                logger.info("fail_fast enabled. Terminating process.")

                raise SystemExit(1)

            raise ConnectionError("fail_fast disabled.")

        fconfig = [

            to_dict(config)

            for config in reversed(

                glom(response.json(), ("propertySources", ["source"]))

            )

        ]

        server_config: dict = {}

        [merge_dict(server_config, c) for c in fconfig]

        merge_dict(self._config, server_config)

    async def get_config_async(self, **kwargs) -> None:

        loop = asyncio.get_running_loop()

        await loop.run_in_executor(None, partial(self.get_config, **kwargs))

    def _configure_oauth2(self, **kwargs) -> dict:

        if self.oauth2:

            self.oauth2.configure(**kwargs)

            try:

                kwargs["headers"].update(self.oauth2.authorization_header)

            except KeyError:

                kwargs.update(dict(headers=self.oauth2.authorization_header))

        return kwargs

    def get_file(self, filename: str, **kwargs: dict) -> str:

        """Request a file from the config server."""

        uri = f"{self.address}/{self.app_name}/{self.profile}/{self.label}/{filename}"

        try:

            response = http.get(uri, **kwargs)

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {uri}")

        return response.text

    def encrypt(

        self,

        value: str,

        path: str = "/encrypt",

        headers: dict = {"Content-Type": "text/plain"},

        **kwargs: dict,

    ) -> str:

        """Request a encryption from a value to the config server."""

        try:

            response = http.post(

                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs

            )

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {self.address}{path}")

        return response.text

    def decrypt(

        self,

        value: str,

        path: str = "/decrypt",

        headers: dict = {"Content-Type": "text/plain"},

        **kwargs: dict,

    ) -> str:

        """Request a decryption from a value to the config server.."""

        try:

            response = http.post(

                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs

            )

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {self.address}{path}")

        return response.text

    @property

    def config(self) -> Dict:

        """Getter from configurations retrieved from ConfigClient."""

        return self._config

    def get(self, key: str, default: Any = "") -> Any:

        return glom(self._config, key, default=default)

    def keys(self) -> KeysView:

        return self._config.keys()

Instance variables

address
app_name
config

Getter from configurations retrieved from ConfigClient.

fail_fast
label
oauth2
profile
url

Methods

decrypt

def decrypt(
    self,
    value: str,
    path: str = '/decrypt',
    headers: dict = {'Content-Type': 'text/plain'},
    **kwargs: dict
) -> str

Request a decryption from a value to the config server..

View Source
    def decrypt(

        self,

        value: str,

        path: str = "/decrypt",

        headers: dict = {"Content-Type": "text/plain"},

        **kwargs: dict,

    ) -> str:

        """Request a decryption from a value to the config server.."""

        try:

            response = http.post(

                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs

            )

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {self.address}{path}")

        return response.text

encrypt

def encrypt(
    self,
    value: str,
    path: str = '/encrypt',
    headers: dict = {'Content-Type': 'text/plain'},
    **kwargs: dict
) -> str

Request a encryption from a value to the config server.

View Source
    def encrypt(

        self,

        value: str,

        path: str = "/encrypt",

        headers: dict = {"Content-Type": "text/plain"},

        **kwargs: dict,

    ) -> str:

        """Request a encryption from a value to the config server."""

        try:

            response = http.post(

                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs

            )

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {self.address}{path}")

        return response.text

get

def get(
    self,
    key: str,
    default: Any = ''
) -> Any
View Source
    def get(self, key: str, default: Any = "") -> Any:

        return glom(self._config, key, default=default)

get_config

def get_config(
    self,
    **kwargs
) -> None

Request the configuration from the config server.

View Source
    def get_config(self, **kwargs) -> None:

        """Request the configuration from the config server."""

        kwargs = self._configure_oauth2(**kwargs)

        try:

            response = http.get(self.url, **kwargs)

        except Exception as err:

            logger.error(f"Failed to request: {self.url}")

            logger.error(err)

            if self.fail_fast:

                logger.info("fail_fast enabled. Terminating process.")

                raise SystemExit(1)

            raise ConnectionError("fail_fast disabled.")

        fconfig = [

            to_dict(config)

            for config in reversed(

                glom(response.json(), ("propertySources", ["source"]))

            )

        ]

        server_config: dict = {}

        [merge_dict(server_config, c) for c in fconfig]

        merge_dict(self._config, server_config)

get_config_async

def get_config_async(
    self,
    **kwargs
) -> None
View Source
    async def get_config_async(self, **kwargs) -> None:

        loop = asyncio.get_running_loop()

        await loop.run_in_executor(None, partial(self.get_config, **kwargs))

get_file

def get_file(
    self,
    filename: str,
    **kwargs: dict
) -> str

Request a file from the config server.

View Source
    def get_file(self, filename: str, **kwargs: dict) -> str:

        """Request a file from the config server."""

        uri = f"{self.address}/{self.app_name}/{self.profile}/{self.label}/{filename}"

        try:

            response = http.get(uri, **kwargs)

        except Exception:

            raise RequestFailedException(f"Failed to request URI: {uri}")

        return response.text

keys

def keys(
    self
) -> KeysView
View Source
    def keys(self) -> KeysView:

        return self._config.keys()