mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-10 11:23:58 +02:00
Merge pull request #21863 from netbox-community/21801-duplicate-filename-allowed-when-upload-files-using-s3
Fixes #21801: Ensure unique Image Attachment filenames when using S3 storage
This commit is contained in:
@@ -1,10 +1,15 @@
|
||||
import io
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files.base import ContentFile
|
||||
from django.core.files.storage import Storage
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.forms import ValidationError
|
||||
from django.test import TestCase, tag
|
||||
from PIL import Image
|
||||
|
||||
from core.models import AutoSyncRecord, DataSource, ObjectType
|
||||
from dcim.models import Device, DeviceRole, DeviceType, Location, Manufacturer, Platform, Region, Site, SiteGroup
|
||||
@@ -14,10 +19,50 @@ from utilities.exceptions import AbortRequest
|
||||
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
|
||||
|
||||
|
||||
class OverwriteStyleMemoryStorage(Storage):
|
||||
"""
|
||||
In-memory storage that mimics overwrite-style backends by returning the
|
||||
incoming name unchanged from get_available_name().
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.files = {}
|
||||
|
||||
def _open(self, name, mode='rb'):
|
||||
return ContentFile(self.files[name], name=name)
|
||||
|
||||
def _save(self, name, content):
|
||||
self.files[name] = content.read()
|
||||
return name
|
||||
|
||||
def delete(self, name):
|
||||
self.files.pop(name, None)
|
||||
|
||||
def exists(self, name):
|
||||
return name in self.files
|
||||
|
||||
def get_available_name(self, name, max_length=None):
|
||||
return name
|
||||
|
||||
def get_alternative_name(self, file_root, file_ext):
|
||||
return f'{file_root}_sdmmer4{file_ext}'
|
||||
|
||||
def listdir(self, path):
|
||||
return [], list(self.files)
|
||||
|
||||
def size(self, name):
|
||||
return len(self.files[name])
|
||||
|
||||
def url(self, name):
|
||||
return f'https://example.invalid/{name}'
|
||||
|
||||
|
||||
class ImageAttachmentTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
cls.ct_rack = ContentType.objects.get_by_natural_key('dcim', 'rack')
|
||||
cls.ct_site = ContentType.objects.get_by_natural_key('dcim', 'site')
|
||||
cls.site = Site.objects.create(name='Site 1')
|
||||
cls.image_content = b''
|
||||
|
||||
def _stub_image_attachment(self, object_id, image_filename, name=None):
|
||||
@@ -41,6 +86,15 @@ class ImageAttachmentTests(TestCase):
|
||||
)
|
||||
return ia
|
||||
|
||||
def _uploaded_png(self, filename):
|
||||
image = io.BytesIO()
|
||||
Image.new('RGB', (1, 1)).save(image, format='PNG')
|
||||
return SimpleUploadedFile(
|
||||
name=filename,
|
||||
content=image.getvalue(),
|
||||
content_type='image/png',
|
||||
)
|
||||
|
||||
def test_filename_strips_expected_prefix(self):
|
||||
"""
|
||||
Tests that the filename of the image attachment is stripped of the expected
|
||||
@@ -89,6 +143,37 @@ class ImageAttachmentTests(TestCase):
|
||||
ia = self._stub_image_attachment(12, 'image-attachments/rack_12_file.png', name='')
|
||||
self.assertEqual('file.png', str(ia))
|
||||
|
||||
def test_duplicate_uploaded_names_get_suffixed_with_overwrite_style_storage(self):
|
||||
storage = OverwriteStyleMemoryStorage()
|
||||
field = ImageAttachment._meta.get_field('image')
|
||||
|
||||
with patch.object(field, 'storage', storage):
|
||||
first = ImageAttachment(
|
||||
object_type=self.ct_site,
|
||||
object_id=self.site.pk,
|
||||
image=self._uploaded_png('action-buttons.png'),
|
||||
)
|
||||
first.save()
|
||||
|
||||
second = ImageAttachment(
|
||||
object_type=self.ct_site,
|
||||
object_id=self.site.pk,
|
||||
image=self._uploaded_png('action-buttons.png'),
|
||||
)
|
||||
second.save()
|
||||
|
||||
base_name = f'image-attachments/site_{self.site.pk}_action-buttons.png'
|
||||
suffixed_name = f'image-attachments/site_{self.site.pk}_action-buttons_sdmmer4.png'
|
||||
|
||||
self.assertEqual(first.image.name, base_name)
|
||||
self.assertEqual(second.image.name, suffixed_name)
|
||||
self.assertNotEqual(first.image.name, second.image.name)
|
||||
|
||||
self.assertEqual(first.filename, 'action-buttons.png')
|
||||
self.assertEqual(second.filename, 'action-buttons_sdmmer4.png')
|
||||
|
||||
self.assertCountEqual(storage.files.keys(), {base_name, suffixed_name})
|
||||
|
||||
|
||||
class TagTest(TestCase):
|
||||
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files.storage import Storage
|
||||
from django.test import TestCase
|
||||
|
||||
from extras.models import ExportTemplate
|
||||
from extras.utils import filename_from_model, image_upload
|
||||
from extras.models import ExportTemplate, ImageAttachment
|
||||
from extras.utils import _build_image_attachment_path, filename_from_model, image_upload
|
||||
from tenancy.models import ContactGroup, TenantGroup
|
||||
from wireless.models import WirelessLANGroup
|
||||
|
||||
@@ -22,6 +24,25 @@ class FilenameFromModelTests(TestCase):
|
||||
self.assertEqual(filename_from_model(model), expected)
|
||||
|
||||
|
||||
class OverwriteStyleStorage(Storage):
|
||||
"""
|
||||
Mimic an overwrite-style backend (for example, S3 with file_overwrite=True),
|
||||
where get_available_name() returns the incoming name unchanged.
|
||||
"""
|
||||
|
||||
def __init__(self, existing_names=None):
|
||||
self.existing_names = set(existing_names or [])
|
||||
|
||||
def exists(self, name):
|
||||
return name in self.existing_names
|
||||
|
||||
def get_available_name(self, name, max_length=None):
|
||||
return name
|
||||
|
||||
def get_alternative_name(self, file_root, file_ext):
|
||||
return f'{file_root}_sdmmer4{file_ext}'
|
||||
|
||||
|
||||
class ImageUploadTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
@@ -31,16 +52,18 @@ class ImageUploadTests(TestCase):
|
||||
|
||||
def _stub_instance(self, object_id=12, name=None):
|
||||
"""
|
||||
Creates a minimal stub for use with the `image_upload()` function.
|
||||
|
||||
This method generates an instance of `SimpleNamespace` containing a set
|
||||
of attributes required to simulate the expected input for the
|
||||
`image_upload()` method.
|
||||
It is designed to simplify testing or processing by providing a
|
||||
lightweight representation of an object.
|
||||
Creates a minimal stub for use with image attachment path generation.
|
||||
"""
|
||||
return SimpleNamespace(object_type=self.ct_rack, object_id=object_id, name=name)
|
||||
|
||||
def _bound_instance(self, *, storage, object_id=12, name=None, max_length=100):
|
||||
return SimpleNamespace(
|
||||
object_type=self.ct_rack,
|
||||
object_id=object_id,
|
||||
name=name,
|
||||
image=SimpleNamespace(field=SimpleNamespace(storage=storage, max_length=max_length)),
|
||||
)
|
||||
|
||||
def _second_segment(self, path: str):
|
||||
"""
|
||||
Extracts and returns the portion of the input string after the
|
||||
@@ -53,7 +76,7 @@ class ImageUploadTests(TestCase):
|
||||
Tests handling of a Windows file path with a fake directory and extension.
|
||||
"""
|
||||
inst = self._stub_instance(name=None)
|
||||
path = image_upload(inst, r'C:\fake_path\MyPhoto.JPG')
|
||||
path = _build_image_attachment_path(inst, r'C:\fake_path\MyPhoto.JPG')
|
||||
# Base directory and single-level path
|
||||
seg2 = self._second_segment(path)
|
||||
self.assertTrue(path.startswith('image-attachments/rack_12_'))
|
||||
@@ -67,7 +90,7 @@ class ImageUploadTests(TestCase):
|
||||
create subdirectories.
|
||||
"""
|
||||
inst = self._stub_instance(name='5/31/23')
|
||||
path = image_upload(inst, 'image.png')
|
||||
path = _build_image_attachment_path(inst, 'image.png')
|
||||
seg2 = self._second_segment(path)
|
||||
self.assertTrue(seg2.startswith('rack_12_'))
|
||||
self.assertNotIn('/', seg2)
|
||||
@@ -80,7 +103,7 @@ class ImageUploadTests(TestCase):
|
||||
into a single directory name without creating subdirectories.
|
||||
"""
|
||||
inst = self._stub_instance(name=r'5\31\23')
|
||||
path = image_upload(inst, 'image_name.png')
|
||||
path = _build_image_attachment_path(inst, 'image_name.png')
|
||||
|
||||
seg2 = self._second_segment(path)
|
||||
self.assertTrue(seg2.startswith('rack_12_'))
|
||||
@@ -93,7 +116,7 @@ class ImageUploadTests(TestCase):
|
||||
Tests the output path format generated by the `image_upload` function.
|
||||
"""
|
||||
inst = self._stub_instance(object_id=99, name='label')
|
||||
path = image_upload(inst, 'a.webp')
|
||||
path = _build_image_attachment_path(inst, 'a.webp')
|
||||
# The second segment must begin with "rack_99_"
|
||||
seg2 = self._second_segment(path)
|
||||
self.assertTrue(seg2.startswith('rack_99_'))
|
||||
@@ -105,7 +128,7 @@ class ImageUploadTests(TestCase):
|
||||
is omitted.
|
||||
"""
|
||||
inst = self._stub_instance(name='test')
|
||||
path = image_upload(inst, 'document.txt')
|
||||
path = _build_image_attachment_path(inst, 'document.txt')
|
||||
|
||||
seg2 = self._second_segment(path)
|
||||
self.assertTrue(seg2.startswith('rack_12_test'))
|
||||
@@ -121,7 +144,7 @@ class ImageUploadTests(TestCase):
|
||||
# Suppose the instance name has surrounding whitespace and
|
||||
# extra slashes.
|
||||
inst = self._stub_instance(name=' my/complex\\name ')
|
||||
path = image_upload(inst, 'irrelevant.png')
|
||||
path = _build_image_attachment_path(inst, 'irrelevant.png')
|
||||
|
||||
# The output should be flattened and sanitized.
|
||||
# We expect the name to be transformed into a valid filename without
|
||||
@@ -141,7 +164,7 @@ class ImageUploadTests(TestCase):
|
||||
for name in ['2025/09/12', r'2025\09\12']:
|
||||
with self.subTest(name=name):
|
||||
inst = self._stub_instance(name=name)
|
||||
path = image_upload(inst, 'x.jpeg')
|
||||
path = _build_image_attachment_path(inst, 'x.jpeg')
|
||||
seg2 = self._second_segment(path)
|
||||
self.assertTrue(seg2.startswith('rack_12_'))
|
||||
self.assertNotIn('/', seg2)
|
||||
@@ -154,7 +177,49 @@ class ImageUploadTests(TestCase):
|
||||
SuspiciousFileOperation, the fallback default is used.
|
||||
"""
|
||||
inst = self._stub_instance(name=' ')
|
||||
path = image_upload(inst, 'sample.png')
|
||||
path = _build_image_attachment_path(inst, 'sample.png')
|
||||
# Expect the fallback name 'unnamed' to be used.
|
||||
self.assertIn('unnamed', path)
|
||||
self.assertTrue(path.startswith('image-attachments/rack_12_'))
|
||||
|
||||
def test_image_upload_preserves_original_name_when_available(self):
|
||||
inst = self._bound_instance(
|
||||
storage=OverwriteStyleStorage(),
|
||||
name='action-buttons',
|
||||
)
|
||||
|
||||
path = image_upload(inst, 'action-buttons.png')
|
||||
|
||||
self.assertEqual(path, 'image-attachments/rack_12_action-buttons.png')
|
||||
|
||||
def test_image_upload_uses_base_collision_handling_with_overwrite_style_storage(self):
|
||||
inst = self._bound_instance(
|
||||
storage=OverwriteStyleStorage(existing_names={'image-attachments/rack_12_action-buttons.png'}),
|
||||
name='action-buttons',
|
||||
)
|
||||
|
||||
path = image_upload(inst, 'action-buttons.png')
|
||||
|
||||
self.assertEqual(
|
||||
path,
|
||||
'image-attachments/rack_12_action-buttons_sdmmer4.png',
|
||||
)
|
||||
|
||||
def test_image_field_generate_filename_uses_image_upload_collision_handling(self):
|
||||
field = ImageAttachment._meta.get_field('image')
|
||||
instance = ImageAttachment(
|
||||
object_type=self.ct_rack,
|
||||
object_id=12,
|
||||
)
|
||||
|
||||
with patch.object(
|
||||
field,
|
||||
'storage',
|
||||
OverwriteStyleStorage(existing_names={'image-attachments/rack_12_action-buttons.png'}),
|
||||
):
|
||||
path = field.generate_filename(instance, 'action-buttons.png')
|
||||
|
||||
self.assertEqual(
|
||||
path,
|
||||
'image-attachments/rack_12_action-buttons_sdmmer4.png',
|
||||
)
|
||||
|
||||
@@ -2,7 +2,7 @@ import importlib
|
||||
from pathlib import Path
|
||||
|
||||
from django.core.exceptions import ImproperlyConfigured, SuspiciousFileOperation
|
||||
from django.core.files.storage import default_storage
|
||||
from django.core.files.storage import Storage, default_storage
|
||||
from django.core.files.utils import validate_file_name
|
||||
from django.db import models
|
||||
from django.db.models import Q
|
||||
@@ -67,15 +67,13 @@ def is_taggable(obj):
|
||||
return False
|
||||
|
||||
|
||||
def image_upload(instance, filename):
|
||||
def _build_image_attachment_path(instance, filename, *, storage=default_storage):
|
||||
"""
|
||||
Return a path for uploading image attachments.
|
||||
Build a deterministic relative path for an image attachment.
|
||||
|
||||
- Normalizes browser paths (e.g., C:\\fake_path\\photo.jpg)
|
||||
- Uses the instance.name if provided (sanitized to a *basename*, no ext)
|
||||
- Prefixes with a machine-friendly identifier
|
||||
|
||||
Note: Relies on Django's default_storage utility.
|
||||
"""
|
||||
upload_dir = 'image-attachments'
|
||||
default_filename = 'unnamed'
|
||||
@@ -92,22 +90,38 @@ def image_upload(instance, filename):
|
||||
# Rely on Django's get_valid_filename to perform sanitization.
|
||||
stem = (instance.name or file_path.stem).strip()
|
||||
try:
|
||||
safe_stem = default_storage.get_valid_name(stem)
|
||||
safe_stem = storage.get_valid_name(stem)
|
||||
except SuspiciousFileOperation:
|
||||
safe_stem = default_filename
|
||||
|
||||
# Append the uploaded extension only if it's an allowed image type
|
||||
final_name = f"{safe_stem}.{ext}" if ext in allowed_img_extensions else safe_stem
|
||||
final_name = f'{safe_stem}.{ext}' if ext in allowed_img_extensions else safe_stem
|
||||
|
||||
# Create a machine-friendly prefix from the instance
|
||||
prefix = f"{instance.object_type.model}_{instance.object_id}"
|
||||
name_with_path = f"{upload_dir}/{prefix}_{final_name}"
|
||||
prefix = f'{instance.object_type.model}_{instance.object_id}'
|
||||
name_with_path = f'{upload_dir}/{prefix}_{final_name}'
|
||||
|
||||
# Validate the generated relative path (blocks absolute/traversal)
|
||||
validate_file_name(name_with_path, allow_relative_path=True)
|
||||
return name_with_path
|
||||
|
||||
|
||||
def image_upload(instance, filename):
|
||||
"""
|
||||
Return a relative upload path for an image attachment, applying Django's
|
||||
usual suffix-on-collision behavior regardless of storage backend.
|
||||
"""
|
||||
field = instance.image.field
|
||||
name_with_path = _build_image_attachment_path(instance, filename, storage=field.storage)
|
||||
|
||||
# Intentionally call Django's base Storage implementation here. Some
|
||||
# backends override get_available_name() to reuse the incoming name
|
||||
# unchanged, but we want Django's normal suffix-on-collision behavior
|
||||
# while still dispatching exists() / get_alternative_name() to the
|
||||
# configured storage instance.
|
||||
return Storage.get_available_name(field.storage, name_with_path, max_length=field.max_length)
|
||||
|
||||
|
||||
def is_script(obj):
|
||||
"""
|
||||
Returns True if the object is a Script or Report.
|
||||
|
||||
Reference in New Issue
Block a user