mirror of
https://github.com/eitchtee/WYGIWYH.git
synced 2026-07-04 20:11:45 +02:00
4273c541c5
- Personal API tokens (model, user-settings UI, admin, management command, DRF auth class) for non-interactive API access from automations like n8n. Raw token shown once; only a SHA-256 hash is stored; last_used_at writes are throttled. - OAuth2 authorization server via django-oauth-toolkit with authorization server metadata and optional, off-by-default Dynamic Client Registration (RFC 7591), so remote OAuth/MCP clients can authenticate and self-register. - Tests for token auth, DCR gating and the management commands, plus .env.example and README documentation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
254 lines
8.5 KiB
Python
254 lines
8.5 KiB
Python
import hmac
|
|
import json
|
|
import time
|
|
from secrets import token_urlsafe
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.http import JsonResponse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_http_methods
|
|
from oauth2_provider.models import get_application_model
|
|
|
|
|
|
Application = get_application_model()
|
|
|
|
SUPPORTED_TOKEN_ENDPOINT_AUTH_METHODS = {
|
|
"none": Application.CLIENT_PUBLIC,
|
|
"client_secret_basic": Application.CLIENT_CONFIDENTIAL,
|
|
"client_secret_post": Application.CLIENT_CONFIDENTIAL,
|
|
}
|
|
SUPPORTED_GRANT_TYPES = {"authorization_code", "refresh_token"}
|
|
SUPPORTED_RESPONSE_TYPES = {"code"}
|
|
|
|
|
|
def _base_url(request):
|
|
return settings.PUBLIC_BASE_URL or request.build_absolute_uri("/").rstrip("/")
|
|
|
|
|
|
def _json_error(error, error_description, status=400):
|
|
response = JsonResponse(
|
|
{"error": error, "error_description": error_description},
|
|
status=status,
|
|
)
|
|
response["Cache-Control"] = "no-store"
|
|
response["Pragma"] = "no-cache"
|
|
return response
|
|
|
|
|
|
def _set_no_store_headers(response):
|
|
response["Cache-Control"] = "no-store"
|
|
response["Pragma"] = "no-cache"
|
|
return response
|
|
|
|
|
|
def _parse_json_request_body(request):
|
|
try:
|
|
payload = json.loads(request.body.decode("utf-8"))
|
|
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
|
|
raise ValueError("Request body must be valid JSON.") from exc
|
|
|
|
if not isinstance(payload, dict):
|
|
raise ValueError("Request body must be a JSON object.")
|
|
|
|
return payload
|
|
|
|
|
|
def _get_string_list(payload, field_name, *, required=False, default=None):
|
|
value = payload.get(field_name, default)
|
|
if value is None:
|
|
if required:
|
|
raise ValueError(f"'{field_name}' is required.")
|
|
return None
|
|
|
|
if not isinstance(value, list) or not value:
|
|
raise ValueError(f"'{field_name}' must be a non-empty array of strings.")
|
|
|
|
normalized = []
|
|
for item in value:
|
|
if not isinstance(item, str) or not item.strip():
|
|
raise ValueError(f"'{field_name}' must contain only non-empty strings.")
|
|
normalized.append(item.strip())
|
|
return normalized
|
|
|
|
|
|
def _get_supported_scopes():
|
|
return set(settings.OAUTH2_PROVIDER.get("SCOPES", {}).keys())
|
|
|
|
|
|
def _dcr_initial_access_token_ok(request):
|
|
"""Validate the optional RFC 7591 initial access token, if one is configured."""
|
|
expected = settings.OAUTH2_DCR_INITIAL_ACCESS_TOKEN
|
|
if not expected:
|
|
return True
|
|
|
|
header = request.META.get("HTTP_AUTHORIZATION", "")
|
|
scheme, _, value = header.partition(" ")
|
|
if scheme.lower() != "bearer" or not value:
|
|
return False
|
|
return hmac.compare_digest(value, expected)
|
|
|
|
|
|
@require_http_methods(["GET"])
|
|
def authorization_server_metadata(request):
|
|
base_url = _base_url(request)
|
|
metadata = {
|
|
"issuer": base_url,
|
|
"authorization_endpoint": f"{base_url}/oauth/authorize/",
|
|
"token_endpoint": f"{base_url}/oauth/token/",
|
|
"revocation_endpoint": f"{base_url}/oauth/revoke_token/",
|
|
"introspection_endpoint": f"{base_url}/oauth/introspect/",
|
|
"scopes_supported": sorted(settings.OAUTH2_PROVIDER["SCOPES"].keys()),
|
|
"response_types_supported": ["code"],
|
|
"grant_types_supported": ["authorization_code", "refresh_token"],
|
|
"token_endpoint_auth_methods_supported": [
|
|
"none",
|
|
"client_secret_basic",
|
|
"client_secret_post",
|
|
],
|
|
"code_challenge_methods_supported": ["S256"],
|
|
}
|
|
# Only advertise registration when DCR is actually enabled.
|
|
if settings.OAUTH2_DCR_ENABLED:
|
|
metadata["registration_endpoint"] = f"{base_url}/oauth/register/"
|
|
return JsonResponse(metadata)
|
|
|
|
|
|
@csrf_exempt
|
|
@require_http_methods(["POST"])
|
|
def dynamic_client_registration(request):
|
|
if not settings.OAUTH2_DCR_ENABLED:
|
|
return _json_error(
|
|
"not_found",
|
|
"Dynamic client registration is disabled.",
|
|
status=404,
|
|
)
|
|
|
|
if not _dcr_initial_access_token_ok(request):
|
|
return _json_error(
|
|
"invalid_token",
|
|
"A valid initial access token is required to register a client.",
|
|
status=401,
|
|
)
|
|
|
|
try:
|
|
payload = _parse_json_request_body(request)
|
|
redirect_uris = _get_string_list(payload, "redirect_uris", required=True)
|
|
grant_types = _get_string_list(
|
|
payload,
|
|
"grant_types",
|
|
default=["authorization_code"],
|
|
)
|
|
response_types = _get_string_list(
|
|
payload,
|
|
"response_types",
|
|
default=["code"],
|
|
)
|
|
except ValueError as exc:
|
|
return _json_error("invalid_client_metadata", str(exc))
|
|
|
|
unsupported_grant_types = sorted(set(grant_types) - SUPPORTED_GRANT_TYPES)
|
|
if unsupported_grant_types:
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"Unsupported grant_types: " + ", ".join(unsupported_grant_types),
|
|
)
|
|
|
|
if "authorization_code" not in grant_types:
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"grant_types must include 'authorization_code'.",
|
|
)
|
|
|
|
unsupported_response_types = sorted(set(response_types) - SUPPORTED_RESPONSE_TYPES)
|
|
if unsupported_response_types:
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"Unsupported response_types: "
|
|
+ ", ".join(unsupported_response_types),
|
|
)
|
|
|
|
if "code" not in response_types:
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"response_types must include 'code'.",
|
|
)
|
|
|
|
token_endpoint_auth_method = payload.get(
|
|
"token_endpoint_auth_method",
|
|
"client_secret_basic",
|
|
)
|
|
if token_endpoint_auth_method not in SUPPORTED_TOKEN_ENDPOINT_AUTH_METHODS:
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"Unsupported token_endpoint_auth_method: "
|
|
+ token_endpoint_auth_method,
|
|
)
|
|
|
|
supported_scopes = _get_supported_scopes()
|
|
raw_scope = payload.get("scope", "mcp")
|
|
if not isinstance(raw_scope, str):
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"'scope' must be a space-delimited string.",
|
|
)
|
|
requested_scope = raw_scope.strip() or "mcp"
|
|
requested_scopes = set(requested_scope.split())
|
|
unsupported_scopes = sorted(requested_scopes - supported_scopes)
|
|
if unsupported_scopes:
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"Unsupported scope values: " + ", ".join(unsupported_scopes),
|
|
)
|
|
|
|
client_name = str(payload.get("client_name", "Dynamic MCP Client")).strip()
|
|
if not client_name:
|
|
client_name = "Dynamic MCP Client"
|
|
|
|
client_secret = None
|
|
client_type = SUPPORTED_TOKEN_ENDPOINT_AUTH_METHODS[token_endpoint_auth_method]
|
|
if client_type == Application.CLIENT_CONFIDENTIAL:
|
|
client_secret = token_urlsafe(48)
|
|
|
|
application = Application(
|
|
name=client_name,
|
|
client_type=client_type,
|
|
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
|
|
redirect_uris=" ".join(redirect_uris),
|
|
skip_authorization=False,
|
|
hash_client_secret=True,
|
|
client_secret=client_secret or "",
|
|
)
|
|
|
|
try:
|
|
application.full_clean()
|
|
except ValidationError as exc:
|
|
errors = []
|
|
for field, messages in exc.message_dict.items():
|
|
errors.extend(f"{field}: {message}" for message in messages)
|
|
return _json_error(
|
|
"invalid_client_metadata",
|
|
"; ".join(errors),
|
|
)
|
|
|
|
application.save()
|
|
|
|
response_payload = {
|
|
"client_id": application.client_id,
|
|
"client_id_issued_at": int(time.time()),
|
|
"client_name": client_name,
|
|
"redirect_uris": redirect_uris,
|
|
# Report what was actually provisioned, not the raw request echo. The app
|
|
# is created with the authorization_code grant; refresh_token is implicit
|
|
# to that grant in django-oauth-toolkit rather than a separate capability.
|
|
"grant_types": sorted(set(grant_types) & SUPPORTED_GRANT_TYPES),
|
|
"response_types": sorted(set(response_types) & SUPPORTED_RESPONSE_TYPES),
|
|
"scope": " ".join(sorted(requested_scopes)),
|
|
"token_endpoint_auth_method": token_endpoint_auth_method,
|
|
}
|
|
if client_secret is not None:
|
|
response_payload["client_secret"] = client_secret
|
|
response_payload["client_secret_expires_at"] = 0
|
|
|
|
return _set_no_store_headers(JsonResponse(response_payload, status=201))
|