Source code for config_sesame.util.vault

# -*- coding: utf-8 -*-
# pylint: disable=bad-continuation
""" Hashicorp Vault API (based on ``hvac``).
"""
# Copyright ©  2016 1&1 Group <jh@web.de>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, unicode_literals, print_function

import io
import os
import getpass

import hvac
from rudiments import security

from .._compat import urljoin

VAULT_TOKEN_FILE = '~/.vault-token'


def default_credentials(url=None, token=None):
    """Look up the default Vault credentials.

    Explicitly passed values take precedence. A missing ``url`` is taken
    from the ``VAULT_ADDR`` environment variable and a missing ``token``
    from ``VAULT_TOKEN``. If there is still no token and the file named by
    ``VAULT_TOKEN_FILE`` exists, the stripped first line of that file is
    used as the token.

    Returns:
        A tuple ``(url, user, token, auth_by)``. ``user`` is the result of
        ``getpass.getuser()``; ``auth_by`` is ``'vault-token-file'`` when
        the token was read from the token file, else ``'environment'``.
    """
    if not url:
        url = os.environ.get('VAULT_ADDR')  # TODO: also add lookup from config / cmd line
    if not token:
        token = os.environ.get('VAULT_TOKEN')

    user = getpass.getuser()
    auth_by = 'environment'

    token_path = os.path.expanduser(VAULT_TOKEN_FILE)
    if token or not os.path.exists(token_path):
        # Either a token is already known, or there is no token file to read.
        return (url, user, token, auth_by)

    with io.open(token_path, encoding='utf-8') as stream:
        # Only the first line of the token file is considered.
        token = stream.readline().strip()
    return (url, user, token, 'vault-token-file')
def get_credentials(url=None, token=None):
    """Determine the credentials to actually use for a connection.

    The defaults from ``default_credentials()`` are tried first. When they
    yield no token, a ``security.Credentials`` object created for the URL
    supplies the user and token through ``auth_pair()``, and its ``source``
    attribute is reported as ``auth_by``.

    Returns:
        A tuple ``(url, user, token, auth_by)``.
    """
    url, user, token, auth_by = default_credentials(url, token)
    if token:
        return (url, user, token, auth_by)

    # No token from arguments, environment or token file: use the
    # credentials lookup of ``rudiments`` instead.
    lookup = security.Credentials(url)
    user, token = lookup.auth_pair()
    # ``source`` is read only after ``auth_pair()`` has been called.
    return (url, user, token, lookup.source)
class APIWrapper(hvac.Client):
    """Thin ``hvac.Client`` subclass that records the last URL passed to ``_get``."""

    # Result of joining ``self._url`` with the path of the most recent
    # ``_get`` call; stays ``None`` until ``_get`` is called.
    last_url = None

    def _get(self, url, **kwargs):
        """Store the joined request URL in ``last_url``, then delegate to ``hvac.Client._get``."""
        requested = urljoin(self._url, url)
        self.last_url = requested
        # Explicit base-class call (not ``super()``), as this module also
        # targets Python 2 (see the ``__future__`` imports).
        return hvac.Client._get(self, url, **kwargs)
class Connection(object):
    """Hashicorp Vault connection.

    Holds the credentials in use (``url``, ``user``, ``token``, ``auth_by``)
    and an ``APIWrapper`` client in ``api`` created from them.
    """

    def __init__(self, url=None, token=None):
        """Obtain credentials via ``get_credentials()`` and create the API client."""
        credentials = get_credentials(url, token)
        self.url, self.user, self.token, self.auth_by = credentials
        self.api = APIWrapper(url=self.url, token=self.token)

    def __str__(self):
        """Describe this connection in a human readable way.

        A falsy ``auth_by`` is shown as ``N/A``.
        """
        template = "Connection to {} [authenticated by {}'s token from {}]"
        source = self.auth_by if self.auth_by else 'N/A'
        return template.format(self.url, self.user, source)