Skip to content

Module config

None

None

View Source
from .auth import OAuth2

from .cf import CF

from .cfenv import CFenv

from .spring import ConfigClient, config_client, create_config_client

# Package version, followed by the public names of the package.
__version__ = "1.1.0"

__all__ = [
    "__version__",
    "ConfigClient", "CFenv", "CF", "OAuth2",
    "create_config_client", "config_client",
]

Sub-modules

Variables

__version__

Functions

config_client

def config_client(
    **kwargs
) -> Callable[[Dict[str, str]], config.spring.ConfigClient]

ConfigClient decorator.

Usage:

@config_client(app_name='test')
def get_config(config):
    db_user = config.get('database.user')

Returns:

Type Description
None ConfigClient instance.

Raises:

Type Description
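SystemExit If the request to the config server fails and fail_fast is enabled.
ConnectionError If the request to the config server fails and fail_fast is disabled.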
View Source
def config_client(**kwargs) -> Callable[[Dict[str, str]], ConfigClient]:
    """ConfigClient decorator.

    Each call of the decorated function creates a new ``ConfigClient``,
    requests the configuration and passes the client as the first positional
    argument. Arguments given at call time are forwarded after it.

    Usage:

        @config_client(app_name='test')
        def get_config(config):
            db_user = config.get('database.user')

    :param kwargs: options split by ``__get_params`` into ``ConfigClient``
        constructor arguments and ``ConfigClient.get_config`` arguments.
    :raises: SystemExit: If the request fails and fail_fast is enabled.
    :raises: ConnectionError: If the request fails and fail_fast is disabled.
    :return: decorator that wraps the target function.
    """
    instance_params, get_config_params = __get_params(**kwargs)

    def wrap_function(function):
        logger.debug(f"caller: [name='{function.__name__}']")

        @wraps(function)
        def enable_config(*args, **kwds):
            # The client is built and loaded at call time, not when the
            # decorator is applied.
            obj = ConfigClient(**instance_params)
            obj.get_config(**get_config_params)
            # Forward call-time arguments so the decorated function may take
            # parameters besides the injected client.
            return function(obj, *args, **kwds)

        return enable_config

    return wrap_function

create_config_client

def create_config_client(
    **kwargs
) -> config.spring.ConfigClient

Create ConfigClient singleton instance.

Parameters:

Name Type Description Default
address None None
app_name None None
branch None None
fail_fast None None
profile None None
url None None

Returns:

Type Description
None ConfigClient instance.
View Source
@singleton
def create_config_client(**kwargs) -> ConfigClient:
    """
    Create ConfigClient singleton instance.

    The keyword arguments are split by ``__get_params`` into the arguments
    for the ``ConfigClient`` constructor and those for the ``get_config``
    call issued right after construction.

    :param address:
    :param app_name:
    :param branch:
    :param fail_fast:
    :param profile:
    :param url:
    :return: ConfigClient instance.
    """
    client_kwargs, request_kwargs = __get_params(**kwargs)
    client = ConfigClient(**client_kwargs)
    client.get_config(**request_kwargs)
    return client

Classes

CF

class CF(
    cfenv: config.cfenv.CFenv = NOTHING,
    oauth2: config.auth.OAuth2 = None,
    client: config.spring.ConfigClient = None
)
View Source
@mutable
class CF:
    """Bundle a ``CFenv``, an ``OAuth2`` helper and a ``ConfigClient``.

    Components that are not supplied are derived from ``cfenv`` after
    initialisation; the remaining members delegate to them.
    """

    # Environment accessor; a fresh CFenv is created when none is given.
    cfenv: CFenv = field(
        factory=CFenv,
        validator=validators.instance_of(CFenv),
    )
    # Both are filled in by __attrs_post_init__ when left falsy.
    oauth2: OAuth2 = field(default=None)
    client: ConfigClient = field(default=None)

    def __attrs_post_init__(self) -> None:
        """Build ``oauth2`` and ``client`` from ``cfenv`` when they are unset."""
        env = self.cfenv
        if not self.oauth2:
            credentials = dict(
                access_token_uri=env.configserver_access_token_uri(),
                client_id=env.configserver_client_id(),
                client_secret=env.configserver_client_secret(),
            )
            self.oauth2 = OAuth2(**credentials)
        # Must run after the oauth2 step: the client receives self.oauth2.
        if not self.client:
            client_options = dict(
                address=env.configserver_uri(),
                app_name=env.application_name,
                profile=env.space_name.lower(),
                oauth2=self.oauth2,
            )
            self.client = ConfigClient(**client_options)

    @property
    def vcap_services(self):
        """``vcap_services`` of the wrapped ``cfenv``."""
        env = self.cfenv
        return env.vcap_services

    @property
    def vcap_application(self):
        """``vcap_application`` of the wrapped ``cfenv``."""
        env = self.cfenv
        return env.vcap_application

    def get_config(self, **kwargs) -> None:
        """Delegate to ``client.get_config`` with the same keyword arguments."""
        client = self.client
        client.get_config(**kwargs)

    async def get_config_async(self, **kwargs) -> None:
        """Await ``client.get_config_async`` with the same keyword arguments."""
        client = self.client
        await client.get_config_async(**kwargs)

    @property
    def config(self) -> Dict:
        """``config`` of the wrapped client."""
        client = self.client
        return client.config

    def get(self, key, default: Any = ""):
        """Return ``client.get(key, default)``."""
        client = self.client
        return client.get(key, default)

    def keys(self) -> KeysView:
        """Return ``client.keys()``."""
        client = self.client
        return client.keys()

Instance variables

cfenv
client
config
oauth2
vcap_application
vcap_services

Methods

get

def get(
    self,
    key,
    default: Any = ''
)
View Source
    def get(self, key, default: Any = ""):
        """Return the value the wrapped client holds for ``key``.

        :param key: lookup key passed unchanged to ``self.client.get``.
        :param default: passed to ``self.client.get`` as its default.
        :return: whatever ``self.client.get`` returns.
        """
        client = self.client
        return client.get(key, default)

get_config

def get_config(
    self,
    **kwargs
) -> None
View Source
    def get_config(self, **kwargs) -> None:
        """Ask the wrapped client to request the configuration.

        :param kwargs: forwarded unchanged to ``self.client.get_config``.
        """
        client = self.client
        client.get_config(**kwargs)

get_config_async

def get_config_async(
    self,
    **kwargs
) -> None
View Source
    async def get_config_async(self, **kwargs) -> None:
        """Await the wrapped client's asynchronous configuration request.

        :param kwargs: forwarded unchanged to ``self.client.get_config_async``.
        """
        client = self.client
        await client.get_config_async(**kwargs)

keys

def keys(
    self
) -> KeysView
View Source
    def keys(self) -> KeysView:
        """Return the key view reported by the wrapped client."""
        client = self.client
        return client.keys()

CFenv

class CFenv(
    vcap_service_prefix: str = 'p-config-server',
    vcap_application: dict = {'application_name': '', 'space_name': '', 'organization_name': '', 'uris': []},
    vcap_services: dict = {'p-config-server': [{'credentials': {'uri': '', 'access_token_uri': '', 'client_id': '', 'client_secret': ''}}]}
)
View Source
@mutable
class CFenv:
    """Accessor for the ``VCAP_*`` environment data.

    The field defaults are evaluated once, when the class body runs. They are
    read from the ``VCAP_SERVICE_PREFIX``, ``VCAP_APPLICATION`` and
    ``VCAP_SERVICES`` environment variables, falling back to
    ``"p-config-server"``, ``default_vcap_application`` and
    ``default_vcap_services`` respectively.
    """

    # Key of ``vcap_services`` under which the config-server entry is looked up.
    vcap_service_prefix: str = field(
        default=os.getenv("VCAP_SERVICE_PREFIX", "p-config-server"),
        validator=validators.instance_of(str),
    )
    # NOTE: the decoded default dict is created once and is therefore shared
    # by every instance that relies on the default.
    vcap_application: dict = field(
        default=json.loads(os.getenv("VCAP_APPLICATION", default_vcap_application)),
        validator=validators.instance_of(dict),
    )
    # NOTE: same sharing caveat as ``vcap_application``.
    vcap_services: dict = field(
        default=json.loads(os.getenv("VCAP_SERVICES", default_vcap_services)),
        validator=validators.instance_of(dict),
    )

    def __attrs_post_init__(self) -> None:
        """Expose the ``"p-config-server"`` entry under ``vcap_service_prefix``.

        Nothing happens when the prefix is already a key of ``vcap_services``
        or when there is no ``"p-config-server"`` entry to rename. In the
        latter case the ``configserver_*`` lookups return their ``default``.
        """
        if self.vcap_service_prefix in self.vcap_services:
            return
        # Guard against a KeyError from ``pop`` when the services mapping
        # contains neither the custom prefix nor the default key.
        if "p-config-server" not in self.vcap_services:
            return
        # Rename on a shallow copy so the dict object that may be shared as
        # the field default is not modified in place.
        renamed = self.vcap_services.copy()
        renamed[self.vcap_service_prefix] = renamed.pop("p-config-server")
        self.vcap_services = renamed

    @property
    def space_name(self) -> Any:
        """``space_name`` entry of ``vcap_application`` (``""`` when missing)."""
        return glom(self.vcap_application, "space_name", default="")

    @property
    def organization_name(self) -> Any:
        """``organization_name`` entry of ``vcap_application`` (``""`` when missing)."""
        return glom(self.vcap_application, "organization_name", default="")

    @property
    def application_name(self) -> Any:
        """``application_name`` entry of ``vcap_application`` (``""`` when missing)."""
        return glom(self.vcap_application, "application_name", default="")

    @property
    def uris(self) -> Any:
        """``uris`` entry of ``vcap_application`` (``[]`` when missing)."""
        return glom(self.vcap_application, "uris", default=[])

    def configserver_uri(
        self, vcap_path: str = "0.credentials.uri", default: Any = ""
    ) -> Any:
        """Return the value at ``<prefix>.<vcap_path>`` in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        path = self._format_vcap_path(vcap_path)
        return glom(self.vcap_services, path, default=default)

    def configserver_access_token_uri(
        self, vcap_path: str = "0.credentials.access_token_uri", default: Any = ""
    ) -> Any:
        """Return the value at ``<prefix>.<vcap_path>`` in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        path = self._format_vcap_path(vcap_path)
        return glom(self.vcap_services, path, default=default)

    def configserver_client_id(
        self, vcap_path: str = "0.credentials.client_id", default: Any = ""
    ) -> Any:
        """Return the value at ``<prefix>.<vcap_path>`` in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        path = self._format_vcap_path(vcap_path)
        return glom(self.vcap_services, path, default=default)

    def configserver_client_secret(
        self, vcap_path: str = "0.credentials.client_secret", default: Any = ""
    ) -> Any:
        """Return the value at ``<prefix>.<vcap_path>`` in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        path = self._format_vcap_path(vcap_path)
        return glom(self.vcap_services, path, default=default)

    def _format_vcap_path(self, path: str) -> Path:
        """Build a ``Path`` of the service prefix followed by the dotted parts of ``path``."""
        # The prefix is wrapped in its own Path so it stays a single segment.
        subpath = path.split(".")
        return Path(Path(self.vcap_service_prefix), *subpath)

Instance variables

application_name
organization_name
space_name
uris
vcap_application
vcap_service_prefix
vcap_services

Methods

configserver_access_token_uri

def configserver_access_token_uri(
    self,
    vcap_path: str = '0.credentials.access_token_uri',
    default: Any = ''
) -> Any
View Source
    def configserver_access_token_uri(
        self, vcap_path: str = "0.credentials.access_token_uri", default: Any = ""
    ) -> Any:
        """Return the access-token URI entry found in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        return glom(
            self.vcap_services,
            self._format_vcap_path(vcap_path),
            default=default,
        )

configserver_client_id

def configserver_client_id(
    self,
    vcap_path: str = '0.credentials.client_id',
    default: Any = ''
) -> Any
View Source
    def configserver_client_id(
        self, vcap_path: str = "0.credentials.client_id", default: Any = ""
    ) -> Any:
        """Return the client-id entry found in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        return glom(
            self.vcap_services,
            self._format_vcap_path(vcap_path),
            default=default,
        )

configserver_client_secret

def configserver_client_secret(
    self,
    vcap_path: str = '0.credentials.client_secret',
    default: Any = ''
) -> Any
View Source
    def configserver_client_secret(
        self, vcap_path: str = "0.credentials.client_secret", default: Any = ""
    ) -> Any:
        """Return the client-secret entry found in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        return glom(
            self.vcap_services,
            self._format_vcap_path(vcap_path),
            default=default,
        )

configserver_uri

def configserver_uri(
    self,
    vcap_path: str = '0.credentials.uri',
    default: Any = ''
) -> Any
View Source
    def configserver_uri(
        self, vcap_path: str = "0.credentials.uri", default: Any = ""
    ) -> Any:
        """Return the config-server URI entry found in ``vcap_services``.

        :param vcap_path: dotted path below the service prefix.
        :param default: value returned when the path cannot be resolved.
        """
        return glom(
            self.vcap_services,
            self._format_vcap_path(vcap_path),
            default=default,
        )

ConfigClient

class ConfigClient(
    address: str = 'http://localhost:8888',
    label: str = 'master',
    app_name: str = '',
    profile: str = 'development',
    fail_fast=True,
    oauth2: Optional[config.auth.OAuth2] = None
)
View Source
@mutable
class ConfigClient:
    """Spring Cloud Config Client.

    The string field defaults are read from environment variables once, when
    the class body is executed.
    """

    # Base address of the config server (env: CONFIGSERVER_ADDRESS).
    address: str = field(
        default=os.getenv("CONFIGSERVER_ADDRESS", "http://localhost:8888"),
        validator=validators.instance_of(str),
    )
    # Last segment of the request URL (env: LABEL).
    label: str = field(
        default=os.getenv("LABEL", "master"),
        validator=validators.instance_of(str),
    )
    # Application segment of the request URL (env: APP_NAME).
    app_name: str = field(
        default=os.getenv("APP_NAME", ""),
        validator=validators.instance_of(str),
    )
    # Profile segment of the request URL (env: PROFILE).
    profile: str = field(
        default=os.getenv("PROFILE", "development"),
        validator=validators.instance_of(str),
    )
    # Selects SystemExit (True) or ConnectionError (False) when the request
    # in get_config fails. Default comes from CONFIG_FAIL_FAST via strtobool;
    # the literal True is used when the variable is unset.
    fail_fast: bool = field(
        default=bool(strtobool(str(os.getenv("CONFIG_FAIL_FAST", True)))),
        validator=validators.instance_of(bool),
        converter=bool,
    )
    # Optional helper; when set, a token is requested before get_config.
    oauth2: Optional[OAuth2] = field(
        default=None,
        validator=validators.optional(validators.instance_of(OAuth2)),
    )
    # Configuration accumulated by get_config; not an __init__ argument.
    _config: dict = field(
        factory=dict,
        init=False,
        validator=validators.instance_of(dict),
        repr=False,
    )

    @property
    def url(self) -> str:
        """Request URL: ``address/app_name/profile/label``."""
        return f"{self.address}/{self.app_name}/{self.profile}/{self.label}"

    def get_config(self, **kwargs) -> None:
        """Request the configuration from the config server.

        :param kwargs: keyword arguments for ``http.get``; when ``oauth2`` is
            set they are first passed through ``_configure_oauth2``.
        :raises SystemExit: if the request fails and ``fail_fast`` is true.
        :raises ConnectionError: if the request fails and ``fail_fast`` is
            false.
        """
        kwargs = self._configure_oauth2(**kwargs)
        try:
            response = http.get(self.url, **kwargs)
        except Exception as err:
            logger.error(f"Failed to request: {self.url}")
            logger.error(err)
            if self.fail_fast:
                logger.info("fail_fast enabled. Terminating process.")
                raise SystemExit(1)
            raise ConnectionError("fail_fast disabled.") from err

        sources = glom(response.json(), ("propertySources", ["source"]))
        server_config: dict = {}
        # Sources are merged in reverse order of the response, presumably so
        # that earlier sources take precedence; confirm against merge_dict.
        for source in reversed(sources):
            merge_dict(server_config, to_dict(source))
        merge_dict(self._config, server_config)

    async def get_config_async(self, **kwargs) -> None:
        """Run ``get_config`` in the running loop's default executor."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, partial(self.get_config, **kwargs))

    def _configure_oauth2(self, **kwargs) -> dict:
        """Return ``kwargs`` extended with the OAuth2 authorization header.

        Without ``oauth2`` the keyword arguments are returned as received.
        Otherwise a token is requested via ``oauth2.configure(**kwargs)`` and
        the authorization header is merged into ``kwargs["headers"]``.
        """
        if not self.oauth2:
            return kwargs
        self.oauth2.configure(**kwargs)
        # Merge into a new dict: ``kwargs["headers"]`` is the caller's own
        # object and must not gain the Authorization header as a side effect.
        headers = dict(kwargs.get("headers") or {})
        headers.update(self.oauth2.authorization_header)
        kwargs["headers"] = headers
        return kwargs

    def get_file(self, filename: str, **kwargs: dict) -> str:
        """Request a file from the config server.

        :param filename: path segment appended to ``url``.
        :param kwargs: keyword arguments forwarded to ``http.get``.
        :raises RequestFailedException: if ``http.get`` raises.
        :return: ``text`` of the response.
        """
        uri = f"{self.url}/{filename}"
        try:
            response = http.get(uri, **kwargs)
        except Exception as err:
            raise RequestFailedException(f"Failed to request URI: {uri}") from err
        return response.text

    def encrypt(
        self,
        value: str,
        path: str = "/encrypt",
        headers: dict = {"Content-Type": "text/plain"},
        **kwargs: dict,
    ) -> str:
        """Request the encryption of a value from the config server.

        :param value: payload sent as the request body.
        :param path: path appended to ``address``.
        :param headers: request headers; the default dict is shared between
            calls and is only passed through here, never modified.
        :param kwargs: keyword arguments forwarded to ``http.post``.
        :raises RequestFailedException: if ``http.post`` raises.
        :return: ``text`` of the response.
        """
        try:
            response = http.post(
                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs
            )
        except Exception as err:
            raise RequestFailedException(
                f"Failed to request URI: {self.address}{path}"
            ) from err
        return response.text

    def decrypt(
        self,
        value: str,
        path: str = "/decrypt",
        headers: dict = {"Content-Type": "text/plain"},
        **kwargs: dict,
    ) -> str:
        """Request the decryption of a value from the config server.

        :param value: payload sent as the request body.
        :param path: path appended to ``address``.
        :param headers: request headers; the default dict is shared between
            calls and is only passed through here, never modified.
        :param kwargs: keyword arguments forwarded to ``http.post``.
        :raises RequestFailedException: if ``http.post`` raises.
        :return: ``text`` of the response.
        """
        try:
            response = http.post(
                uri=f"{self.address}{path}", data=value, headers=headers, **kwargs
            )
        except Exception as err:
            raise RequestFailedException(
                f"Failed to request URI: {self.address}{path}"
            ) from err
        return response.text

    @property
    def config(self) -> Dict:
        """Getter for the configuration retrieved by ConfigClient."""
        return self._config

    def get(self, key: str, default: Any = "") -> Any:
        """Look up ``key`` in the retrieved configuration via ``glom``.

        :param key: glom spec, e.g. a dotted path.
        :param default: value returned when the lookup fails.
        """
        return glom(self._config, key, default=default)

    def keys(self) -> KeysView:
        """Return the top-level keys of the retrieved configuration."""
        return self._config.keys()

Instance variables

address
app_name
config

Getter for the configuration retrieved by ConfigClient.

fail_fast
label
oauth2
profile
url

Methods

decrypt

def decrypt(
    self,
    value: str,
    path: str = '/decrypt',
    headers: dict = {'Content-Type': 'text/plain'},
    **kwargs: dict
) -> str

Request the decryption of a value from the config server.

View Source
    def decrypt(
        self,
        value: str,
        path: str = "/decrypt",
        headers: dict = {"Content-Type": "text/plain"},
        **kwargs: dict,
    ) -> str:
        """Request the decryption of a value from the config server.

        :param value: payload sent as the request body.
        :param path: path appended to ``address``.
        :param headers: request headers; the default dict is shared between
            calls and is only passed through here, never modified.
        :param kwargs: keyword arguments forwarded to ``http.post``.
        :raises RequestFailedException: if ``http.post`` raises; the original
            error is chained as the cause.
        :return: ``text`` of the response.
        """
        uri = f"{self.address}{path}"
        try:
            response = http.post(uri=uri, data=value, headers=headers, **kwargs)
        except Exception as err:
            raise RequestFailedException(f"Failed to request URI: {uri}") from err
        return response.text

encrypt

def encrypt(
    self,
    value: str,
    path: str = '/encrypt',
    headers: dict = {'Content-Type': 'text/plain'},
    **kwargs: dict
) -> str

Request the encryption of a value from the config server.

View Source
    def encrypt(
        self,
        value: str,
        path: str = "/encrypt",
        headers: dict = {"Content-Type": "text/plain"},
        **kwargs: dict,
    ) -> str:
        """Request the encryption of a value from the config server.

        :param value: payload sent as the request body.
        :param path: path appended to ``address``.
        :param headers: request headers; the default dict is shared between
            calls and is only passed through here, never modified.
        :param kwargs: keyword arguments forwarded to ``http.post``.
        :raises RequestFailedException: if ``http.post`` raises; the original
            error is chained as the cause.
        :return: ``text`` of the response.
        """
        uri = f"{self.address}{path}"
        try:
            response = http.post(uri=uri, data=value, headers=headers, **kwargs)
        except Exception as err:
            raise RequestFailedException(f"Failed to request URI: {uri}") from err
        return response.text

get

def get(
    self,
    key: str,
    default: Any = ''
) -> Any
View Source
    def get(self, key: str, default: Any = "") -> Any:
        """Look up ``key`` in the retrieved configuration via ``glom``.

        :param key: glom spec, e.g. a dotted path.
        :param default: value returned when the lookup fails.
        :return: the value found, or ``default``.
        """
        found = glom(self._config, key, default=default)
        return found

get_config

def get_config(
    self,
    **kwargs
) -> None

Request the configuration from the config server.

View Source
    def get_config(self, **kwargs) -> None:
        """Request the configuration from the config server.

        :param kwargs: keyword arguments for ``http.get``; they are first
            passed through ``_configure_oauth2``.
        :raises SystemExit: if the request fails and ``fail_fast`` is true.
        :raises ConnectionError: if the request fails and ``fail_fast`` is
            false; the original error is chained as the cause.
        """
        kwargs = self._configure_oauth2(**kwargs)
        try:
            response = http.get(self.url, **kwargs)
        except Exception as err:
            logger.error(f"Failed to request: {self.url}")
            logger.error(err)
            if self.fail_fast:
                logger.info("fail_fast enabled. Terminating process.")
                raise SystemExit(1)
            raise ConnectionError("fail_fast disabled.") from err

        sources = glom(response.json(), ("propertySources", ["source"]))
        server_config: dict = {}
        # A plain loop replaces the list comprehension that was used only for
        # its side effects. Sources are merged in reverse order of the
        # response, presumably so that earlier sources take precedence;
        # confirm against merge_dict.
        for source in reversed(sources):
            merge_dict(server_config, to_dict(source))
        merge_dict(self._config, server_config)

get_config_async

def get_config_async(
    self,
    **kwargs
) -> None
View Source
    async def get_config_async(self, **kwargs) -> None:
        """Run ``get_config`` in the running event loop's default executor.

        :param kwargs: keyword arguments bound to ``get_config``.
        """
        blocking_call = partial(self.get_config, **kwargs)
        running_loop = asyncio.get_running_loop()
        await running_loop.run_in_executor(None, blocking_call)

get_file

def get_file(
    self,
    filename: str,
    **kwargs: dict
) -> str

Request a file from the config server.

View Source
    def get_file(self, filename: str, **kwargs: dict) -> str:
        """Request a file from the config server.

        :param filename: path segment appended to ``url``
            (``address/app_name/profile/label``).
        :param kwargs: keyword arguments forwarded to ``http.get``.
        :raises RequestFailedException: if ``http.get`` raises; the original
            error is chained as the cause.
        :return: ``text`` of the response.
        """
        # ``url`` already renders address/app_name/profile/label.
        uri = f"{self.url}/{filename}"
        try:
            response = http.get(uri, **kwargs)
        except Exception as err:
            raise RequestFailedException(f"Failed to request URI: {uri}") from err
        return response.text

keys

def keys(
    self
) -> KeysView
View Source
    def keys(self) -> KeysView:
        """Return the top-level keys of the retrieved configuration."""
        key_view = self._config.keys()
        return key_view

OAuth2

class OAuth2(
    access_token_uri: str,
    client_id: str,
    client_secret: str,
    grant_type: str = 'client_credentials',
    token: str = NOTHING
)
View Source
@mutable
class OAuth2:
    """Holder of OAuth2 client settings and the access token obtained with them."""

    # Endpoint the token request is posted to.
    access_token_uri: str = field(validator=validators.instance_of(str))
    # Client id and secret used for HTTP basic auth on the token request.
    client_id: str = field(validator=validators.instance_of(str))
    client_secret: str = field(validator=validators.instance_of(str))
    # Sent as ``grant_type`` in the token request body.
    grant_type: str = field(
        default="client_credentials",
        validator=validators.instance_of(str),
    )
    # Current access token; empty string until one is set. Kept out of repr.
    _token: str = field(factory=str, validator=validators.instance_of(str), repr=False)

    @property
    def token(self) -> str:
        """Current access token."""
        return self._token

    @token.setter
    def token(self, value) -> None:
        self._token = value
        # The token is a credential: record that it changed, never its value.
        logger.debug("set: [access_token='<redacted>']")

    @property
    def authorization_header(self) -> dict:
        """``Authorization`` header carrying the current token as a bearer token."""
        return {"Authorization": f"Bearer {self.token}"}

    def request_token(self, client_auth: HTTPBasicAuth, data: dict, **kwargs) -> None:
        """Post a token request and store the returned ``access_token``.

        :param client_auth: basic-auth credentials for the request.
        :param data: request body.
        :param kwargs: keyword arguments forwarded to ``http.post``.
        :raises RequestFailedException: on ``MissingSchema``.
        :raises RequestTokenException: on ``HTTPError``.
        """
        try:
            response = http.post(
                self.access_token_uri, auth=client_auth, data=data, **kwargs
            )
        except MissingSchema as err:
            raise RequestFailedException("Access token URL it's empty") from err
        except HTTPError as err:
            raise RequestTokenException(
                "Failed to retrieve oauth2 access_token."
            ) from err
        self.token = response.json().get("access_token")
        logger.info("Access token successfully obtained.")

    def configure(self, **kwargs) -> None:
        """Request a token using ``client_id``, ``client_secret`` and ``grant_type``.

        :param kwargs: keyword arguments forwarded to ``request_token``.
        """
        client_auth = HTTPBasicAuth(self.client_id, self.client_secret)
        data = {"grant_type": f"{self.grant_type}"}
        self.request_token(client_auth, data, **kwargs)

Instance variables

access_token_uri
authorization_header
client_id
client_secret
grant_type
token

Methods

configure

def configure(
    self,
    **kwargs
) -> None
View Source
    def configure(self, **kwargs) -> None:
        """Request a token using ``client_id``, ``client_secret`` and ``grant_type``.

        :param kwargs: keyword arguments forwarded to ``request_token``.
        """
        payload = {"grant_type": f"{self.grant_type}"}
        basic_auth = HTTPBasicAuth(self.client_id, self.client_secret)
        self.request_token(basic_auth, payload, **kwargs)

request_token

def request_token(
    self,
    client_auth: requests.auth.HTTPBasicAuth,
    data: dict,
    **kwargs
) -> None
View Source
    def request_token(self, client_auth: HTTPBasicAuth, data: dict, **kwargs) -> None:
        """Post a token request and store the returned ``access_token``.

        :param client_auth: basic-auth credentials for the request.
        :param data: request body.
        :param kwargs: keyword arguments forwarded to ``http.post``.
        :raises RequestFailedException: on ``MissingSchema``; the original
            error is chained as the cause.
        :raises RequestTokenException: on ``HTTPError``; the original error is
            chained as the cause.
        """
        try:
            response = http.post(
                self.access_token_uri, auth=client_auth, data=data, **kwargs
            )
        except MissingSchema as err:
            raise RequestFailedException("Access token URL it's empty") from err
        except HTTPError as err:
            raise RequestTokenException(
                "Failed to retrieve oauth2 access_token."
            ) from err
        # Assigning through the ``token`` property also triggers its setter.
        self.token = response.json().get("access_token")
        logger.info("Access token successfully obtained.")