Source code for wauth.vault
"""Persistent storage layer for encrypted secrets using wsqlite."""
import os
from typing import Optional, Tuple
from ._log import _debug, _error, _info, _warning
from pydantic import BaseModel, Field
from wsqlite import WSQLite
from .exceptions import KeyNotFoundError, VaultError
[docs]
class SecretModel(BaseModel):
"""Pydantic model representing a stored secret.
Attributes:
key: Primary key — unique name of the secret.
value: Encrypted ciphertext of the secret.
type: Secret type, either ``"text"`` or ``"file"``.
created_at: Unix timestamp when the secret was first stored.
updated_at: Unix timestamp of the last modification.
ttl: Optional time-to-live in seconds. ``None`` means no expiration.
"""
key: str = Field(..., description="Primary Key - Unique name of the secret")
value: str = Field(..., description="Encrypted ciphertext value")
type: str = Field(default="text", description="Secret type: 'text' or 'file'")
created_at: float = Field(default=0.0, description="Unix timestamp of creation")
updated_at: float = Field(default=0.0, description="Unix timestamp of last update")
ttl: Optional[float] = Field(default=None, description="Time-to-live in seconds")
[docs]
class Vault:
"""Encrypted secret storage backed by a SQLite database.
Args:
db_path: Path to the SQLite database file.
Defaults to ``~/.wisrovi/wauth.db``.
"""
[docs]
def __init__(self, db_path: str = "~/.wisrovi/wauth.db") -> None:
self.db_path: str = os.path.expanduser(db_path)
os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)
self.db: WSQLite = WSQLite(SecretModel, self.db_path)
_debug(f"Vault initialized at {self.db_path}")
def _timestamp(self) -> float:
"""Return the current Unix timestamp.
Returns:
Current time as a float (seconds since epoch).
"""
# pylint: disable=import-outside-toplevel
import time
return time.time()
[docs]
def save(
self,
key: str,
encrypted_value: str,
val_type: str = "text",
ttl: Optional[float] = None,
) -> None:
"""Save or update an encrypted secret in the vault.
Uses ``INSERT OR REPLACE`` semantics to upsert the secret
by key. Timestamps are automatically managed.
Args:
key: Unique identifier for the secret.
encrypted_value: Fernet-encrypted ciphertext string.
val_type: Secret type — ``"text"`` or ``"file"``.
ttl: Optional time-to-live in seconds.
"""
# pylint: disable=import-outside-toplevel
import time
now = self._timestamp()
secret = SecretModel(
key=key,
value=encrypted_value,
type=val_type,
created_at=now,
updated_at=now,
ttl=ttl,
)
table = self.db.table_name
fields = ", ".join(secret.model_dump().keys())
placeholders = ", ".join(["?" for _ in secret.model_dump()])
values = tuple(secret.model_dump().values())
query = f"INSERT OR REPLACE INTO {table} ({fields}) VALUES ({placeholders})" # nosec B608
try:
self.db._execute(query, values) # pylint: disable=protected-access
_debug(f"Secret saved: key='{key}', type='{val_type}'")
except Exception as exc:
_error(f"Failed to save secret '{key}': {exc}")
raise VaultError(f"Failed to save secret '{key}': {exc}") from exc
[docs]
def get(self, key: str) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve an encrypted secret by its key.
Args:
key: Unique identifier for the secret.
Returns:
A tuple of ``(encrypted_value, type)``.
Returns ``(None, None)`` if the key does not exist.
"""
try:
result = self.db.get_by_field(key=key)
except Exception as exc:
_error(f"Vault error retrieving key '{key}': {exc}")
raise VaultError(f"Failed to retrieve key '{key}': {exc}") from exc
if not result:
_debug(f"Key not found: '{key}'")
return (None, None)
secret_data = result[0]
# Check TTL expiration
if secret_data.ttl is not None and secret_data.ttl > 0:
# pylint: disable=import-outside-toplevel
import time
if time.time() - secret_data.updated_at > secret_data.ttl:
_warning(f"Secret expired: key='{key}'")
self.delete(key)
return (None, None)
_debug(f"Secret retrieved: key='{key}', type='{secret_data.type}'")
return (secret_data.value, secret_data.type)
[docs]
def delete(self, key: str) -> None:
"""Delete a secret from the vault.
Args:
key: Unique identifier for the secret to remove.
Raises:
KeyNotFoundError: If the key does not exist.
VaultError: If the delete operation fails.
"""
table = self.db.table_name
query = f"DELETE FROM {table} WHERE key = ?" # nosec B608
try:
rowcount = self.db._execute(query, (key,)) # pylint: disable=protected-access
if rowcount == 0:
raise KeyNotFoundError(f"Key not found: '{key}'")
_debug(f"Secret deleted: key='{key}'")
except KeyNotFoundError:
raise
except Exception as exc:
_error(f"Failed to delete key '{key}': {exc}")
raise VaultError(f"Failed to delete key '{key}': {exc}") from exc
[docs]
def list_keys(self) -> list[str]:
"""List all secret keys stored in the vault.
Returns:
A list of all key names.
"""
table = self.db.table_name
query = f"SELECT key FROM {table}" # nosec B608
try:
rows = self.db._execute(query) # pylint: disable=protected-access
keys = [row[0] for row in rows]
_debug(f"Listed {len(keys)} keys in vault")
return keys
except Exception as exc:
_error(f"Failed to list keys: {exc}")
raise VaultError(f"Failed to list keys: {exc}") from exc
[docs]
def count(self) -> int:
"""Return the number of secrets stored in the vault.
Returns:
Total number of secrets.
"""
table = self.db.table_name
query = f"SELECT COUNT(*) FROM {table}" # nosec B608
try:
result = self.db._execute(query) # pylint: disable=protected-access
count: int = result[0][0] if result else 0
_debug(f"Vault contains {count} secrets")
return count
except Exception as exc:
_error(f"Failed to count secrets: {exc}")
raise VaultError(f"Failed to count secrets: {exc}") from exc