mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-27 03:09:31 +02:00
181 lines
5.9 KiB
Python
181 lines
5.9 KiB
Python
import importlib
|
|
import types
|
|
from pathlib import Path
|
|
|
|
from django.core.exceptions import ImproperlyConfigured, SuspiciousFileOperation
|
|
from django.core.files.storage import Storage, default_storage
|
|
from django.core.files.utils import validate_file_name
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from taggit.managers import _TaggableManager
|
|
|
|
from netbox.context import current_request
|
|
|
|
from .constants import IMAGE_ATTACHMENT_IMAGE_FORMATS
|
|
from .validators import CustomValidator
|
|
|
|
__all__ = (
|
|
'SharedObjectViewMixin',
|
|
'filename_from_model',
|
|
'image_upload',
|
|
'is_report',
|
|
'is_script',
|
|
'is_taggable',
|
|
'run_validators',
|
|
'validate_script_content',
|
|
)
|
|
|
|
|
|
class SharedObjectViewMixin:
|
|
|
|
def get_queryset(self, request):
|
|
"""
|
|
Return only shared objects, or those owned by the current user, unless this is a superuser.
|
|
"""
|
|
queryset = super().get_queryset(request)
|
|
if request.user.is_superuser:
|
|
return queryset
|
|
if request.user.is_anonymous:
|
|
return queryset.filter(shared=True)
|
|
return queryset.filter(
|
|
Q(shared=True) | Q(user=request.user)
|
|
)
|
|
|
|
|
|
def filename_from_model(model: models.Model) -> str:
|
|
"""Standardizes how we generate filenames from model class for exports"""
|
|
base = model._meta.verbose_name_plural.lower().replace(' ', '_')
|
|
return f'netbox_{base}'
|
|
|
|
|
|
def filename_from_object(context: dict) -> str:
|
|
"""Standardizes how we generate filenames from model class for exports"""
|
|
if 'device' in context:
|
|
base = f"{context['device'].name or 'config'}"
|
|
elif 'virtualmachine' in context:
|
|
base = f"{context['virtualmachine'].name or 'config'}"
|
|
else:
|
|
base = 'config'
|
|
return base
|
|
|
|
|
|
def is_taggable(obj):
|
|
"""
|
|
Return True if the instance can have Tags assigned to it; False otherwise.
|
|
"""
|
|
if hasattr(obj, 'tags'):
|
|
if issubclass(obj.tags.__class__, _TaggableManager):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _build_image_attachment_path(instance, filename, *, storage=default_storage):
|
|
"""
|
|
Build a deterministic relative path for an image attachment.
|
|
|
|
- Normalizes browser paths (e.g., C:\\fake_path\\photo.jpg)
|
|
- Uses the instance.name if provided (sanitized to a *basename*, no ext)
|
|
- Prefixes with a machine-friendly identifier
|
|
"""
|
|
upload_dir = 'image-attachments'
|
|
default_filename = 'unnamed'
|
|
allowed_img_extensions = IMAGE_ATTACHMENT_IMAGE_FORMATS.keys()
|
|
|
|
# Normalize Windows paths and create a Path object.
|
|
normalized_filename = str(filename).replace('\\', '/')
|
|
file_path = Path(normalized_filename)
|
|
|
|
# Extract the extension from the uploaded file.
|
|
ext = file_path.suffix.lower().lstrip('.')
|
|
|
|
# Use the instance-provided name if available; otherwise use the file stem.
|
|
# Rely on Django's get_valid_filename to perform sanitization.
|
|
stem = (instance.name or file_path.stem).strip()
|
|
try:
|
|
safe_stem = storage.get_valid_name(stem)
|
|
except SuspiciousFileOperation:
|
|
safe_stem = default_filename
|
|
|
|
# Append the uploaded extension only if it's an allowed image type
|
|
final_name = f'{safe_stem}.{ext}' if ext in allowed_img_extensions else safe_stem
|
|
|
|
# Create a machine-friendly prefix from the instance
|
|
prefix = f'{instance.object_type.model}_{instance.object_id}'
|
|
name_with_path = f'{upload_dir}/{prefix}_{final_name}'
|
|
|
|
# Validate the generated relative path (blocks absolute/traversal)
|
|
validate_file_name(name_with_path, allow_relative_path=True)
|
|
return name_with_path
|
|
|
|
|
|
def image_upload(instance, filename):
|
|
"""
|
|
Return a relative upload path for an image attachment, applying Django's
|
|
usual suffix-on-collision behavior regardless of storage backend.
|
|
"""
|
|
field = instance.image.field
|
|
name_with_path = _build_image_attachment_path(instance, filename, storage=field.storage)
|
|
|
|
# Intentionally call Django's base Storage implementation here. Some
|
|
# backends override get_available_name() to reuse the incoming name
|
|
# unchanged, but we want Django's normal suffix-on-collision behavior
|
|
# while still dispatching exists() / get_alternative_name() to the
|
|
# configured storage instance.
|
|
return Storage.get_available_name(field.storage, name_with_path, max_length=field.max_length)
|
|
|
|
|
|
def is_script(obj):
|
|
"""
|
|
Returns True if the object is a Script or Report.
|
|
"""
|
|
from .reports import Report
|
|
from .scripts import Script
|
|
try:
|
|
return (issubclass(obj, Report) and obj != Report) or (issubclass(obj, Script) and obj != Script)
|
|
except TypeError:
|
|
return False
|
|
|
|
|
|
def validate_script_content(content, filename):
|
|
"""
|
|
Validate that the given content can be loaded as a Python module by compiling
|
|
and executing it. Raises an exception if the script cannot be loaded.
|
|
"""
|
|
code = compile(content, filename, 'exec')
|
|
module_name = Path(filename).stem
|
|
module = types.ModuleType(module_name)
|
|
exec(code, module.__dict__)
|
|
|
|
|
|
def is_report(obj):
|
|
"""
|
|
Returns True if the given object is a Report.
|
|
"""
|
|
from .reports import Report
|
|
try:
|
|
return issubclass(obj, Report) and obj != Report
|
|
except TypeError:
|
|
return False
|
|
|
|
|
|
def run_validators(instance, validators):
|
|
"""
|
|
Run the provided iterable of CustomValidators for the instance.
|
|
"""
|
|
request = current_request.get()
|
|
for validator in validators:
|
|
|
|
# Loading a validator class by a dotted path
|
|
if type(validator) is str:
|
|
module, cls = validator.rsplit('.', 1)
|
|
validator = getattr(importlib.import_module(module), cls)()
|
|
|
|
# Constructing a new instance on the fly from a ruleset
|
|
elif type(validator) is dict:
|
|
validator = CustomValidator(validator)
|
|
|
|
elif not issubclass(validator.__class__, CustomValidator):
|
|
raise ImproperlyConfigured(f"Invalid value for custom validator: {validator}")
|
|
|
|
validator(instance, request)
|