mirror of
https://github.com/netbox-community/netbox.git
synced 2026-03-31 22:53:21 +02:00
cleanup
This commit is contained in:
@@ -101,7 +101,10 @@ class ScriptModuleSerializer(ValidatedModelSerializer):
|
|||||||
try:
|
try:
|
||||||
return super().create(validated_data)
|
return super().create(validated_data)
|
||||||
except IntegrityError:
|
except IntegrityError:
|
||||||
# Clean up the file written to disk before the failed DB insert
|
# ManagedFile has a single unique constraint: (file_root, file_path), so an
|
||||||
|
# IntegrityError here always means a duplicate file name regardless of which
|
||||||
|
# path (upload or data_file sync) set validated_data['file_path'].
|
||||||
|
# Clean up the file written to disk before the failed DB insert.
|
||||||
if file_path := validated_data.get('file_path'):
|
if file_path := validated_data.get('file_path'):
|
||||||
storage = storages.create_storage(storages.backends["scripts"])
|
storage = storages.create_storage(storages.backends["scripts"])
|
||||||
storage.delete(file_path)
|
storage.delete(file_path)
|
||||||
|
|||||||
@@ -315,9 +315,9 @@ class ScriptViewSet(ModelViewSet):
|
|||||||
raise MethodNotAllowed(request.method)
|
raise MethodNotAllowed(request.method)
|
||||||
|
|
||||||
def destroy(self, request, *args, **kwargs):
|
def destroy(self, request, *args, **kwargs):
|
||||||
if not request.user.has_perm('extras.delete_scriptmodule'):
|
|
||||||
raise PermissionDenied(_("This user does not have permission to delete script modules."))
|
|
||||||
script = self._get_script(kwargs[self.lookup_field])
|
script = self._get_script(kwargs[self.lookup_field])
|
||||||
|
if not request.user.has_perm('extras.delete_scriptmodule', script.module):
|
||||||
|
raise PermissionDenied(_("This user does not have permission to delete script modules."))
|
||||||
script.module.delete()
|
script.module.delete()
|
||||||
return Response(status=status.HTTP_204_NO_CONTENT)
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
||||||
|
|
||||||
|
|||||||
@@ -1393,6 +1393,15 @@ class ScriptUploadTest(APITestCase):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def setUpTestData(cls):
|
def setUpTestData(cls):
|
||||||
cls.data_source = DataSource.objects.create(name='Test Source', type='local', source_url='/tmp')
|
cls.data_source = DataSource.objects.create(name='Test Source', type='local', source_url='/tmp')
|
||||||
|
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||||
|
cls.data_file = DataFile.objects.create(
|
||||||
|
source=cls.data_source,
|
||||||
|
path='test_datasource.py',
|
||||||
|
last_updated=now(),
|
||||||
|
size=len(script_content),
|
||||||
|
hash=hashlib.sha256(script_content).hexdigest(),
|
||||||
|
data=script_content,
|
||||||
|
)
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
@@ -1458,3 +1467,48 @@ class ScriptUploadTest(APITestCase):
|
|||||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||||
response = self.client.post(self.url_list, {}, format='json', **self.header)
|
response = self.client.post(self.url_list, {}, format='json', **self.header)
|
||||||
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
||||||
|
|
||||||
|
def test_create_script_module_from_data_file(self):
|
||||||
|
"""POST with data_source + data_file (JSON) creates a ScriptModule with the correct file_path."""
|
||||||
|
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||||
|
mock_storage = MagicMock()
|
||||||
|
mock_storage.open.return_value.__enter__ = MagicMock(return_value=MagicMock())
|
||||||
|
mock_storage.open.return_value.__exit__ = MagicMock(return_value=False)
|
||||||
|
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
|
||||||
|
mock_storages.create_storage.return_value = mock_storage
|
||||||
|
mock_storages.backends = {'scripts': {}}
|
||||||
|
response = self.client.post(
|
||||||
|
self.url_list,
|
||||||
|
{'data_source': self.data_source.pk, 'data_file': self.data_file.pk},
|
||||||
|
format='json',
|
||||||
|
**self.header,
|
||||||
|
)
|
||||||
|
self.assertHttpStatus(response, status.HTTP_201_CREATED)
|
||||||
|
self.assertEqual(response.data['file_path'], 'test_datasource.py')
|
||||||
|
self.assertTrue(ScriptModule.objects.filter(file_path='test_datasource.py').exists())
|
||||||
|
|
||||||
|
def test_destroy_script_module(self):
|
||||||
|
"""DELETE removes the ScriptModule and returns 204."""
|
||||||
|
self.add_permissions('extras.delete_scriptmodule', 'extras.view_script')
|
||||||
|
from extras.models import Script
|
||||||
|
module = ScriptModule.objects.create(
|
||||||
|
file_root='scripts', file_path='to_delete.py',
|
||||||
|
)
|
||||||
|
script = Script.objects.create(module=module, name='ToDelete', is_executable=True)
|
||||||
|
url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
|
||||||
|
response = self.client.delete(url, **self.header)
|
||||||
|
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
|
||||||
|
self.assertFalse(ScriptModule.objects.filter(pk=module.pk).exists())
|
||||||
|
|
||||||
|
def test_destroy_script_module_without_permission(self):
|
||||||
|
"""DELETE without delete_scriptmodule permission returns 403."""
|
||||||
|
self.add_permissions('extras.view_script')
|
||||||
|
from extras.models import Script
|
||||||
|
module = ScriptModule.objects.create(
|
||||||
|
file_root='scripts', file_path='no_delete.py',
|
||||||
|
)
|
||||||
|
script = Script.objects.create(module=module, name='NoDelete', is_executable=True)
|
||||||
|
url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
|
||||||
|
response = self.client.delete(url, **self.header)
|
||||||
|
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
|
||||||
|
self.assertTrue(ScriptModule.objects.filter(pk=module.pk).exists())
|
||||||
|
|||||||
Reference in New Issue
Block a user