Source code for wauth.drivers.local

"""Local driver for encrypted secret storage.

Handles encryption/decryption of secrets and persistence via the Vault.
"""

import hmac

# pylint: disable=import-outside-toplevel
from typing import Optional

from .._log import _debug, _error, _info, _warning

from ..core import CryptoEngine
from ..vault import Vault


[docs] class LocalDriver: """Driver that stores secrets locally using Fernet encryption. Combines the cryptographic engine with the SQLite-backed vault for persistent, machine-locked secret storage. Args: custom_key: Optional custom encryption key string. """
[docs] def __init__(self, custom_key: Optional[str] = None) -> None: self.engine: CryptoEngine = CryptoEngine(custom_key=custom_key) self.vault: Vault = Vault() _debug("LocalDriver initialized")
[docs] def set_secret( self, key: str, value: str, ttl: Optional[float] = None ) -> None: """Encrypt and store a text secret. Args: key: Unique identifier for the secret. value: Plaintext value to encrypt. ttl: Optional time-to-live in seconds. ``None`` means no expiration. """ encrypted = self.engine.encrypt(value.encode()) self.vault.save(key, encrypted, "text", ttl=ttl) _info(f"Text secret stored: key='{key}'")
[docs] def set_file( self, key: str, file_path: str, ttl: Optional[float] = None ) -> None: """Encrypt and store a file's contents. Args: key: Unique identifier for the file secret. file_path: Path to the file to encrypt and store. ttl: Optional time-to-live in seconds. """ with open(file_path, "rb") as f: encrypted = self.engine.encrypt(f.read()) self.vault.save(key, encrypted, "file", ttl=ttl) _info(f"File secret stored: key='{key}'")
[docs] def get_secret(self, key: str) -> str | bytes | None: """Retrieve and decrypt a secret by its key. Args: key: Unique identifier for the secret. Returns: Decrypted secret as ``str`` for text type or ``bytes`` for file type. Returns ``None`` if the key does not exist. """ encrypted, v_type = self.vault.get(key) if not encrypted: _debug(f"Secret not found: key='{key}'") return None decrypted = self.engine.decrypt(encrypted) _debug(f"Secret decrypted: key='{key}', type='{v_type}'") return decrypted.decode() if v_type == "text" else decrypted
[docs] def delete_secret(self, key: str) -> None: """Delete a secret from the vault. Args: key: Unique identifier for the secret to remove. Raises: KeyNotFoundError: If the key does not exist. """ self.vault.delete(key) _info(f"Secret deleted: key='{key}'")
[docs] def list_keys(self) -> list[str]: """List all secret keys stored in the vault. Returns: A list of all key names. """ return self.vault.list_keys()
[docs] def rotate_key( self, new_custom_key: str, keys_to_migrate: Optional[list[str]] = None ) -> dict[str, bool]: """Rotate the encryption key and re-encrypt existing secrets. Creates a new CryptoEngine with the provided key, decrypts all existing secrets with the current engine, and re-encrypts them with the new engine. Args: new_custom_key: The new custom key to use for encryption. keys_to_migrate: Specific keys to migrate. If ``None``, all keys in the vault are migrated. Returns: A dictionary mapping each key to a boolean indicating success (``True``) or failure (``False``). """ new_engine = CryptoEngine(custom_key=new_custom_key) target_keys = keys_to_migrate or self.list_keys() results: dict[str, bool] = {} for secret_key in target_keys: try: encrypted, v_type = self.vault.get(secret_key) if not encrypted: results[secret_key] = False continue # Decrypt with old engine plaintext = self.engine.decrypt(encrypted) # Re-encrypt with new engine new_encrypted = new_engine.encrypt(plaintext) self.vault.save(secret_key, new_encrypted, v_type) results[secret_key] = True _info(f"Key migrated: '{secret_key}'") except Exception as exc: # noqa: BLE001, pylint: disable=broad-exception-caught _error(f"Failed to migrate key '{secret_key}': {exc}") results[secret_key] = False if all(results.values()): # Swap the engine after successful migration self.engine = new_engine _info("Encryption key rotated successfully") else: _warning("Partial key rotation — some keys failed to migrate") return results
[docs] def valid_secret(self, key: str, value_to_check: str) -> bool: """Verify if stored secret matches provided value without exposing it. Unlike get_secret(), this method never returns the decrypted secret. Uses constant-time comparison to prevent timing attacks. Args: key: Unique identifier for the secret. value_to_check: Plaintext value to compare against stored secret. Returns: True if values match, False otherwise (or if key doesn't exist). """ encrypted, v_type = self.vault.get(key) if not encrypted or v_type != "text": _debug(f"Secret not found or not text: key='{key}'") return False try: decrypted = self.engine.decrypt(encrypted).decode() # Constant-time comparison to prevent timing attacks return hmac.compare_digest(decrypted, value_to_check) except Exception: # noqa: BLE001, pylint: disable=broad-exception-caught _error(f"Failed to verify secret: key='{key}'") return False