Compare commits

..

17 Commits

Author SHA1 Message Date
Jason Novinger
cf5bf9a4d0 Fix import order 2025-10-26 23:21:28 -05:00
Jason Novinger
d89948b3ab Remove extraneous TS comments 2025-10-26 23:12:55 -05:00
Jason Novinger
af8f460288 Fixes #7604: Add filter modifier dropdowns for advanced lookup operators
Implements dynamic filter modifier UI that allows users to select lookup operators
(exact, contains, starts with, regex, negation, empty/not empty) directly in filter
forms without manual URL parameter editing.

Supports filters for all scalar types and strings, as well as some
related object filters. Explicitly does not support filters on fields
that use APIWidget. That has been broken out into follow-up work.

**Backend:**
- FilterModifierWidget: Wraps form widgets with lookup modifier dropdown
- FilterModifierMixin: Auto-enhances filterset fields with appropriate lookups
- Extended lookup support: Adds negation (n), regex, iregex, empty_true/false lookups
- Field-type-aware: CharField gets text lookups, IntegerField gets comparison operators, etc.

**Frontend:**
- TypeScript handler syncs modifier dropdown with URL parameters
- Dynamically updates form field names (serial → serial__ic) on modifier change
- Flexible-width modifier dropdowns with semantic CSS classes
2025-10-26 22:49:59 -05:00
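NetBox filtersets already accept lookup suffixes on query parameters (for example `?serial__ic=` for a case-insensitive "contains" match); the dropdowns described above expose those suffixes in the filter form instead of requiring manual URL editing. Below is a minimal sketch of the generic django-filter mechanism such a modifier maps onto; the model and field names are illustrative assumptions, not the NetBox implementation.

```python
# Minimal sketch, not the NetBox implementation: declaring several lookup
# expressions for one model field makes the suffixed query parameters valid,
# which is the kind of name a lookup-modifier dropdown can switch between.
import django_filters
from django.db import models


class ExampleDevice(models.Model):
    # Stand-in model for illustration only
    serial = models.CharField(max_length=50)

    class Meta:
        app_label = 'example'


class ExampleDeviceFilterSet(django_filters.FilterSet):
    class Meta:
        model = ExampleDevice
        # Generates filters named serial, serial__icontains, serial__istartswith,
        # and serial__regex alongside the exact match.
        fields = {'serial': ['exact', 'icontains', 'istartswith', 'regex']}
```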
Martin Hauser
c5124cb2e4 feat(templates): Update user menu icon class names for consistency
Switch icons in the top-right User dropdown to Tabler’s
`dropdown-item-icon` to standardize spacing between the icon and label.
Improves readability and ensures alignment with the overall UI styling.

Fixes #20608
2025-10-21 08:35:50 -04:00
Jason Novinger
d01d7b4156 Fixes #20551: Support quick-add form prefix in automatic slug generation (#20624)
* Fixes #20551: Support quick-add form prefix in automatic slug generation

The slug generation logic in `reslug.ts` looks for form fields using hard-coded ID selectors like `#id_slug` and `#id_name`. In quick-add modals, Django applies a `quickadd` prefix to form fields (introduced in #20542), resulting in IDs like `#id_quickadd-slug` and `#id_quickadd-name`. The logic couldn't find these prefixed fields, so automatic slug generation failed silently in quick-add modals.

This fix updates the field selectors to try both unprefixed and prefixed patterns using the nullish coalescing operator (`??`), checking for the standard field ID first and falling back to the quickadd-prefixed ID if the standard one isn't found.

* Address PR feedback

The slug generation logic required updates to support form prefixes like `quickadd`. Python-side changes
ensure `SlugField.get_bound_field()` updates the `slug-source` attribute to include the form prefix when
present, so JavaScript receives the correct prefixed field ID. `SlugWidget.__init__()` now adds a
`slug-field` class to enable selector-based field discovery. On the frontend, `reslug.ts` now uses class
selectors (`button.reslug` and `input.slug-field`) instead of ID-based lookups, eliminating the need for
fallback logic. The template was updated to use `class="reslug"` instead of `id="reslug"` on the button to
avoid ID duplication issues.
2025-10-21 08:33:10 -04:00
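A minimal sketch of the Python-side approach described above, assuming a form prefix such as `quickadd`; the class and attribute names follow the commit message but are not the verbatim NetBox code.

```python
# Hedged sketch: a slug field whose widget advertises its source field via a
# "slug-source" attribute and a "slug-field" class, with the form prefix
# prepended so quick-add modals resolve IDs like "#id_quickadd-name".
from django import forms


class SlugWidget(forms.TextInput):
    def __init__(self, attrs=None):
        attrs = {'class': 'slug-field', **(attrs or {})}
        super().__init__(attrs=attrs)


class SlugField(forms.SlugField):
    widget = SlugWidget

    def __init__(self, *, slug_source='name', **kwargs):
        super().__init__(**kwargs)
        self.slug_source = slug_source

    def get_bound_field(self, form, field_name):
        bound_field = super().get_bound_field(form, field_name)
        # Include the form prefix (e.g. "quickadd") so the frontend finds the
        # prefixed source field rather than the unprefixed "#id_name".
        source = f'{form.prefix}-{self.slug_source}' if form.prefix else self.slug_source
        self.widget.attrs['slug-source'] = source
        return bound_field
```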
github-actions
4db6123fb2 Update source translation strings 2025-10-21 05:03:30 +00:00
Jeremy Stretch
43648d629b Fixes #20606: Enable copying text from badges in UI (#20633) 2025-10-20 17:12:42 -05:00
bctiemann
0b97df0984 Merge pull request #20625 from netbox-community/20498-url-custom-field-validation-regex
Fixes #20498: Apply validation regex to URL custom fields
2025-10-20 15:30:33 -04:00
Martin Hauser
5334c8143c feat(forms): Add context handling for ModuleBay field (#20586) 2025-10-20 10:16:53 -07:00
Martin Hauser
bbb330becf feat(filtersets): Add assigned and primary filters for MACAddress (#20620)
Introduce Boolean filters `assigned` and `primary` to the MACAddress
filterset, improving filtering capabilities. Update forms, tables, and
GraphQL queries to incorporate the new filters. Add tests to validate
the correct functionality.

Fixes #20399
2025-10-20 10:01:25 -07:00
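A brief usage illustration of the new filters; the endpoint path and token format below are assumptions based on standard NetBox REST API conventions.

```python
# Hedged usage sketch of the assigned/primary Boolean filters on MACAddress.
import requests

BASE = 'https://netbox.example.com/api/dcim/mac-addresses/'
HEADERS = {'Authorization': 'Token 0123456789abcdef0123456789abcdef01234567'}

# MAC addresses currently assigned to an interface or VM interface
assigned = requests.get(BASE, headers=HEADERS, params={'assigned': 'true'})

# MAC addresses that are not the primary address of any interface
non_primary = requests.get(BASE, headers=HEADERS, params={'primary': 'false'})

print(assigned.json()['count'], non_primary.json()['count'])
```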
Jeremy Stretch
e4c74ce6a3 Closes #20614: Update ruff for pre-commit check (#20631) 2025-10-20 09:07:12 -07:00
Martin Hauser
a4868f894d feat(ipam): Add ContactsColumnMixin to ServiceTable
Enhance `ServiceTable` by incorporating `ContactsColumnMixin` for better
contact management. Updates the fields to include `contacts`.

Fixes #20567
2025-10-20 09:07:25 -04:00
github-actions
531ea34207 Update source translation strings 2025-10-20 05:03:22 +00:00
Jason Novinger
6747c82a1a Fixes #20498: Apply validation regex to URL custom fields
The validation_regex field was not being enforced for URL type custom
fields. This fix adds regex validation in two places:

1. to_form_field() - Applies regex validator to form fields (UI validation)
2. validate() - Applies regex check in model validation (API/programmatic)

Note: The original issue reported UI validation only, but this fix also
adds API validation for consistency with text field behavior and to
ensure data integrity across all entry points.
2025-10-19 18:30:54 -05:00
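A minimal sketch of the two enforcement points described above; the method names mirror the commit message, but the surrounding class is a simplified stand-in rather than the actual NetBox CustomField model.

```python
# Hedged sketch: apply the configured validation_regex both to the generated
# form field (UI) and in model-level validation (API/programmatic).
import re

from django import forms
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator


class CustomFieldSketch:
    def __init__(self, validation_regex=r'^https://'):
        self.validation_regex = validation_regex

    def to_form_field(self):
        field = forms.URLField(required=False)
        if self.validation_regex:
            # UI validation: reject values that do not match the pattern
            field.validators.append(RegexValidator(
                regex=self.validation_regex,
                message=f'Values must match this regex: {self.validation_regex}',
            ))
        return field

    def validate(self, value):
        # API/programmatic validation: the same check at the model layer
        if self.validation_regex and not re.search(self.validation_regex, str(value)):
            raise ValidationError(f'Value must match regex {self.validation_regex!r}.')
```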
Martin Hauser
e251ea10b5 Closes #20605: Document variable prefilling via URL parameters (#20619) 2025-10-19 15:42:09 -05:00
Martin Hauser
a1aaf465ac Fixes #20466: Correct handling of assigned filter logic (#20538) 2025-10-19 12:51:44 -05:00
Martin Hauser
2a1d315d85 Fixes #20524: Enhance API script scheduling validation (#20616) 2025-10-19 12:29:14 -05:00
166 changed files with 3201 additions and 3090 deletions

View File

@@ -35,9 +35,9 @@ body:
label: Python Version
description: What version of Python are you currently running?
options:
- "3.10"
- "3.11"
- "3.12"
- "3.13"
- "3.14"
validations:
required: true
- type: textarea

View File

@@ -31,7 +31,7 @@ jobs:
NETBOX_CONFIGURATION: netbox.configuration_testing
strategy:
matrix:
python-version: ['3.12', '3.13']
python-version: ['3.10', '3.11', '3.12']
node-version: ['20.x']
services:
redis:

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.9
rev: v0.14.1
hooks:
- id: ruff
name: "Ruff linter"

File diff suppressed because it is too large

View File

@@ -2,7 +2,7 @@
## Local Authentication
Local user accounts and groups can be created in NetBox under the "Authentication" section in the "Admin" menu.
Local user accounts and groups can be created in NetBox under the "Authentication" section in the "Admin" menu. This section is available only to users with the "staff" permission enabled.
At a minimum, each user account must have a username and password set. User accounts may also denote a first name, last name, and email address. [Permissions](../permissions.md) may also be assigned to individual users and/or groups as needed.

View File

@@ -1,15 +1,5 @@
# GraphQL API Parameters
## GRAPHQL_DEFAULT_VERSION
!!! note "This parameter was introduced in NetBox v4.5."
Default: `1`
Designates the default version of the GraphQL API served by `/graphql/`. To access a specific version, append the version number to the URL, e.g. `/graphql/v2/`.
---
## GRAPHQL_ENABLED
!!! tip "Dynamic Configuration Parameter"

View File

@@ -127,3 +127,19 @@ The list of groups that promote an remote User to Superuser on Login. If group i
Default: `[]` (Empty list)
The list of users that get promoted to Superuser on Login. If user isn't present in list on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
---
## REMOTE_AUTH_STAFF_GROUPS
Default: `[]` (Empty list)
The list of groups that promote a remote User to Staff on Login. If group isn't present on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
---
## REMOTE_AUTH_STAFF_USERS
Default: `[]` (Empty list)
The list of users that get promoted to Staff on Login. If user isn't present in list on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )

View File

@@ -23,31 +23,6 @@ ALLOWED_HOSTS = ['*']
---
## API_TOKEN_PEPPERS
!!! info "This parameter was introduced in NetBox v4.5."
[Cryptographic peppers](https://en.wikipedia.org/wiki/Pepper_(cryptography)) are employed to generate hashes of sensitive values on the server. This parameter defines the peppers used to hash v2 API tokens in NetBox. You must define at least one pepper before creating a v2 API token. See the [API documentation](../integrations/rest-api.md#authentication) for further information about how peppers are used.
```python
API_TOKEN_PEPPERS = {
# DO NOT USE THIS EXAMPLE PEPPER IN PRODUCTION
1: 'kp7ht*76fiQAhUi5dHfASLlYUE_S^gI^(7J^K5M!LfoH@vl&b_',
}
```
!!! warning "Peppers are sensitive"
Treat pepper values as extremely sensitive. Consider populating peppers from environment variables at initialization time rather than defining them in the configuration file, if feasible.
Peppers must be at least 50 characters in length and should comprise a random string with a diverse character set. Consider using the Python script at `$INSTALL_ROOT/netbox/generate_secret_key.py` to generate a pepper value.
It is recommended to start with a pepper ID of `1`. Additional peppers can be introduced later as needed to begin rotating token hashes.
!!! tip
Although NetBox will run without `API_TOKEN_PEPPERS` defined, the use of v2 API tokens will be unavailable.
---
## DATABASE
!!! warning "Legacy Configuration Parameter"

View File

@@ -1,5 +1,16 @@
# Security & Authentication Parameters
## ALLOW_TOKEN_RETRIEVAL
Default: `False`
!!! note
The default value of this parameter changed from `True` to `False` in NetBox v4.3.0.
If disabled, the values of API tokens will not be displayed after each token's initial creation. A user **must** record the value of a token prior to its creation, or it will be lost. Note that this affects _all_ users, regardless of assigned permissions.
---
## ALLOWED_URL_SCHEMES
!!! tip "Dynamic Configuration Parameter"

View File

@@ -131,6 +131,17 @@ self.log_info(f"Running as user {username} (IP: {ip_address})...")
For a complete list of available request parameters, please see the [Django documentation](https://docs.djangoproject.com/en/stable/ref/request-response/).
## Reading Data from Files
The Script class provides two convenience methods for reading data from files:
* `load_yaml`
* `load_json`
These two methods will load data in YAML or JSON format, respectively, from files within the local path (i.e. `SCRIPTS_ROOT`).
**Note:** These convenience methods are deprecated and will be removed in NetBox v4.4. They only work if running scripts within the local path; they will not work if using a storage other than ScriptFileSystemStorage.
## Logging
The Script object provides a set of convenient functions for recording messages at different severity levels:
@@ -393,6 +404,61 @@ A complete date & time. Returns a `datetime.datetime` object.
Custom scripts can be run via the web UI by navigating to the script, completing any required form data, and clicking the "run script" button. It is possible to schedule a script to be executed at a specified time in the future. A scheduled script can be canceled by deleting the associated job result object.
#### Prefilling variables via URL parameters
Script form fields can be prefilled by appending query parameters to the script URL. Each parameter name must match the variable name defined on the script class. Prefilled values are treated as initial values and can be edited before execution. Multiple values can be supplied by repeating the same parameter. Query values must be percent-encoded where required (for example, spaces as `%20`).
Examples:
For string and integer variables, when a script defines:
```python
from extras.scripts import Script, StringVar, IntegerVar
class MyScript(Script):
name = StringVar()
count = IntegerVar()
```
the following URL prefills the `name` and `count` fields:
```
https://<netbox>/extras/scripts/<script_id>/?name=Branch42&count=3
```
For object variables (`ObjectVar`), supply the object's primary key (PK):
```
https://<netbox>/extras/scripts/<script_id>/?device=1
```
If an object ID cannot be resolved or the object is not visible to the requesting user, the field remains unpopulated.
Supported variable types:
| Variable class | Expected input | Example query string |
|--------------------------|---------------------------------|---------------------------------------------|
| `StringVar` | string (percent-encoded) | `?name=Branch42` |
| `TextVar` | string (percent-encoded) | `?notes=Initial%20value` |
| `IntegerVar` | integer | `?count=3` |
| `DecimalVar` | decimal number | `?ratio=0.75` |
| `BooleanVar` | value → `True`; empty → `False` | `?enabled=true` (True), `?enabled=` (False) |
| `ChoiceVar` | choice value (not label) | `?role=edge` |
| `MultiChoiceVar` | choice values (repeat) | `?roles=edge&roles=core` |
| `ObjectVar(Device)` | PK (integer) | `?device=1` |
| `MultiObjectVar(Device)` | PKs (repeat) | `?devices=1&devices=2` |
| `IPAddressVar` | IP address | `?ip=198.51.100.10` |
| `IPAddressWithMaskVar` | IP address with mask | `?addr=192.0.2.1/24` |
| `IPNetworkVar` | IP network prefix | `?network=2001:db8::/64` |
| `DateVar` | date `YYYY-MM-DD` | `?date=2025-01-05` |
| `DateTimeVar` | ISO datetime | `?when=2025-01-05T14:30:00` |
| `FileVar` | — (not supported) | — |
!!! note
- The parameter names above are examples; use the actual variable attribute names defined by the script.
- For `BooleanVar`, only an empty value (`?enabled=`) unchecks the box; any other value including `false` or `0` checks it.
- File uploads (`FileVar`) cannot be prefilled via URL parameters.
### Via the API
To run a script via the REST API, issue a POST request to the script's endpoint specifying the form data and commitment. For example, to run a script named `example.MyReport`, we would make a request such as the following:

View File

@@ -7,7 +7,7 @@ Getting started with NetBox development is pretty straightforward, and should fe
* A Linux system or compatible environment
* A PostgreSQL server, which can be installed locally [per the documentation](../installation/1-postgresql.md)
* A Redis server, which can also be [installed locally](../installation/2-redis.md)
* Python 3.12 or later
* Python 3.10 or later
### 1. Fork the Repo

View File

@@ -8,7 +8,7 @@ NetBox's REST API, powered by the [Django REST Framework](https://www.django-res
```no-highlight
curl -s -X POST \
-H "Authorization: Bearer $TOKEN" \
-H "Authorization: Token $TOKEN" \
-H "Content-Type: application/json" \
http://netbox/api/ipam/prefixes/ \
--data '{"prefix": "192.0.2.0/24", "site": {"name": "Branch 12"}}'

View File

@@ -90,10 +90,3 @@ http://netbox:8000/api/extras/config-templates/123/render/ \
"bar": 123
}'
```
!!! note "Permissions"
Rendering configuration templates via the REST API requires appropriate permissions for the relevant object type:
* To render a device's configuration via `/api/dcim/devices/{id}/render-config/`, assign a permission for "DCIM > Device" with the `render_config` action.
* To render a virtual machine's configuration via `/api/virtualization/virtual-machines/{id}/render-config/`, assign a permission for "Virtualization > Virtual Machine" with the `render_config` action.
* To render a config template directly via `/api/extras/config-templates/{id}/render/`, assign a permission for "Extras > Config Template" with the `render` action.

View File

@@ -34,6 +34,9 @@ Sets the default number of rows displayed on paginated tables.
### Paginator placement
Controls where pagination controls are rendered relative to a table.
### HTMX navigation (experimental)
Enables partial-page navigation for supported views. Disable this preference if unexpected behavior is observed.
### Striped table rows
Toggles alternating row backgrounds on tables.

View File

@@ -6,8 +6,8 @@ This section of the documentation discusses installing and configuring the NetBo
Begin by installing all system packages required by NetBox and its dependencies.
!!! warning "Python 3.12 or later required"
NetBox supports only Python 3.12 or later.
!!! warning "Python 3.10 or later required"
NetBox supports Python 3.10, 3.11, and 3.12.
```no-highlight
sudo apt install -y python3 python3-pip python3-venv python3-dev \
@@ -15,7 +15,7 @@ build-essential libxml2-dev libxslt1-dev libffi-dev libpq-dev \
libssl-dev zlib1g-dev
```
Before continuing, check that your installed Python version is at least 3.12:
Before continuing, check that your installed Python version is at least 3.10:
```no-highlight
python3 -V
@@ -120,23 +120,6 @@ If you are not yet sure what the domain name and/or IP address of the NetBox ins
ALLOWED_HOSTS = ['*']
```
### API_TOKEN_PEPPERS
Define at least one random cryptographic pepper, identified by a numeric ID starting at 1. This will be used to generate SHA256 checksums for API tokens.
```python
API_TOKEN_PEPPERS = {
# DO NOT USE THIS EXAMPLE PEPPER IN PRODUCTION
1: 'kp7ht*76fiQAhUi5dHfASLlYUE_S^gI^(7J^K5M!LfoH@vl&b_',
}
```
!!! tip
As with [`SECRET_KEY`](#secret_key) below, you can use the `generate_secret_key.py` script to generate a random pepper:
```no-highlight
python3 ../generate_secret_key.py
```
### DATABASES
This parameter holds the PostgreSQL database configuration details. The default database must be defined; additional databases may be defined as needed e.g. by plugins.
@@ -252,10 +235,10 @@ Once NetBox has been configured, we're ready to proceed with the actual installa
sudo /opt/netbox/upgrade.sh
```
Note that **Python 3.12 or later is required** for NetBox v4.5 and later releases. If the default Python installation on your server is set to a lesser version, pass the path to the supported installation as an environment variable named `PYTHON`. (Note that the environment variable must be passed _after_ the `sudo` command.)
Note that **Python 3.10 or later is required** for NetBox v4.0 and later releases. If the default Python installation on your server is set to a lesser version, pass the path to the supported installation as an environment variable named `PYTHON`. (Note that the environment variable must be passed _after_ the `sudo` command.)
```no-highlight
sudo PYTHON=/usr/bin/python3.12 /opt/netbox/upgrade.sh
sudo PYTHON=/usr/bin/python3.10 /opt/netbox/upgrade.sh
```
!!! note

View File

@@ -60,3 +60,6 @@ You should see output similar to the following:
If the NetBox service fails to start, issue the command `journalctl -eu netbox` to check for log messages that may indicate the problem.
Once you've verified that the WSGI workers are up and running, move on to HTTP server setup.
!!! note
There is a bug in the current stable release of gunicorn (v21.2.0) where automatic restarts of the worker processes can result in 502 errors under heavy load. (See [gunicorn bug #3038](https://github.com/benoitc/gunicorn/issues/3038) for more detail.) Users who encounter this issue may opt to downgrade to an earlier, unaffected release of gunicorn (`pip install gunicorn==20.1.0`). Note, however, that this earlier release does not officially support Python 3.11.

View File

@@ -121,6 +121,7 @@ AUTH_LDAP_MIRROR_GROUPS = True
# Define special user types using groups. Exercise great caution when assigning superuser status.
AUTH_LDAP_USER_FLAGS_BY_GROUP = {
"is_active": "cn=active,ou=groups,dc=example,dc=com",
"is_staff": "cn=staff,ou=groups,dc=example,dc=com",
"is_superuser": "cn=superuser,ou=groups,dc=example,dc=com"
}
@@ -133,6 +134,7 @@ AUTH_LDAP_CACHE_TIMEOUT = 3600
```
* `is_active` - All users must be mapped to at least this group to enable authentication. Without this, users cannot log in.
* `is_staff` - Users mapped to this group are enabled for access to the administration tools; this is the equivalent of checking the "staff status" box on a manually created user. This doesn't grant any specific permissions.
* `is_superuser` - Users mapped to this group will be granted superuser status. Superusers are implicitly granted all permissions.
!!! warning
@@ -246,6 +248,7 @@ AUTH_LDAP_MIRROR_GROUPS = True
# Define special user types using groups. Exercise great caution when assigning superuser status.
AUTH_LDAP_USER_FLAGS_BY_GROUP = {
"is_active": "cn=active,ou=groups,dc=example,dc=com",
"is_staff": "cn=staff,ou=groups,dc=example,dc=com",
"is_superuser": "cn=superuser,ou=groups,dc=example,dc=com"
}

View File

@@ -27,7 +27,7 @@ The following sections detail how to set up a new instance of NetBox:
| Dependency | Supported Versions |
|------------|--------------------|
| Python | 3.12, 3.13, 3.14 |
| Python | 3.10, 3.11, 3.12 |
| PostgreSQL | 14+ |
| Redis | 4.0+ |

View File

@@ -19,7 +19,7 @@ NetBox requires the following dependencies:
| Dependency | Supported Versions |
|------------|--------------------|
| Python | 3.12, 3.13, 3.14 |
| Python | 3.10, 3.11, 3.12 |
| PostgreSQL | 14+ |
| Redis | 4.0+ |
@@ -27,7 +27,6 @@ NetBox requires the following dependencies:
| NetBox Version | Python min | Python max | PostgreSQL min | Redis min | Documentation |
|:--------------:|:----------:|:----------:|:--------------:|:---------:|:-----------------------------------------------------------------------------------------:|
| 4.5 | 3.12 | 3.14 | 14 | 4.0 | [Link](https://github.com/netbox-community/netbox/blob/v4.5.0/docs/installation/index.md) |
| 4.4 | 3.10 | 3.12 | 14 | 4.0 | [Link](https://github.com/netbox-community/netbox/blob/v4.4.0/docs/installation/index.md) |
| 4.3 | 3.10 | 3.12 | 14 | 4.0 | [Link](https://github.com/netbox-community/netbox/blob/v4.3.0/docs/installation/index.md) |
| 4.2 | 3.10 | 3.12 | 13 | 4.0 | [Link](https://github.com/netbox-community/netbox/blob/v4.2.0/docs/installation/index.md) |
@@ -131,7 +130,7 @@ sudo ./upgrade.sh
If the default version of Python is not at least 3.10, you'll need to pass the path to a supported Python version as an environment variable when calling the upgrade script. For example:
```no-highlight
sudo PYTHON=/usr/bin/python3.12 ./upgrade.sh
sudo PYTHON=/usr/bin/python3.10 ./upgrade.sh
```
!!! note

View File

@@ -80,7 +80,7 @@ Likewise, the site, rack, and device objects are located under the "DCIM" applic
The full hierarchy of available endpoints can be viewed by navigating to the API root in a web browser.
Each model generally has two views associated with it: a list view and a detail view. The list view is used to retrieve a list of multiple objects and to create new objects. The detail view is used to retrieve, update, or delete a single existing object. All objects are referenced by their numeric primary key (`id`).
Each model generally has two views associated with it: a list view and a detail view. The list view is used to retrieve a list of multiple objects and to create new objects. The detail view is used to retrieve, update, or delete an single existing object. All objects are referenced by their numeric primary key (`id`).
* `/api/dcim/devices/` - List existing devices or create a new device
* `/api/dcim/devices/123/` - Retrieve, update, or delete the device with ID 123
@@ -653,22 +653,18 @@ The NetBox REST API primarily employs token-based authentication. For convenienc
### Tokens
A token is a secret, unique identifier mapped to a NetBox user account. Each user may have one or more tokens which he or she can use for authentication when making REST API requests. To create a token, navigate to the API tokens page under your user profile. When creating a token, NetBox will automatically populate a randomly-generated token value.
!!! note "Tokens cannot be retrieved once created"
Once a token has been created, its plaintext value cannot be retrieved. For this reason, you must take care to securely record the token locally immediately upon its creation. If a token plaintext is lost, it cannot be recovered: A new token must be created.
A token is a unique identifier mapped to a NetBox user account. Each user may have one or more tokens which he or she can use for authentication when making REST API requests. To create a token, navigate to the API tokens page under your user profile.
By default, all users can create and manage their own REST API tokens under the user control panel in the UI or via the REST API. This ability can be disabled by overriding the [`DEFAULT_PERMISSIONS`](../configuration/security.md#default_permissions) configuration parameter.
Each token contains a 160-bit key represented as 40 hexadecimal characters. When creating a token, you'll typically leave the key field blank so that a random key will be automatically generated. However, NetBox allows you to specify a key in case you need to restore a previously deleted token to operation.
Additionally, a token can be set to expire at a specific time. This can be useful if an external client needs to be granted temporary access to NetBox.
#### v1 and v2 Tokens
!!! info "Restricting Token Retrieval"
The ability to retrieve the key value of a previously-created API token can be restricted by disabling the [`ALLOW_TOKEN_RETRIEVAL`](../configuration/security.md#allow_token_retrieval) configuration parameter.
Beginning with NetBox v4.5, two versions of API token are supported, denoted as v1 and v2. Users are strongly encouraged to create only v2 tokens and to discontinue the use of v1 tokens. Support for v1 tokens will be removed in a future NetBox release.
v2 API tokens offer much stronger security. The token plaintext given at creation time is hashed together with a configured [cryptographic pepper](../configuration/required-parameters.md#api_token_peppers) to generate a unique checksum. This checksum is irreversible; the token plaintext is never stored on the server and thus cannot be retrieved even with database-level access.
#### Restricting Write Operations
### Restricting Write Operations
By default, a token can be used to perform all actions via the API that a user would be permitted to do via the web UI. Deselecting the "write enabled" option will restrict API requests made with the token to read operations (e.g. GET) only.
@@ -685,22 +681,10 @@ It is possible to provision authentication tokens for other users via the REST A
### Authenticating to the API
An authentication token is included with a request in its `Authorization` header. The format of the header value depends on the version of token in use. v2 tokens use the following form, concatenating the token's prefix (`nbt_`) and key with its plaintext value, separated by a period:
An authentication token is attached to a request by setting the `Authorization` header to the string `Token` followed by a space and the user's token:
```
Authorization: Bearer nbt_<key>.<token>
```
Legacy v1 tokens use the prefix `Token` rather than `Bearer`, and include only the token plaintext. (v1 tokens do not have a key.)
```
Authorization: Token <token>
```
Below is an example REST API request utilizing a v2 token.
```
$ curl -H "Authorization: Bearer nbt_4F9DAouzURLb.zjebxBPzICiPbWz0Wtx0fTL7bCKXKGTYhNzkgC2S" \
$ curl -H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
https://netbox/api/dcim/sites/
{

View File

@@ -173,12 +173,12 @@ classifiers=[
'Intended Audience :: Developers',
'Natural Language :: English',
"Programming Language :: Python :: 3 :: Only",
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'Programming Language :: Python :: 3.14',
]
requires-python = ">=3.12.0"
requires-python = ">=3.10.0"
```
@@ -195,7 +195,7 @@ python3 -m venv ~/.virtualenvs/my_plugin
You can make NetBox available within this environment by creating a path file pointing to its location. This will add NetBox to the Python path upon activation. (Be sure to adjust the command below to specify your actual virtual environment path, Python version, and NetBox installation.)
```shell
echo /opt/netbox/netbox > $VENV/lib/python3.12/site-packages/netbox.pth
echo /opt/netbox/netbox > $VENV/lib/python3.10/site-packages/netbox.pth
```
## Development Installation

View File

@@ -64,17 +64,14 @@ item1 = PluginMenuItem(
A `PluginMenuItem` has the following attributes:
| Attribute | Required | Description |
|-----------------|----------|------------------------------------------------------|
| `link` | Yes | Name of the URL path to which this menu item links |
| `link_text` | Yes | The text presented to the user |
| `permissions` | - | A list of permissions required to display this link |
| `auth_required` | - | Display only for authenticated users |
| `staff_only` | - | Display only for superusers |
| `buttons` | - | An iterable of PluginMenuButton instances to include |
!!! note "Changed in NetBox v4.5"
In releases prior to NetBox v4.5, `staff_only` restricted display of a menu item to only users with `is_staff` set to True. In NetBox v4.5, the `is_staff` flag was removed from the user model. Menu items with `staff_only` set to True are now displayed only for superusers.
| Attribute | Required | Description |
|-----------------|----------|----------------------------------------------------------------------------------------------------------|
| `link` | Yes | Name of the URL path to which this menu item links |
| `link_text` | Yes | The text presented to the user |
| `permissions` | - | A list of permissions required to display this link |
| `auth_required` | - | Display only for authenticated users |
| `staff_only` | - | Display only for users who have `is_staff` set to true (any specified permissions will also be required) |
| `buttons` | - | An iterable of PluginMenuButton instances to include |
## Menu Buttons

netbox/account/tables.py (new file, 57 lines)
View File

@@ -0,0 +1,57 @@
from django.utils.translation import gettext as _
from account.models import UserToken
from netbox.tables import NetBoxTable, columns
__all__ = (
'UserTokenTable',
)
TOKEN = """<samp><span id="token_{{ record.pk }}">{{ record }}</span></samp>"""
ALLOWED_IPS = """{{ value|join:", " }}"""
COPY_BUTTON = """
{% if settings.ALLOW_TOKEN_RETRIEVAL %}
{% copy_content record.pk prefix="token_" color="success" %}
{% endif %}
"""
class UserTokenTable(NetBoxTable):
"""
Table for users to manage their own API tokens under account views.
"""
key = columns.TemplateColumn(
verbose_name=_('Key'),
template_code=TOKEN,
)
write_enabled = columns.BooleanColumn(
verbose_name=_('Write Enabled')
)
created = columns.DateTimeColumn(
timespec='minutes',
verbose_name=_('Created'),
)
expires = columns.DateTimeColumn(
timespec='minutes',
verbose_name=_('Expires'),
)
last_used = columns.DateTimeColumn(
verbose_name=_('Last Used'),
)
allowed_ips = columns.TemplateColumn(
verbose_name=_('Allowed IPs'),
template_code=ALLOWED_IPS
)
actions = columns.ActionsColumn(
actions=('edit', 'delete'),
extra_buttons=COPY_BUTTON
)
class Meta(NetBoxTable.Meta):
model = UserToken
fields = (
'pk', 'id', 'key', 'description', 'write_enabled', 'created', 'expires', 'last_used', 'allowed_ips',
)

View File

@@ -26,9 +26,8 @@ from extras.tables import BookmarkTable, NotificationTable, SubscriptionTable
from netbox.authentication import get_auth_backend_display, get_saml_idps
from netbox.config import get_config
from netbox.views import generic
from users import forms
from users import forms, tables
from users.models import UserConfig
from users.tables import TokenTable
from utilities.request import safe_for_redirect
from utilities.string import remove_linebreaks
from utilities.views import register_model_view
@@ -329,8 +328,7 @@ class UserTokenListView(LoginRequiredMixin, View):
def get(self, request):
tokens = UserToken.objects.filter(user=request.user)
table = TokenTable(tokens)
table.columns.hide('user')
table = tables.UserTokenTable(tokens)
table.configure(request)
return render(request, 'account/token_list.html', {
@@ -345,9 +343,11 @@ class UserTokenView(LoginRequiredMixin, View):
def get(self, request, pk):
token = get_object_or_404(UserToken.objects.filter(user=request.user), pk=pk)
key = token.key if settings.ALLOW_TOKEN_RETRIEVAL else None
return render(request, 'account/token.html', {
'object': token,
'key': key,
})

View File

@@ -13,8 +13,11 @@ from netbox.forms import NetBoxModelFilterSetForm
from tenancy.forms import TenancyFilterForm, ContactModelFilterForm
from utilities.forms import add_blank_choice
from utilities.forms.fields import ColorField, DynamicModelMultipleChoiceField, TagFilterField
from utilities.forms.filterset_mappings import FILTERSET_MAPPINGS
from utilities.forms.mixins import FilterModifierMixin
from utilities.forms.rendering import FieldSet
from utilities.forms.widgets import DatePicker, NumberWithOptions
from circuits.filtersets import CircuitFilterSet
__all__ = (
'CircuitFilterForm',
@@ -118,7 +121,7 @@ class CircuitTypeFilterForm(NetBoxModelFilterSetForm):
)
class CircuitFilterForm(TenancyFilterForm, ContactModelFilterForm, NetBoxModelFilterSetForm):
class CircuitFilterForm(FilterModifierMixin, TenancyFilterForm, ContactModelFilterForm, NetBoxModelFilterSetForm):
model = Circuit
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
@@ -397,3 +400,7 @@ class VirtualCircuitTerminationFilterForm(NetBoxModelFilterSetForm):
label=_('Provider')
)
tag = TagFilterField(model)
# Register FilterSet mappings for FilterModifierMixin lookup verification
FILTERSET_MAPPINGS[CircuitFilterForm] = CircuitFilterSet

View File

@@ -1,97 +0,0 @@
from django.db import migrations, models
PATTERN_OPS_INDEXES = [
'circuits_circuitgroup_name_ec8ac1e5_like',
'circuits_circuitgroup_slug_61ca866b_like',
'circuits_circuittype_name_8256ea9a_like',
'circuits_circuittype_slug_9b4b3cf9_like',
'circuits_provider_name_8f2514f5_like',
'circuits_provider_slug_c3c0aa10_like',
'circuits_virtualcircuittype_name_5184db16_like',
'circuits_virtualcircuittype_slug_75d5c661_like',
]
def remove_indexes(apps, schema_editor):
for idx in PATTERN_OPS_INDEXES:
schema_editor.execute(f'DROP INDEX IF EXISTS {idx}')
class Migration(migrations.Migration):
dependencies = [
('circuits', '0052_extend_circuit_abs_distance_upper_limit'),
('dcim', '0217_ci_collations'),
]
operations = [
migrations.RunPython(
code=remove_indexes,
reverse_code=migrations.RunPython.noop,
),
migrations.AlterField(
model_name='circuit',
name='cid',
field=models.CharField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='circuitgroup',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='circuitgroup',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='circuittype',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='circuittype',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='provider',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='provider',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='provideraccount',
name='account',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='provideraccount',
name='name',
field=models.CharField(blank=True, db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='providernetwork',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='virtualcircuit',
name='cid',
field=models.CharField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='virtualcircuittype',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='virtualcircuittype',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
]

View File

@@ -41,10 +41,9 @@ class Circuit(ContactsMixin, ImageAttachmentsMixin, DistanceMixin, PrimaryModel)
ProviderAccount. Circuit port speed and commit rate are measured in Kbps.
"""
cid = models.CharField(
verbose_name=_('circuit ID'),
max_length=100,
db_collation='case_insensitive',
help_text=_('Unique circuit ID'),
verbose_name=_('circuit ID'),
help_text=_('Unique circuit ID')
)
provider = models.ForeignKey(
to='circuits.Provider',

View File

@@ -21,14 +21,13 @@ class Provider(ContactsMixin, PrimaryModel):
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
help_text=_('Full name of the provider'),
db_collation="natural_sort"
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
unique=True,
db_collation='case_insensitive',
unique=True
)
asns = models.ManyToManyField(
to='ipam.ASN',
@@ -57,15 +56,13 @@ class ProviderAccount(ContactsMixin, PrimaryModel):
related_name='accounts'
)
account = models.CharField(
verbose_name=_('account ID'),
max_length=100,
db_collation='ci_natural_sort',
verbose_name=_('account ID')
)
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
blank=True,
blank=True
)
clone_fields = ('provider', )
@@ -100,7 +97,7 @@ class ProviderNetwork(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
provider = models.ForeignKey(
to='circuits.Provider',

View File

@@ -34,10 +34,9 @@ class VirtualCircuit(PrimaryModel):
A virtual connection between two or more endpoints, delivered across one or more physical circuits.
"""
cid = models.CharField(
verbose_name=_('circuit ID'),
max_length=100,
db_collation='case_insensitive',
help_text=_('Unique circuit ID'),
verbose_name=_('circuit ID'),
help_text=_('Unique circuit ID')
)
provider_network = models.ForeignKey(
to='circuits.ProviderNetwork',

View File

@@ -9,6 +9,7 @@ from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework.routers import APIRootView
from rest_framework.viewsets import ReadOnlyModelViewSet
@@ -23,7 +24,7 @@ from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.pagination import LimitOffsetListPagination
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
from utilities.api import IsSuperuser
from . import serializers
@@ -99,7 +100,7 @@ class BaseRQViewSet(viewsets.ViewSet):
"""
Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data().
"""
permission_classes = [IsSuperuser]
permission_classes = [IsAdminUser]
serializer_class = None
def get_data(self):

View File

@@ -1,30 +0,0 @@
from django.db import migrations, models
PATTERN_OPS_INDEXES = [
'core_datasource_name_17788499_like',
]
def remove_indexes(apps, schema_editor):
for idx in PATTERN_OPS_INDEXES:
schema_editor.execute(f'DROP INDEX IF EXISTS {idx}')
class Migration(migrations.Migration):
dependencies = [
('core', '0019_configrevision_active'),
('dcim', '0217_ci_collations'),
]
operations = [
migrations.RunPython(
code=remove_indexes,
reverse_code=migrations.RunPython.noop,
),
migrations.AlterField(
model_name='datasource',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
]

View File

@@ -0,0 +1,3 @@
# TODO: Remove this module in NetBox v4.5
# Provided for backward compatibility
from .object_types import *

View File

@@ -38,8 +38,7 @@ class DataSource(JobsMixin, PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
type = models.CharField(
verbose_name=_('type'),

View File

@@ -8,7 +8,6 @@ from rq.job import Job as RQ_Job, JobStatus
from rq.registry import FailedJobRegistry, StartedJobRegistry
from rest_framework import status
from users.constants import TOKEN_PREFIX
from users.models import Token, User
from utilities.testing import APITestCase, APIViewTestCases, TestCase
from utilities.testing.utils import disable_logging
@@ -108,14 +107,14 @@ class ObjectTypeTest(APITestCase):
def test_list_objects(self):
object_type_count = ObjectType.objects.count()
response = self.client.get(reverse('core-api:objecttype-list'), **self.header)
response = self.client.get(reverse('extras-api:objecttype-list'), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], object_type_count)
def test_get_object(self):
object_type = ObjectType.objects.first()
url = reverse('core-api:objecttype-detail', kwargs={'pk': object_type.pk})
url = reverse('extras-api:objecttype-detail', kwargs={'pk': object_type.pk})
self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
@@ -135,9 +134,12 @@ class BackgroundTaskTestCase(TestCase):
Create a user and token for API calls.
"""
# Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser', is_active=True)
self.user = User.objects.create_user(username='testuser')
self.user.is_staff = True
self.user.is_active = True
self.user.save()
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Bearer {TOKEN_PREFIX}{self.token.key}.{self.token.token}'}
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
# Clear all queues prior to running each test
get_queue('default').connection.flushall()
@@ -148,11 +150,13 @@ class BackgroundTaskTestCase(TestCase):
url = reverse('core-api:rqqueue-list')
# Attempt to load view without permission
self.user.is_staff = False
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.is_staff = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
@@ -161,16 +165,7 @@ class BackgroundTaskTestCase(TestCase):
self.assertIn('low', str(response.content))
def test_background_queue(self):
url = reverse('core-api:rqqueue-detail', args=['default'])
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
response = self.client.get(reverse('core-api:rqqueue-detail', args=['default']), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn('default', str(response.content))
self.assertIn('oldest_job_timestamp', str(response.content))
@@ -179,16 +174,8 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task_list(self):
queue = get_queue('default')
queue.enqueue(self.dummy_job_default)
url = reverse('core-api:rqtask-list')
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
response = self.client.get(reverse('core-api:rqtask-list'), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn('origin', str(response.content))
self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content))
@@ -196,16 +183,8 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
url = reverse('core-api:rqtask-detail', args=[job.id])
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
response = self.client.get(reverse('core-api:rqtask-detail', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(job.id), str(response.content))
self.assertIn('origin', str(response.content))
@@ -215,65 +194,45 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task_delete(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
url = reverse('core-api:rqtask-delete', args=[job.id])
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
response = self.client.post(reverse('core-api:rqtask-delete', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
queue = get_queue('default')
self.assertNotIn(job.id, queue.job_ids)
def test_background_task_requeue(self):
# Enqueue & run a job that will fail
queue = get_queue('default')
# Enqueue & run a job that will fail
job = queue.enqueue(self.dummy_job_failing)
worker = get_worker('default')
with disable_logging():
worker.work(burst=True)
self.assertTrue(job.is_failed)
url = reverse('core-api:rqtask-requeue', args=[job.id])
# Attempt to requeue the job without permission
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 403)
# Re-enqueue the failed job and check that its status has been reset
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
response = self.client.post(reverse('core-api:rqtask-requeue', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
job = RQ_Job.fetch(job.id, queue.connection)
self.assertFalse(job.is_failed)
def test_background_task_enqueue(self):
# Enqueue some jobs that each depends on its predecessor
queue = get_queue('default')
# Enqueue some jobs that each depends on its predecessor
job = previous_job = None
for _ in range(0, 3):
job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
previous_job = job
url = reverse('core-api:rqtask-enqueue', args=[job.id])
# Check that the last job to be enqueued has a status of deferred
self.assertIsNotNone(job)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
self.assertIsNone(job.enqueued_at)
# Attempt to force-enqueue the job without permission
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 403)
# Force-enqueue the deferred job
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
response = self.client.post(reverse('core-api:rqtask-enqueue', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
# Check that job's status is updated correctly
@@ -283,27 +242,19 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task_stop(self):
queue = get_queue('default')
worker = get_worker('default')
job = queue.enqueue(self.dummy_job_default)
worker.prepare_job_execution(job)
url = reverse('core-api:rqtask-stop', args=[job.id])
self.assertEqual(job.get_status(), JobStatus.STARTED)
# Attempt to stop the task without permission
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 403)
# Stop the task
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
with disable_logging():
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(started_job_registry), 0)
# Verify that the task was cancelled
canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(canceled_job_registry), 1)
self.assertIn(job.id, canceled_job_registry)
@@ -311,34 +262,19 @@ class BackgroundTaskTestCase(TestCase):
def test_worker_list(self):
worker1 = get_worker('default', name=uuid.uuid4().hex)
worker1.register_birth()
worker2 = get_worker('high')
worker2.register_birth()
url = reverse('core-api:rqworker-list')
# Attempt to fetch the worker list without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Fetch the worker list
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
response = self.client.get(reverse('core-api:rqworker-list'), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))
def test_worker(self):
worker1 = get_worker('default', name=uuid.uuid4().hex)
worker1.register_birth()
url = reverse('core-api:rqworker-detail', args=[worker1.name])
# Attempt to fetch a worker without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Fetch the worker
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
response = self.client.get(reverse('core-api:rqworker-detail', args=[worker1.name]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))
self.assertIn('birth_date', str(response.content))

View File

@@ -158,7 +158,7 @@ class BackgroundTaskTestCase(TestCase):
def setUp(self):
super().setUp()
self.user.is_superuser = True
self.user.is_staff = True
self.user.is_active = True
self.user.save()
@@ -171,13 +171,13 @@ class BackgroundTaskTestCase(TestCase):
url = reverse('core:background_queue_list')
# Attempt to load view without permission
self.user.is_superuser = False
self.user.is_staff = False
self.user.save()
response = self.client.get(url)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.is_staff = True
self.user.save()
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
@@ -356,7 +356,7 @@ class SystemTestCase(TestCase):
def setUp(self):
super().setUp()
self.user.is_superuser = True
self.user.is_staff = True
self.user.save()
def test_system_view_default(self):

View File

@@ -372,7 +372,7 @@ class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
class BaseRQView(UserPassesTestMixin, View):
def test_func(self):
return self.request.user.is_superuser
return self.request.user.is_staff
class BackgroundQueueListView(TableMixin, BaseRQView):
@@ -555,7 +555,7 @@ class WorkerView(BaseRQView):
class SystemView(UserPassesTestMixin, View):
def test_func(self):
return self.request.user.is_superuser
return self.request.user.is_staff
def get(self, request):
@@ -638,7 +638,7 @@ class BasePluginView(UserPassesTestMixin, View):
CACHE_KEY_CATALOG_ERROR = 'plugins-catalog-error'
def test_func(self):
return self.request.user.is_superuser
return self.request.user.is_staff
def get_cached_plugins(self, request):
catalog_plugins = {}

View File

@@ -1,8 +1,10 @@
from django.contrib.contenttypes.models import ContentType
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from dcim.choices import *
from dcim.constants import *
from dcim.models import Cable, CablePath, CableTermination
from netbox.api.fields import ChoiceField, ContentTypeField
from netbox.api.serializers import BaseModelSerializer, GenericObjectSerializer, NetBoxModelSerializer
@@ -49,11 +51,9 @@ class TracedCableSerializer(BaseModelSerializer):
class CableTerminationSerializer(NetBoxModelSerializer):
termination_type = ContentTypeField(
read_only=True,
)
termination = serializers.SerializerMethodField(
read_only=True,
queryset=ContentType.objects.filter(CABLE_TERMINATION_MODELS)
)
termination = serializers.SerializerMethodField(read_only=True)
class Meta:
model = CableTermination
@@ -61,8 +61,6 @@ class CableTerminationSerializer(NetBoxModelSerializer):
'id', 'url', 'display', 'cable', 'cable_end', 'termination_type', 'termination_id',
'termination', 'created', 'last_updated',
]
read_only_fields = fields
brief_fields = ('id', 'url', 'display', 'cable', 'cable_end', 'termination_type', 'termination_id')
@extend_schema_field(serializers.JSONField(allow_null=True))
def get_termination(self, obj):

View File

@@ -155,7 +155,7 @@ class PowerOutletTemplateSerializer(ComponentTemplateSerializer):
model = PowerOutletTemplate
fields = [
'id', 'url', 'display', 'device_type', 'module_type', 'name', 'label', 'type',
'color', 'power_port', 'feed_leg', 'description', 'created', 'last_updated',
'power_port', 'feed_leg', 'description', 'created', 'last_updated',
]
brief_fields = ('id', 'url', 'display', 'name', 'description')

View File

@@ -16,7 +16,7 @@ from extras.api.mixins import ConfigContextQuerySetMixin, RenderConfigMixin
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.pagination import StripCountAnnotationsPaginator
from netbox.api.viewsets import NetBoxModelViewSet, MPTTLockedMixin, NetBoxReadOnlyModelViewSet
from netbox.api.viewsets import NetBoxModelViewSet, MPTTLockedMixin
from netbox.api.viewsets.mixins import SequentialBulkCreatesMixin
from utilities.api import get_serializer_for_model
from utilities.query_functions import CollateAsChar
@@ -563,7 +563,7 @@ class CableViewSet(NetBoxModelViewSet):
filterset_class = filtersets.CableFilterSet
class CableTerminationViewSet(NetBoxReadOnlyModelViewSet):
class CableTerminationViewSet(NetBoxModelViewSet):
metadata_class = ContentTypeMetadata
queryset = CableTermination.objects.all()
serializer_class = serializers.CableTerminationSerializer

View File

@@ -14,16 +14,16 @@ from netbox.filtersets import (
AttributeFiltersMixin, BaseFilterSet, ChangeLoggedModelFilterSet, NestedGroupModelFilterSet, NetBoxModelFilterSet,
OrganizationalModelFilterSet,
)
from tenancy.filtersets import TenancyFilterSet, ContactModelFilterSet
from tenancy.filtersets import ContactModelFilterSet, TenancyFilterSet
from tenancy.models import *
from users.models import User
from utilities.filters import (
ContentTypeFilter, MultiValueCharFilter, MultiValueMACAddressFilter, MultiValueNumberFilter, MultiValueWWNFilter,
NumericArrayFilter, TreeNodeMultipleChoiceFilter,
)
from virtualization.models import Cluster, ClusterGroup, VMInterface, VirtualMachine
from virtualization.models import Cluster, ClusterGroup, VirtualMachine, VMInterface
from vpn.models import L2VPN
from wireless.choices import WirelessRoleChoices, WirelessChannelChoices
from wireless.choices import WirelessChannelChoices, WirelessRoleChoices
from wireless.models import WirelessLAN, WirelessLink
from .choices import *
from .constants import *
@@ -842,7 +842,7 @@ class PowerOutletTemplateFilterSet(ChangeLoggedModelFilterSet, ModularDeviceType
class Meta:
model = PowerOutletTemplate
fields = ('id', 'name', 'label', 'type', 'color', 'feed_leg', 'description')
fields = ('id', 'name', 'label', 'type', 'feed_leg', 'description')
class InterfaceTemplateFilterSet(ChangeLoggedModelFilterSet, ModularDeviceTypeComponentFilterSet):
@@ -1807,6 +1807,14 @@ class MACAddressFilterSet(NetBoxModelFilterSet):
queryset=VMInterface.objects.all(),
label=_('VM interface (ID)'),
)
assigned = django_filters.BooleanFilter(
method='filter_assigned',
label=_('Is assigned'),
)
primary = django_filters.BooleanFilter(
method='filter_primary',
label=_('Is primary'),
)
class Meta:
model = MACAddress
@@ -1843,6 +1851,29 @@ class MACAddressFilterSet(NetBoxModelFilterSet):
vminterface__in=interface_ids
)
def filter_assigned(self, queryset, name, value):
params = {
'assigned_object_type__isnull': True,
'assigned_object_id__isnull': True,
}
if value:
return queryset.exclude(**params)
else:
return queryset.filter(**params)
def filter_primary(self, queryset, name, value):
interface_mac_ids = Interface.objects.filter(primary_mac_address_id__isnull=False).values_list(
'primary_mac_address_id', flat=True
)
vminterface_mac_ids = VMInterface.objects.filter(primary_mac_address_id__isnull=False).values_list(
'primary_mac_address_id', flat=True
)
query = Q(pk__in=interface_mac_ids) | Q(pk__in=vminterface_mac_ids)
if value:
return queryset.filter(query)
else:
return queryset.exclude(query)
class CommonInterfaceFilterSet(django_filters.FilterSet):
mode = django_filters.MultipleChoiceFilter(

View File

@@ -1163,10 +1163,6 @@ class PowerOutletTemplateBulkEditForm(ComponentTemplateBulkEditForm):
choices=add_blank_choice(PowerOutletTypeChoices),
required=False
)
color = ColorField(
label=_('Color'),
required=False
)
power_port = forms.ModelChoiceField(
label=_('Power port'),
queryset=PowerPortTemplate.objects.all(),

View File

@@ -3,6 +3,7 @@ from django.utils.translation import gettext_lazy as _
from dcim.choices import *
from dcim.constants import *
from dcim.filtersets import DeviceFilterSet, PowerOutletFilterSet, RackFilterSet
from dcim.models import *
from extras.forms import LocalConfigContextFilterForm
from extras.models import ConfigTemplate
@@ -13,6 +14,8 @@ from tenancy.forms import ContactModelFilterForm, TenancyFilterForm
from users.models import User
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES, FilterForm, add_blank_choice
from utilities.forms.fields import ColorField, DynamicModelMultipleChoiceField, TagFilterField
from utilities.forms.filterset_mappings import FILTERSET_MAPPINGS
from utilities.forms.mixins import FilterModifierMixin
from utilities.forms.rendering import FieldSet
from utilities.forms.widgets import NumberWithOptions
from virtualization.models import Cluster, ClusterGroup, VirtualMachine
@@ -317,7 +320,7 @@ class RackTypeFilterForm(RackBaseFilterForm):
tag = TagFilterField(model)
class RackFilterForm(TenancyFilterForm, ContactModelFilterForm, RackBaseFilterForm):
class RackFilterForm(FilterModifierMixin, TenancyFilterForm, ContactModelFilterForm, RackBaseFilterForm):
model = Rack
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
@@ -738,6 +741,7 @@ class PlatformFilterForm(NetBoxModelFilterSetForm):
class DeviceFilterForm(
FilterModifierMixin,
LocalConfigContextFilterForm,
TenancyFilterForm,
ContactModelFilterForm,
@@ -1378,7 +1382,7 @@ class PowerPortFilterForm(PathEndpointFilterForm, DeviceComponentFilterForm):
tag = TagFilterField(model)
class PowerOutletFilterForm(PathEndpointFilterForm, DeviceComponentFilterForm):
class PowerOutletFilterForm(FilterModifierMixin, PathEndpointFilterForm, DeviceComponentFilterForm):
model = PowerOutlet
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
@@ -1676,12 +1680,16 @@ class MACAddressFilterForm(NetBoxModelFilterSetForm):
model = MACAddress
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
FieldSet('mac_address', 'device_id', 'virtual_machine_id', name=_('MAC address')),
FieldSet('mac_address', name=_('Attributes')),
FieldSet(
'device_id', 'virtual_machine_id', 'assigned', 'primary',
name=_('Assignments'),
),
)
selector_fields = ('filter_id', 'q', 'device_id', 'virtual_machine_id')
mac_address = forms.CharField(
required=False,
label=_('MAC address')
label=_('MAC address'),
)
device_id = DynamicModelMultipleChoiceField(
queryset=Device.objects.all(),
@@ -1693,6 +1701,20 @@ class MACAddressFilterForm(NetBoxModelFilterSetForm):
required=False,
label=_('Assigned VM'),
)
assigned = forms.NullBooleanField(
required=False,
label=_('Assigned to an interface'),
widget=forms.Select(
choices=BOOLEAN_WITH_BLANK_CHOICES
),
)
primary = forms.NullBooleanField(
required=False,
label=_('Primary MAC of an interface'),
widget=forms.Select(
choices=BOOLEAN_WITH_BLANK_CHOICES
),
)
tag = TagFilterField(model)
@@ -1770,3 +1792,9 @@ class InterfaceConnectionFilterForm(FilterForm):
},
label=_('Device')
)
# Register FilterSet mappings for FilterModifierMixin lookup verification
FILTERSET_MAPPINGS[DeviceFilterForm] = DeviceFilterSet
FILTERSET_MAPPINGS[RackFilterForm] = RackFilterSet
FILTERSET_MAPPINGS[PowerOutletFilterForm] = PowerOutletFilterSet
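The registrations above pair each filter form with its FilterSet so the modifier mixin can verify which lookups a field actually supports. A minimal sketch of how such a mapping might be consulted (the helper below is hypothetical and assumes FILTERSET_MAPPINGS behaves like a plain dict; only the mapping itself appears in this diff):

from utilities.forms.filterset_mappings import FILTERSET_MAPPINGS

def filterset_for(form_class):
    # Return the FilterSet class registered for a filter form, or None if
    # the form has not been registered.
    return FILTERSET_MAPPINGS.get(form_class)

# e.g. filterset_for(DeviceFilterForm) would return DeviceFilterSet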

View File

@@ -755,7 +755,10 @@ class ModuleForm(ModuleCommonForm, NetBoxModelForm):
queryset=ModuleBay.objects.all(),
query_params={
'device_id': '$device'
}
},
context={
'disabled': 'installed_module',
},
)
module_type = DynamicModelChoiceField(
label=_('Module type'),
@@ -1092,14 +1095,14 @@ class PowerOutletTemplateForm(ModularComponentTemplateForm):
FieldSet('device_type', name=_('Device Type')),
FieldSet('module_type', name=_('Module Type')),
),
'name', 'label', 'type', 'color', 'power_port', 'feed_leg', 'description',
'name', 'label', 'type', 'power_port', 'feed_leg', 'description',
),
)
class Meta:
model = PowerOutletTemplate
fields = [
'device_type', 'module_type', 'name', 'label', 'type', 'color', 'power_port', 'feed_leg', 'description',
'device_type', 'module_type', 'name', 'label', 'type', 'power_port', 'feed_leg', 'description',
]

View File

@@ -18,7 +18,9 @@ from netbox.graphql.filter_mixins import (
ImageAttachmentFilterMixin,
WeightFilterMixin,
)
from tenancy.graphql.filter_mixins import TenancyFilterMixin, ContactFilterMixin
from tenancy.graphql.filter_mixins import ContactFilterMixin, TenancyFilterMixin
from virtualization.models import VMInterface
from .filter_mixins import (
CabledObjectModelFilterMixin,
ComponentModelFilterMixin,
@@ -419,6 +421,24 @@ class MACAddressFilter(PrimaryModelFilterMixin):
)
assigned_object_id: ID | None = strawberry_django.filter_field()
@strawberry_django.filter_field()
def assigned(self, value: bool, prefix) -> Q:
return Q(**{f'{prefix}assigned_object_id__isnull': (not value)})
@strawberry_django.filter_field()
def primary(self, value: bool, prefix) -> Q:
interface_mac_ids = models.Interface.objects.filter(primary_mac_address_id__isnull=False).values_list(
'primary_mac_address_id', flat=True
)
vminterface_mac_ids = VMInterface.objects.filter(primary_mac_address_id__isnull=False).values_list(
'primary_mac_address_id', flat=True
)
query = Q(**{f'{prefix}pk__in': interface_mac_ids}) | Q(**{f'{prefix}pk__in': vminterface_mac_ids})
if value:
return Q(query)
else:
return ~Q(query)
@strawberry_django.filter_type(models.Interface, lookups=True)
class InterfaceFilter(ModularComponentModelFilterMixin, InterfaceBaseFilterMixin, CabledObjectModelFilterMixin):

View File

@@ -673,7 +673,6 @@ class PowerOutletType(ModularComponentType, CabledObjectMixin, PathEndpointMixin
)
class PowerOutletTemplateType(ModularComponentTemplateType):
power_port: Annotated["PowerPortTemplateType", strawberry.lazy('dcim.graphql.types')] | None
color: str
@strawberry_django.type(

View File

@@ -1,17 +0,0 @@
import utilities.fields
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('dcim', '0215_rackreservation_status'),
]
operations = [
migrations.AddField(
model_name='poweroutlettemplate',
name='color',
field=utilities.fields.ColorField(blank=True, max_length=6),
),
]

View File

@@ -1,26 +0,0 @@
from django.contrib.postgres.operations import CreateCollation
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('dcim', '0216_poweroutlettemplate_color'),
]
operations = [
# Create a case-insensitive collation
CreateCollation(
'case_insensitive',
provider='icu',
locale='und-u-ks-level2',
deterministic=False,
),
# Create a case-insensitive collation with natural sorting
CreateCollation(
'ci_natural_sort',
provider='icu',
locale='und-u-kn-true-ks-level2',
deterministic=False,
),
]

View File

@@ -1,311 +0,0 @@
from django.db import migrations, models
PATTERN_OPS_INDEXES = [
'dcim_devicerole_slug_7952643b_like',
'dcim_devicetype_slug_448745bd_like',
'dcim_inventoryitemrole_name_4c8cfe6d_like',
'dcim_inventoryitemrole_slug_3556c227_like',
'dcim_location_slug_352c5472_like',
'dcim_manufacturer_name_841fcd92_like',
'dcim_manufacturer_slug_00430749_like',
'dcim_moduletypeprofile_name_1709c36e_like',
'dcim_platform_slug_b0908ae4_like',
'dcim_rackrole_name_9077cfcc_like',
'dcim_rackrole_slug_40bbcd3a_like',
'dcim_racktype_slug_6bbb384a_like',
'dcim_region_slug_ff078a66_like',
'dcim_site_name_8fe66c76_like',
'dcim_site_slug_4412c762_like',
'dcim_sitegroup_slug_a11d2b04_like',
]
def remove_indexes(apps, schema_editor):
for idx in PATTERN_OPS_INDEXES:
schema_editor.execute(f'DROP INDEX IF EXISTS {idx}')
class Migration(migrations.Migration):
dependencies = [
('dcim', '0217_ci_collations'),
('extras', '0134_ci_collations'),
('ipam', '0083_ci_collations'),
('tenancy', '0021_ci_collations'),
('virtualization', '0048_populate_mac_addresses'),
]
operations = [
migrations.RunPython(
code=remove_indexes,
reverse_code=migrations.RunPython.noop,
),
migrations.RemoveConstraint(
model_name='device',
name='dcim_device_unique_name_site_tenant',
),
migrations.RemoveConstraint(
model_name='device',
name='dcim_device_unique_name_site',
),
migrations.AlterField(
model_name='consoleport',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='consoleporttemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='consoleserverport',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='consoleserverporttemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='device',
name='name',
field=models.CharField(blank=True, db_collation='ci_natural_sort', max_length=64, null=True),
),
migrations.AlterField(
model_name='devicebay',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='devicebaytemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='devicerole',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='devicerole',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='devicetype',
name='model',
field=models.CharField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='devicetype',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='frontport',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='frontporttemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='interface',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='interfacetemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='inventoryitem',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='inventoryitemrole',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='inventoryitemrole',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='inventoryitemtemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='location',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='location',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='manufacturer',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='manufacturer',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='modulebay',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='modulebaytemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='moduletype',
name='model',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='moduletypeprofile',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='platform',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='platform',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='powerfeed',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='poweroutlet',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='poweroutlettemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='powerpanel',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='powerport',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='powerporttemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='rack',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='rackrole',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='rackrole',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='racktype',
name='model',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='racktype',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='rearport',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='rearporttemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='region',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='region',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='site',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='site',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='sitegroup',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='sitegroup',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='virtualdevicecontext',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AddConstraint(
model_name='device',
constraint=models.UniqueConstraint(
models.F('name'), models.F('site'), models.F('tenant'), name='dcim_device_unique_name_site_tenant'
),
),
migrations.AddConstraint(
model_name='device',
constraint=models.UniqueConstraint(
models.F('name'),
models.F('site'),
condition=models.Q(('tenant__isnull', True)),
name='dcim_device_unique_name_site',
violation_error_message='Device name must be unique per site.',
),
),
]

View File

@@ -43,10 +43,10 @@ class ComponentTemplateModel(ChangeLoggedModel, TrackingModelMixin):
name = models.CharField(
verbose_name=_('name'),
max_length=64,
db_collation='ci_natural_sort',
help_text=_(
"{module} is accepted as a substitution for the module bay position when attached to a module type."
),
db_collation="natural_sort"
)
label = models.CharField(
verbose_name=_('label'),
@@ -339,10 +339,6 @@ class PowerOutletTemplate(ModularComponentTemplateModel):
blank=True,
null=True
)
color = ColorField(
verbose_name=_('color'),
blank=True
)
power_port = models.ForeignKey(
to='dcim.PowerPortTemplate',
on_delete=models.SET_NULL,
@@ -393,7 +389,6 @@ class PowerOutletTemplate(ModularComponentTemplateModel):
name=self.resolve_name(kwargs.get('module')),
label=self.resolve_label(kwargs.get('module')),
type=self.type,
color=self.color,
power_port=power_port,
feed_leg=self.feed_leg,
**kwargs
@@ -404,7 +399,6 @@ class PowerOutletTemplate(ModularComponentTemplateModel):
return {
'name': self.name,
'type': self.type,
'color': self.color,
'power_port': self.power_port.name if self.power_port else None,
'feed_leg': self.feed_leg,
'label': self.label,

View File

@@ -52,7 +52,7 @@ class ComponentModel(NetBoxModel):
name = models.CharField(
verbose_name=_('name'),
max_length=64,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
label = models.CharField(
verbose_name=_('label'),

View File

@@ -1,7 +1,8 @@
import decimal
import yaml
from functools import cached_property
import yaml
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
@@ -9,6 +10,7 @@ from django.core.files.storage import default_storage
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models import F, ProtectedError, prefetch_related_objects
from django.db.models.functions import Lower
from django.db.models.signals import post_save
from django.urls import reverse
from django.utils.safestring import mark_safe
@@ -23,8 +25,8 @@ from extras.querysets import ConfigContextModelQuerySet
from netbox.choices import ColorChoices
from netbox.config import ConfigItem
from netbox.models import NestedGroupModel, OrganizationalModel, PrimaryModel
from netbox.models.features import ContactsMixin, ImageAttachmentsMixin
from netbox.models.mixins import WeightMixin
from netbox.models.features import ContactsMixin, ImageAttachmentsMixin
from utilities.fields import ColorField, CounterCacheField
from utilities.prefetch import get_prefetchable_fields
from utilities.tracking import TrackingModelMixin
@@ -32,6 +34,7 @@ from .device_components import *
from .mixins import RenderConfigMixin
from .modules import Module
__all__ = (
'Device',
'DeviceRole',
@@ -80,13 +83,11 @@ class DeviceType(ImageAttachmentsMixin, PrimaryModel, WeightMixin):
)
model = models.CharField(
verbose_name=_('model'),
max_length=100,
db_collation='case_insensitive',
max_length=100
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
db_collation='case_insensitive',
max_length=100
)
default_platform = models.ForeignKey(
to='dcim.Platform',
@@ -524,7 +525,7 @@ class Device(
max_length=64,
blank=True,
null=True,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
serial = models.CharField(
max_length=50,
@@ -720,11 +721,11 @@ class Device(
ordering = ('name', 'pk') # Name may be null
constraints = (
models.UniqueConstraint(
'name', 'site', 'tenant',
Lower('name'), 'site', 'tenant',
name='%(app_label)s_%(class)s_unique_name_site_tenant'
),
models.UniqueConstraint(
'name', 'site',
Lower('name'), 'site',
name='%(app_label)s_%(class)s_unique_name_site',
condition=Q(tenant__isnull=True),
violation_error_message=_("Device name must be unique per site.")
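The constraints above rely on Django's expression-based UniqueConstraint so uniqueness is enforced on the lower-cased name. A standalone sketch of the same technique on an illustrative model (not NetBox code):

from django.db import models
from django.db.models.functions import Lower

class Gadget(models.Model):
    name = models.CharField(max_length=64)
    site = models.CharField(max_length=64)

    class Meta:
        constraints = [
            # 'Gadget A' and 'gadget a' within the same site collide
            models.UniqueConstraint(Lower('name'), 'site', name='gadget_name_site_ci_unique'),
        ]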
@@ -1118,7 +1119,7 @@ class VirtualChassis(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=64,
db_collation='natural_sort',
db_collation="natural_sort"
)
domain = models.CharField(
verbose_name=_('domain'),
@@ -1181,7 +1182,7 @@ class VirtualDeviceContext(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=64,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
status = models.CharField(
verbose_name=_('status'),

View File

@@ -31,8 +31,7 @@ class ModuleTypeProfile(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
schema = models.JSONField(
blank=True,
@@ -73,8 +72,7 @@ class ModuleType(ImageAttachmentsMixin, PrimaryModel, WeightMixin):
)
model = models.CharField(
verbose_name=_('model'),
max_length=100,
db_collation='ci_natural_sort',
max_length=100
)
part_number = models.CharField(
verbose_name=_('part number'),

View File

@@ -37,7 +37,7 @@ class PowerPanel(ContactsMixin, ImageAttachmentsMixin, PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
prerequisite_models = (
@@ -88,7 +88,7 @@ class PowerFeed(PrimaryModel, PathEndpoint, CabledObjectModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
status = models.CharField(
verbose_name=_('status'),

View File

@@ -137,14 +137,12 @@ class RackType(RackBase):
)
model = models.CharField(
verbose_name=_('model'),
max_length=100,
db_collation='ci_natural_sort',
max_length=100
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
unique=True,
db_collation='case_insensitive',
unique=True
)
clone_fields = (
@@ -264,7 +262,7 @@ class Rack(ContactsMixin, ImageAttachmentsMixin, RackBase):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
facility_id = models.CharField(
max_length=50,

View File

@@ -142,14 +142,13 @@ class Site(ContactsMixin, ImageAttachmentsMixin, PrimaryModel):
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
help_text=_("Full name of the site")
help_text=_("Full name of the site"),
db_collation="natural_sort"
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
unique=True,
db_collation='case_insensitive',
unique=True
)
status = models.CharField(
verbose_name=_('status'),

View File

@@ -1174,6 +1174,9 @@ class MACAddressTable(NetBoxTable):
orderable=False,
verbose_name=_('Parent')
)
is_primary = columns.BooleanColumn(
verbose_name=_('Primary')
)
tags = columns.TagColumn(
url_name='dcim:macaddress_list'
)
@@ -1184,7 +1187,7 @@ class MACAddressTable(NetBoxTable):
class Meta(DeviceComponentTable.Meta):
model = models.MACAddress
fields = (
'pk', 'id', 'mac_address', 'assigned_object_parent', 'assigned_object', 'description', 'comments', 'tags',
'created', 'last_updated',
'pk', 'id', 'mac_address', 'assigned_object_parent', 'assigned_object', 'description', 'is_primary',
'comments', 'tags', 'created', 'last_updated',
)
default_columns = ('pk', 'mac_address', 'assigned_object_parent', 'assigned_object', 'description')

View File

@@ -211,9 +211,6 @@ class PowerPortTemplateTable(ComponentTemplateTable):
class PowerOutletTemplateTable(ComponentTemplateTable):
color = columns.ColorColumn(
verbose_name=_('Color'),
)
actions = columns.ActionsColumn(
actions=('edit', 'delete'),
extra_buttons=MODULAR_COMPONENT_TEMPLATE_BUTTONS
@@ -221,7 +218,7 @@ class PowerOutletTemplateTable(ComponentTemplateTable):
class Meta(ComponentTemplateTable.Meta):
model = models.PowerOutletTemplate
fields = ('pk', 'name', 'label', 'type', 'color', 'power_port', 'feed_leg', 'description', 'actions')
fields = ('pk', 'name', 'label', 'type', 'power_port', 'feed_leg', 'description', 'actions')
empty_text = "None"

View File

@@ -13,8 +13,7 @@ from ipam.choices import VLANQinQRoleChoices
from ipam.models import ASN, RIR, VLAN, VRF
from netbox.api.serializers import GenericObjectSerializer
from tenancy.models import Tenant
from users.constants import TOKEN_PREFIX
from users.models import Token, User
from users.models import User
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
from virtualization.models import Cluster, ClusterType
from wireless.choices import WirelessChannelChoices
@@ -1307,6 +1306,7 @@ class DeviceTest(APIViewTestCases.APIViewTestCase):
}
user_permissions = (
'dcim.view_site', 'dcim.view_rack', 'dcim.view_location', 'dcim.view_devicerole', 'dcim.view_devicetype',
'extras.view_configtemplate',
)
@classmethod
@@ -1486,58 +1486,12 @@ class DeviceTest(APIViewTestCases.APIViewTestCase):
device.config_template = configtemplate
device.save()
self.add_permissions('dcim.render_config_device', 'dcim.view_device')
url = reverse('dcim-api:device-render-config', kwargs={'pk': device.pk})
self.add_permissions('dcim.add_device')
url = reverse('dcim-api:device-detail', kwargs={'pk': device.pk}) + 'render-config/'
response = self.client.post(url, {}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['content'], f'Config for device {device.name}')
def test_render_config_without_permission(self):
configtemplate = ConfigTemplate.objects.create(
name='Config Template 1',
template_code='Config for device {{ device.name }}'
)
device = Device.objects.first()
device.config_template = configtemplate
device.save()
# No permissions added - user has no render_config permission
url = reverse('dcim-api:device-render-config', kwargs={'pk': device.pk})
response = self.client.post(url, {}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_404_NOT_FOUND)
def test_render_config_token_write_enabled(self):
configtemplate = ConfigTemplate.objects.create(
name='Config Template 1',
template_code='Config for device {{ device.name }}'
)
device = Device.objects.first()
device.config_template = configtemplate
device.save()
self.add_permissions('dcim.render_config_device', 'dcim.view_device')
url = reverse('dcim-api:device-render-config', kwargs={'pk': device.pk})
# Request without token auth should fail with PermissionDenied
response = self.client.post(url, {}, format='json')
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
# Create token with write_enabled=False
token = Token.objects.create(version=2, user=self.user, write_enabled=False)
token_header = f'Bearer {TOKEN_PREFIX}{token.key}.{token.token}'
# Request with write-disabled token should fail
response = self.client.post(url, {}, format='json', HTTP_AUTHORIZATION=token_header)
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
# Enable write and retry
token.write_enabled = True
token.save()
response = self.client.post(url, {}, format='json', HTTP_AUTHORIZATION=token_header)
self.assertHttpStatus(response, status.HTTP_200_OK)
class ModuleTest(APIViewTestCases.APIViewTestCase):
model = Module
@@ -2422,33 +2376,6 @@ class CableTest(APIViewTestCases.APIViewTestCase):
]
class CableTerminationTest(
APIViewTestCases.GetObjectViewTestCase,
APIViewTestCases.ListObjectsViewTestCase,
):
model = CableTermination
brief_fields = ['cable', 'cable_end', 'display', 'id', 'termination_id', 'termination_type', 'url']
@classmethod
def setUpTestData(cls):
device1 = create_test_device('Device 1')
device2 = create_test_device('Device 2')
interfaces = []
for device in (device1, device2):
for i in range(0, 10):
interfaces.append(Interface(device=device, type=InterfaceTypeChoices.TYPE_1GE_FIXED, name=f'eth{i}'))
Interface.objects.bulk_create(interfaces)
cables = (
Cable(a_terminations=[interfaces[0]], b_terminations=[interfaces[10]], label='Cable 1'),
Cable(a_terminations=[interfaces[1]], b_terminations=[interfaces[11]], label='Cable 2'),
Cable(a_terminations=[interfaces[2]], b_terminations=[interfaces[12]], label='Cable 3'),
)
for cable in cables:
cable.save()
class ConnectedDeviceTest(APITestCase):
@classmethod

View File

@@ -10,7 +10,7 @@ from netbox.choices import ColorChoices, WeightUnitChoices
from tenancy.models import Tenant, TenantGroup
from users.models import User
from utilities.testing import ChangeLoggedFilterSetTests, create_test_device, create_test_virtualmachine
from virtualization.models import Cluster, ClusterType, ClusterGroup, VMInterface, VirtualMachine
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine, VMInterface
from wireless.choices import WirelessChannelChoices, WirelessRoleChoices
from wireless.models import WirelessLink
@@ -1919,21 +1919,18 @@ class PowerOutletTemplateTestCase(TestCase, DeviceComponentTemplateFilterSetTest
device_type=device_types[0],
name='Power Outlet 1',
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
color=ColorChoices.COLOR_RED,
description='foobar1'
),
PowerOutletTemplate(
device_type=device_types[1],
name='Power Outlet 2',
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_B,
color=ColorChoices.COLOR_GREEN,
description='foobar2'
),
PowerOutletTemplate(
device_type=device_types[2],
name='Power Outlet 3',
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_C,
color=ColorChoices.COLOR_BLUE,
description='foobar3'
),
))
@@ -1946,10 +1943,6 @@ class PowerOutletTemplateTestCase(TestCase, DeviceComponentTemplateFilterSetTest
params = {'feed_leg': [PowerOutletFeedLegChoices.FEED_LEG_A, PowerOutletFeedLegChoices.FEED_LEG_B]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_color(self):
params = {'color': [ColorChoices.COLOR_RED, ColorChoices.COLOR_GREEN]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class InterfaceTemplateTestCase(TestCase, DeviceComponentTemplateFilterSetTests, ChangeLoggedFilterSetTests):
queryset = InterfaceTemplate.objects.all()
@@ -7171,9 +7164,20 @@ class MACAddressTestCase(TestCase, ChangeLoggedFilterSetTests):
MACAddress(mac_address='00-00-00-05-01-01', assigned_object=vm_interfaces[1]),
MACAddress(mac_address='00-00-00-06-01-01', assigned_object=vm_interfaces[2]),
MACAddress(mac_address='00-00-00-06-01-02', assigned_object=vm_interfaces[2]),
# unassigned
MACAddress(mac_address='00-00-00-07-01-01'),
)
MACAddress.objects.bulk_create(mac_addresses)
# Set MAC addresses as primary
for idx, interface in enumerate(interfaces):
interface.primary_mac_address = mac_addresses[idx]
interface.save()
for idx, vm_interface in enumerate(vm_interfaces):
# Offset by 4 for device MACs
vm_interface.primary_mac_address = mac_addresses[idx + 4]
vm_interface.save()
def test_mac_address(self):
params = {'mac_address': ['00-00-00-01-01-01', '00-00-00-02-01-01']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
@@ -7205,3 +7209,15 @@ class MACAddressTestCase(TestCase, ChangeLoggedFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'vminterface': [vm_interfaces[0].name, vm_interfaces[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_assigned(self):
params = {'assigned': True}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8)
params = {'assigned': False}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_primary(self):
params = {'primary': True}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
params = {'primary': False}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)

View File

@@ -4,7 +4,6 @@ from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.status import HTTP_400_BAD_REQUEST
from netbox.api.authentication import TokenWritePermission
from netbox.api.renderers import TextRenderer
from .serializers import ConfigTemplateSerializer
@@ -65,24 +64,12 @@ class RenderConfigMixin(ConfigTemplateRenderMixin):
"""
Provides a /render-config/ endpoint for REST API views whose model may have a ConfigTemplate assigned.
"""
def get_permissions(self):
# For render_config action, check only token write ability (not model permissions)
if self.action == 'render_config':
return [TokenWritePermission()]
return super().get_permissions()
@action(detail=True, methods=['post'], url_path='render-config', renderer_classes=[JSONRenderer, TextRenderer])
def render_config(self, request, pk):
"""
Resolve and render the preferred ConfigTemplate for this Device.
"""
# Override restrict() on the default queryset to enforce the render_config & view actions
self.queryset = self.queryset.model.objects.restrict(request.user, 'render_config').restrict(
request.user, 'view'
)
instance = self.get_object()
object_type = instance._meta.model_name
configtemplate = instance.get_config_template()
if not configtemplate:

View File

@@ -5,6 +5,7 @@ from rest_framework import serializers
from core.api.serializers_.jobs import JobSerializer
from extras.models import Script
from netbox.api.serializers import ValidatedModelSerializer
from utilities.datetime import local_now
__all__ = (
'ScriptDetailSerializer',
@@ -66,11 +67,31 @@ class ScriptInputSerializer(serializers.Serializer):
interval = serializers.IntegerField(required=False, allow_null=True)
def validate_schedule_at(self, value):
if value and not self.context['script'].python_class.scheduling_enabled:
raise serializers.ValidationError(_("Scheduling is not enabled for this script."))
"""
Validates the specified schedule time for a script execution.
"""
if value:
if not self.context['script'].python_class.scheduling_enabled:
raise serializers.ValidationError(_('Scheduling is not enabled for this script.'))
if value < local_now():
raise serializers.ValidationError(_('Scheduled time must be in the future.'))
return value
def validate_interval(self, value):
"""
Validates the provided interval based on the script's scheduling configuration.
"""
if value and not self.context['script'].python_class.scheduling_enabled:
raise serializers.ValidationError(_("Scheduling is not enabled for this script."))
raise serializers.ValidationError(_('Scheduling is not enabled for this script.'))
return value
def validate(self, data):
"""
Validates the given data and ensures the necessary fields are populated.
"""
# Set the schedule_at time to now if only an interval is provided
# while handling the case where schedule_at is null.
if data.get('interval') and not data.get('schedule_at'):
data['schedule_at'] = local_now()
return super().validate(data)
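A condensed, standalone restatement of the validation rules added above (field names follow the diff; the scheduling_enabled flag is simplified to a plain boolean passed via serializer context, and timezone.now() stands in for NetBox's local_now()):

from django.utils import timezone
from rest_framework import serializers

class ScheduleInputSketch(serializers.Serializer):
    schedule_at = serializers.DateTimeField(required=False, allow_null=True)
    interval = serializers.IntegerField(required=False, allow_null=True)

    def validate_schedule_at(self, value):
        if value:
            if not self.context.get('scheduling_enabled', True):
                raise serializers.ValidationError('Scheduling is not enabled for this script.')
            if value < timezone.now():
                raise serializers.ValidationError('Scheduled time must be in the future.')
        return value

    def validate(self, data):
        # When only an interval is supplied, start the first run immediately
        if data.get('interval') and not data.get('schedule_at'):
            data['schedule_at'] = timezone.now()
        return data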

View File

@@ -1,8 +1,10 @@
from django.urls import include, path
from core.api.views import ObjectTypeViewSet
from netbox.api.routers import NetBoxRouter
from . import views
router = NetBoxRouter()
router.APIRootView = views.ExtrasRootView
@@ -27,6 +29,9 @@ router.register('config-context-profiles', views.ConfigContextProfileViewSet)
router.register('config-templates', views.ConfigTemplateViewSet)
router.register('scripts', views.ScriptViewSet, basename='script')
# TODO: Remove in NetBox v4.5
router.register('object-types', ObjectTypeViewSet)
app_name = 'extras-api'
urlpatterns = [
path('dashboard/', views.DashboardView.as_view(), name='dashboard'),

View File

@@ -16,7 +16,7 @@ from rq import Worker
from extras import filtersets
from extras.jobs import ScriptJob
from extras.models import *
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired, TokenWritePermission
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.renderers import TextRenderer
@@ -238,22 +238,13 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
serializer_class = serializers.ConfigTemplateSerializer
filterset_class = filtersets.ConfigTemplateFilterSet
def get_permissions(self):
# For render action, check only token write ability (not model permissions)
if self.action == 'render':
return [TokenWritePermission()]
return super().get_permissions()
@action(detail=True, methods=['post'], renderer_classes=[JSONRenderer, TextRenderer])
def render(self, request, pk):
"""
Render a ConfigTemplate using the context data provided (if any). If the client requests "text/plain" data,
return the raw rendered content, rather than serialized JSON.
"""
# Override restrict() on the default queryset to enforce the render & view actions
self.queryset = self.queryset.model.objects.restrict(request.user, 'render').restrict(request.user, 'view')
configtemplate = self.get_object()
context = request.data
return self.render_configtemplate(request, configtemplate, context)

View File

@@ -1,114 +0,0 @@
import django.core.validators
import re
from django.db import migrations, models
PATTERN_OPS_INDEXES = [
'extras_configcontext_name_4bbfe25d_like',
'extras_configcontextprofile_name_070de83b_like',
'extras_customfield_name_2fe72707_like',
'extras_customfieldchoiceset_name_963e63ea_like',
'extras_customlink_name_daed2d18_like',
'extras_eventrule_name_899453c6_like',
'extras_notificationgroup_name_70b0a3f9_like',
'extras_savedfilter_name_8a4bbd09_like',
'extras_savedfilter_slug_4f93a959_like',
'extras_tag_name_9550b3d9_like',
'extras_tag_slug_aaa5b7e9_like',
'extras_webhook_name_82cf60b5_like',
]
def remove_indexes(apps, schema_editor):
for idx in PATTERN_OPS_INDEXES:
schema_editor.execute(f'DROP INDEX IF EXISTS {idx}')
class Migration(migrations.Migration):
dependencies = [
('extras', '0133_make_cf_minmax_decimal'),
('dcim', '0217_ci_collations'),
]
operations = [
migrations.RunPython(
code=remove_indexes,
reverse_code=migrations.RunPython.noop,
),
migrations.AlterField(
model_name='configcontext',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='configcontextprofile',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='customfield',
name='name',
field=models.CharField(
db_collation='ci_natural_sort',
max_length=50,
unique=True,
validators=[
django.core.validators.RegexValidator(
flags=re.RegexFlag['IGNORECASE'],
message='Only alphanumeric characters and underscores are allowed.',
regex='^[a-z0-9_]+$',
),
django.core.validators.RegexValidator(
flags=re.RegexFlag['IGNORECASE'],
inverse_match=True,
message='Double underscores are not permitted in custom field names.',
regex='__',
),
],
),
),
migrations.AlterField(
model_name='customfieldchoiceset',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='customlink',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='eventrule',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=150, unique=True),
),
migrations.AlterField(
model_name='notificationgroup',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='savedfilter',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='savedfilter',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='tag',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='tag',
name='slug',
field=models.SlugField(allow_unicode=True, db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='webhook',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=150, unique=True),
),
]

View File

@@ -35,8 +35,7 @@ class ConfigContextProfile(SyncedDataMixin, PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
description = models.CharField(
verbose_name=_('description'),
@@ -78,8 +77,7 @@ class ConfigContext(SyncedDataMixin, CloningMixin, CustomLinksMixin, ChangeLogge
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
profile = models.ForeignKey(
to='extras.ConfigContextProfile',

View File

@@ -94,7 +94,6 @@ class CustomField(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
verbose_name=_('name'),
max_length=50,
unique=True,
db_collation='ci_natural_sort',
help_text=_('Internal field name'),
validators=(
RegexValidator(
@@ -536,6 +535,15 @@ class CustomField(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
# URL
elif self.type == CustomFieldTypeChoices.TYPE_URL:
field = LaxURLField(assume_scheme='https', required=required, initial=initial)
if self.validation_regex:
field.validators = [
RegexValidator(
regex=self.validation_regex,
message=mark_safe(_("Values must match this regex: <code>{regex}</code>").format(
regex=escape(self.validation_regex)
))
)
]
# JSON
elif self.type == CustomFieldTypeChoices.TYPE_JSON:
@@ -685,6 +693,13 @@ class CustomField(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
if self.validation_regex and not re.match(self.validation_regex, value):
raise ValidationError(_("Value must match regex '{regex}'").format(regex=self.validation_regex))
# Validate URL field
elif self.type == CustomFieldTypeChoices.TYPE_URL:
if type(value) is not str:
raise ValidationError(_("Value must be a string."))
if self.validation_regex and not re.match(self.validation_regex, value):
raise ValidationError(_("Value must match regex '{regex}'").format(regex=self.validation_regex))
# Validate integer
elif self.type == CustomFieldTypeChoices.TYPE_INTEGER:
if type(value) is not int:
@@ -780,8 +795,7 @@ class CustomFieldChoiceSet(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel
"""
name = models.CharField(
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
description = models.CharField(
max_length=200,
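In isolation, the URL handling added above boils down to a RegexValidator built from validation_regex. A tiny sketch of that check on its own (the values mirror the API test further down; this is not NetBox code):

from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator

require_https = RegexValidator(regex=r'^https://', message='Value must match regex ^https://')

require_https('https://example.com')          # accepted, no exception raised
try:
    require_https('http://example.com')       # plain HTTP fails the pattern
except ValidationError as exc:
    print(exc.messages)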

View File

@@ -59,8 +59,7 @@ class EventRule(CustomFieldsMixin, ExportTemplatesMixin, TagsMixin, ChangeLogged
name = models.CharField(
verbose_name=_('name'),
max_length=150,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
description = models.CharField(
verbose_name=_('description'),
@@ -165,8 +164,7 @@ class Webhook(CustomFieldsMixin, ExportTemplatesMixin, TagsMixin, ChangeLoggedMo
name = models.CharField(
verbose_name=_('name'),
max_length=150,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
description = models.CharField(
verbose_name=_('description'),
@@ -309,8 +307,7 @@ class CustomLink(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
enabled = models.BooleanField(
verbose_name=_('enabled'),
@@ -471,14 +468,12 @@ class SavedFilter(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
unique=True,
db_collation='case_insensitive',
unique=True
)
description = models.CharField(
verbose_name=_('description'),

View File

@@ -125,8 +125,7 @@ class NotificationGroup(ChangeLoggedModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
description = models.CharField(
verbose_name=_('description'),

View File

@@ -2,7 +2,7 @@ from django.conf import settings
from django.db import models
from django.urls import reverse
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _, pgettext_lazy
from django.utils.translation import gettext_lazy as _
from taggit.models import TagBase, GenericTaggedItemBase
from netbox.choices import ColorChoices
@@ -25,21 +25,6 @@ class Tag(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel, TagBase):
id = models.BigAutoField(
primary_key=True
)
# Override TagBase.name to set db_collation
name = models.CharField(
verbose_name=pgettext_lazy("A tag name", "name"),
unique=True,
max_length=100,
db_collation='ci_natural_sort',
)
# Override TagBase.slug to set db_collation
slug = models.SlugField(
verbose_name=pgettext_lazy("A tag slug", "slug"),
unique=True,
max_length=100,
allow_unicode=True,
db_collation='case_insensitive',
)
color = ColorField(
verbose_name=_('color'),
default=ColorChoices.COLOR_GREY

View File

@@ -1,9 +1,12 @@
import inspect
import json
import logging
import os
import re
import yaml
from django import forms
from django.conf import settings
from django.core.files.storage import storages
from django.core.validators import RegexValidator
from django.utils import timezone
@@ -487,7 +490,7 @@ class BaseScript:
if self.fieldsets:
fieldsets.extend(self.fieldsets)
else:
fields = list(name for name, __ in self._get_vars().items())
fields = list(name for name, _ in self._get_vars().items())
fieldsets.append((_('Script Data'), fields))
# Append the default fieldset if defined in the Meta class
@@ -579,6 +582,40 @@ class BaseScript:
self._log(message, obj, level=LogLevelChoices.LOG_FAILURE)
self.failed = True
#
# Convenience functions
#
def load_yaml(self, filename):
"""
Return data from a YAML file
"""
# TODO: DEPRECATED: Remove this method in v4.5
self._log(
_("load_yaml is deprecated and will be removed in v4.5"),
level=LogLevelChoices.LOG_WARNING
)
file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
with open(file_path, 'r') as datafile:
data = yaml.load(datafile, Loader=yaml.SafeLoader)
return data
def load_json(self, filename):
"""
Return data from a JSON file
"""
# TODO: DEPRECATED: Remove this method in v4.5
self._log(
_("load_json is deprecated and will be removed in v4.5"),
level=LogLevelChoices.LOG_WARNING
)
file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
with open(file_path, 'r') as datafile:
data = json.load(datafile)
return data
#
# Legacy Report functionality
#

View File

@@ -12,8 +12,7 @@ from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Rack, Loca
from extras.choices import *
from extras.models import *
from extras.scripts import BooleanVar, IntegerVar, Script as PythonClass, StringVar
from users.constants import TOKEN_PREFIX
from users.models import Group, Token, User
from users.models import Group, User
from utilities.testing import APITestCase, APIViewTestCases
@@ -856,61 +855,20 @@ class ConfigTemplateTest(APIViewTestCases.APIViewTestCase):
)
ConfigTemplate.objects.bulk_create(config_templates)
def test_render(self):
configtemplate = ConfigTemplate.objects.first()
self.add_permissions('extras.render_configtemplate', 'extras.view_configtemplate')
url = reverse('extras-api:configtemplate-render', kwargs={'pk': configtemplate.pk})
response = self.client.post(url, {'foo': 'bar'}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['content'], 'Foo: bar')
def test_render_without_permission(self):
configtemplate = ConfigTemplate.objects.first()
# No permissions added - user has no render permission
url = reverse('extras-api:configtemplate-render', kwargs={'pk': configtemplate.pk})
response = self.client.post(url, {'foo': 'bar'}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_404_NOT_FOUND)
def test_render_token_write_enabled(self):
configtemplate = ConfigTemplate.objects.first()
self.add_permissions('extras.render_configtemplate', 'extras.view_configtemplate')
url = reverse('extras-api:configtemplate-render', kwargs={'pk': configtemplate.pk})
# Request without token auth should fail with PermissionDenied
response = self.client.post(url, {'foo': 'bar'}, format='json')
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
# Create token with write_enabled=False
token = Token.objects.create(version=2, user=self.user, write_enabled=False)
token_header = f'Bearer {TOKEN_PREFIX}{token.key}.{token.token}'
# Request with write-disabled token should fail
response = self.client.post(url, {'foo': 'bar'}, format='json', HTTP_AUTHORIZATION=token_header)
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
# Enable write and retry
token.write_enabled = True
token.save()
response = self.client.post(url, {'foo': 'bar'}, format='json', HTTP_AUTHORIZATION=token_header)
self.assertHttpStatus(response, status.HTTP_200_OK)
class ScriptTest(APITestCase):
class TestScriptClass(PythonClass):
class Meta:
name = "Test script"
name = 'Test script'
commit = True
scheduling_enabled = True
var1 = StringVar()
var2 = IntegerVar()
var3 = BooleanVar()
def run(self, data, commit=True):
self.log_info(data['var1'])
self.log_success(data['var2'])
self.log_failure(data['var3'])
@@ -921,14 +879,16 @@ class ScriptTest(APITestCase):
def setUpTestData(cls):
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='/var/tmp/script.py'
file_path='script.py',
)
Script.objects.create(
script = Script.objects.create(
module=module,
name="Test script",
name='Test script',
is_executable=True,
)
cls.url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
@property
def python_class(self):
return self.TestScriptClass
@@ -941,7 +901,7 @@ class ScriptTest(APITestCase):
def test_get_script(self):
module = ScriptModule.objects.get(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='/var/tmp/script.py'
file_path='script.py',
)
script = module.scripts.all().first()
url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
@@ -952,6 +912,76 @@ class ScriptTest(APITestCase):
self.assertEqual(response.data['vars']['var2'], 'IntegerVar')
self.assertEqual(response.data['vars']['var3'], 'BooleanVar')
def test_schedule_script_past_time_rejected(self):
"""
Scheduling with past schedule_at should fail.
"""
self.add_permissions('extras.run_script')
payload = {
'data': {'var1': 'hello', 'var2': 1, 'var3': False},
'commit': True,
'schedule_at': now() - datetime.timedelta(hours=1),
}
response = self.client.post(self.url, payload, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('schedule_at', response.data)
# Be tolerant of exact wording but ensure we failed on schedule_at being in the past
self.assertIn('future', str(response.data['schedule_at']).lower())
def test_schedule_script_interval_only(self):
"""
Interval without schedule_at should auto-set schedule_at now.
"""
self.add_permissions('extras.run_script')
payload = {
'data': {'var1': 'hello', 'var2': 1, 'var3': False},
'commit': True,
'interval': 60,
}
response = self.client.post(self.url, payload, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# The latest job is returned in the script detail serializer under "result"
self.assertIn('result', response.data)
self.assertEqual(response.data['result']['interval'], 60)
# Ensure a start time was autopopulated
self.assertIsNotNone(response.data['result']['scheduled'])
def test_schedule_script_when_disabled(self):
"""
Scheduling should fail when script.scheduling_enabled=False.
"""
self.add_permissions('extras.run_script')
# Temporarily disable scheduling on the in-test Python class
original = getattr(self.TestScriptClass.Meta, 'scheduling_enabled', True)
self.TestScriptClass.Meta.scheduling_enabled = False
base = {
'data': {'var1': 'hello', 'var2': 1, 'var3': False},
'commit': True,
}
# Check both schedule_at and interval paths
cases = [
{**base, 'schedule_at': now() + datetime.timedelta(minutes=5)},
{**base, 'interval': 60},
]
try:
for case in cases:
with self.subTest(case=list(case.keys())):
response = self.client.post(self.url, case, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
# Error should be attached to whichever field we used
key = 'schedule_at' if 'schedule_at' in case else 'interval'
self.assertIn(key, response.data)
self.assertIn('scheduling is not enabled', str(response.data[key]).lower())
finally:
# Restore the original setting for other tests
self.TestScriptClass.Meta.scheduling_enabled = original
class CreatedUpdatedFilterTest(APITestCase):

View File

@@ -1300,6 +1300,28 @@ class CustomFieldAPITest(APITestCase):
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
def test_url_regex_validation(self):
"""
Test that validation_regex is applied to URL custom fields (fixes #20498).
"""
site2 = Site.objects.get(name='Site 2')
url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
self.add_permissions('dcim.change_site')
cf_url = CustomField.objects.get(name='url_field')
cf_url.validation_regex = r'^https://' # Require HTTPS
cf_url.save()
# Test invalid URL (http instead of https)
data = {'custom_fields': {'url_field': 'http://example.com'}}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
# Test valid URL (https)
data = {'custom_fields': {'url_field': 'https://example.com'}}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
def test_uniqueness_validation(self):
# Create a unique custom field
cf_text = CustomField.objects.get(name='text_field')

View File

@@ -363,7 +363,7 @@ class EventRuleTest(APITestCase):
body = json.loads(request.body)
self.assertEqual(body['event'], 'created')
self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
self.assertEqual(body['object_type'], 'dcim.site')
self.assertEqual(body['model'], 'site')
self.assertEqual(body['username'], 'testuser')
self.assertEqual(body['request_id'], str(request_id))
self.assertEqual(body['data']['name'], 'Site 1')

View File

@@ -1,3 +1,5 @@
import logging
import tempfile
from datetime import date, datetime, timezone
from decimal import Decimal
@@ -7,6 +9,7 @@ from netaddr import IPAddress, IPNetwork
from dcim.models import DeviceRole
from extras.scripts import *
from utilities.testing import disable_logging
CHOICES = (
('ff0000', 'Red'),
@@ -32,6 +35,35 @@ JSON_DATA = """
"""
class ScriptTest(TestCase):
def test_load_yaml(self):
datafile = tempfile.NamedTemporaryFile()
datafile.write(bytes(YAML_DATA, 'UTF-8'))
datafile.seek(0)
with disable_logging(level=logging.WARNING):
data = Script().load_yaml(datafile.name)
self.assertEqual(data, {
'Foo': 123,
'Bar': 456,
'Baz': ['A', 'B', 'C'],
})
def test_load_json(self):
datafile = tempfile.NamedTemporaryFile()
datafile.write(bytes(JSON_DATA, 'UTF-8'))
datafile.seek(0)
with disable_logging(level=logging.WARNING):
data = Script().load_json(datafile.name)
self.assertEqual(data, {
'Foo': 123,
'Bar': 456,
'Baz': ['A', 'B', 'C'],
})
class ScriptVariablesTest(TestCase):
def test_stringvar(self):

View File

@@ -52,6 +52,7 @@ def send_webhook(event_rule, object_type, event_type, data, timestamp, username,
'event': WEBHOOK_EVENT_TYPES.get(event_type, event_type),
'timestamp': timestamp,
'object_type': '.'.join(object_type.natural_key()),
'model': object_type.model,
'username': username,
'request_id': request.id if request else None,
'data': data,
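With the extra key, a webhook receiver sees both the qualified object type and the bare model name. An illustrative payload for a created Site (object_type, model, username, and data values are taken from the event-rule test below; the timestamp and request ID are placeholders):

context = {
    'event': 'created',
    'timestamp': '2025-10-26T12:00:00Z',        # placeholder
    'object_type': 'dcim.site',                 # '.'.join(object_type.natural_key())
    'model': 'site',                            # object_type.model, added by this change
    'username': 'testuser',
    'request_id': '1f2e3d4c-placeholder',       # placeholder
    'data': {'name': 'Site 1'},
}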
@@ -99,7 +100,7 @@ def send_webhook(event_rule, object_type, event_type, data, timestamp, username,
'data': body.encode('utf8'),
}
logger.info(
f"Sending {params['method']} request to {params['url']} ({context['object_type']} {context['event']})"
f"Sending {params['method']} request to {params['url']} ({context['model']} {context['event']})"
)
logger.debug(params)
try:

View File

@@ -170,7 +170,7 @@ class IPAddressFilter(ContactFilterMixin, TenancyFilterMixin, PrimaryModelFilter
@strawberry_django.filter_field()
def assigned(self, value: bool, prefix) -> Q:
return Q(assigned_object_id__isnull=(not value))
return Q(**{f"{prefix}assigned_object_id__isnull": not value})
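The rewrite above threads the relation prefix into the lookup, so the filter also resolves correctly when applied through a related object rather than directly on IPAddress. A small sketch of the difference, using only django.db.models.Q (the 'ip_addresses__' prefix is illustrative):

from django.db.models import Q

def assigned_q(value: bool, prefix: str = '') -> Q:
    # assigned=True -> the assigned-object FK must be non-null
    return Q(**{f'{prefix}assigned_object_id__isnull': not value})

assigned_q(True)                               # Q(assigned_object_id__isnull=False)
assigned_q(True, prefix='ip_addresses__')      # Q(ip_addresses__assigned_object_id__isnull=False)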
@strawberry_django.filter_field()
def parent(self, value: list[str], prefix) -> Q:

View File

@@ -1,100 +0,0 @@
from django.db import migrations, models
PATTERN_OPS_INDEXES = [
'ipam_asnrange_name_c7585e73_like',
'ipam_asnrange_slug_c8a7d8a1_like',
'ipam_rir_name_64a71982_like',
'ipam_rir_slug_ff1a369a_like',
'ipam_role_name_13784849_like',
'ipam_role_slug_309ca14c_like',
'ipam_routetarget_name_212be79f_like',
'ipam_servicetemplate_name_1a2f3410_like',
'ipam_vlangroup_slug_40abcf6b_like',
'ipam_vlantranslationpolicy_name_17e0a007_like',
'ipam_vrf_rd_0ac1bde1_like',
]
def remove_indexes(apps, schema_editor):
for idx in PATTERN_OPS_INDEXES:
schema_editor.execute(f'DROP INDEX IF EXISTS {idx}')
class Migration(migrations.Migration):
dependencies = [
('ipam', '0082_add_prefix_network_containment_indexes'),
('dcim', '0217_ci_collations'),
]
operations = [
migrations.RunPython(
code=remove_indexes,
reverse_code=migrations.RunPython.noop,
),
migrations.AlterField(
model_name='asnrange',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='asnrange',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='rir',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='rir',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='role',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='role',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100, unique=True),
),
migrations.AlterField(
model_name='routetarget',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=21, unique=True),
),
migrations.AlterField(
model_name='servicetemplate',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='vlan',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=64),
),
migrations.AlterField(
model_name='vlangroup',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100),
),
migrations.AlterField(
model_name='vlangroup',
name='slug',
field=models.SlugField(db_collation='case_insensitive', max_length=100),
),
migrations.AlterField(
model_name='vlantranslationpolicy',
name='name',
field=models.CharField(db_collation='ci_natural_sort', max_length=100, unique=True),
),
migrations.AlterField(
model_name='vrf',
name='rd',
field=models.CharField(blank=True, db_collation='case_insensitive', max_length=21, null=True, unique=True),
),
]
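This migration assumes the `ci_natural_sort` and `case_insensitive` collations already exist (note the `dcim.0217_ci_collations` dependency). For reference only, a nondeterministic case-insensitive collation can be declared in a Django migration roughly as follows; the collation name is reused from above, but the provider and locale are illustrative rather than NetBox's actual definition:

```python
# Sketch only: the general CreateCollation pattern, not NetBox's actual collation definitions.
from django.contrib.postgres.operations import CreateCollation
from django.db import migrations


class Migration(migrations.Migration):

    dependencies = []

    operations = [
        CreateCollation(
            'case_insensitive',
            provider='icu',
            locale='und-u-ks-level2',  # compare code points case-insensitively
            deterministic=False,
        ),
    ]
```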

View File

@@ -18,7 +18,12 @@ class ASNRange(OrganizationalModel):
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
unique=True
)
rir = models.ForeignKey(
to='ipam.RIR',

View File

@@ -50,8 +50,7 @@ class ServiceTemplate(ServiceBase, PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
class Meta:

View File

@@ -37,12 +37,11 @@ class VLANGroup(OrganizationalModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
db_collation="natural_sort"
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
db_collation='case_insensitive',
max_length=100
)
scope_type = models.ForeignKey(
to='contenttypes.ContentType',
@@ -215,8 +214,7 @@ class VLAN(PrimaryModel):
)
name = models.CharField(
verbose_name=_('name'),
max_length=64,
db_collation='ci_natural_sort',
max_length=64
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
@@ -364,7 +362,6 @@ class VLANTranslationPolicy(PrimaryModel):
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
)
class Meta:

View File

@@ -19,12 +19,11 @@ class VRF(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='natural_sort',
db_collation="natural_sort"
)
rd = models.CharField(
max_length=VRF_RD_MAX_LENGTH,
unique=True,
db_collation='case_insensitive',
blank=True,
null=True,
verbose_name=_('route distinguisher'),
@@ -76,8 +75,8 @@ class RouteTarget(PrimaryModel):
verbose_name=_('name'),
max_length=VRF_RD_MAX_LENGTH, # Same format options as VRF RD (RFC 4360 section 4)
unique=True,
db_collation='ci_natural_sort',
help_text=_('Route target value (formatted in accordance with RFC 4360)'),
db_collation="natural_sort"
)
tenant = models.ForeignKey(
to='tenancy.Tenant',

View File

@@ -3,6 +3,7 @@ import django_tables2 as tables
from ipam.models import *
from netbox.tables import NetBoxTable, columns
from tenancy.tables import ContactsColumnMixin
__all__ = (
'ServiceTable',
@@ -35,7 +36,7 @@ class ServiceTemplateTable(NetBoxTable):
default_columns = ('pk', 'name', 'protocol', 'ports', 'description')
class ServiceTable(NetBoxTable):
class ServiceTable(ContactsColumnMixin, NetBoxTable):
name = tables.Column(
verbose_name=_('Name'),
linkify=True
@@ -60,7 +61,7 @@ class ServiceTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = Service
fields = (
'pk', 'id', 'name', 'parent', 'protocol', 'ports', 'ipaddresses', 'description', 'comments', 'tags',
'created', 'last_updated',
'pk', 'id', 'name', 'parent', 'protocol', 'ports', 'ipaddresses', 'description', 'contacts', 'comments',
'tags', 'created', 'last_updated',
)
default_columns = ('pk', 'name', 'parent', 'protocol', 'ports', 'description')

View File

@@ -2,90 +2,47 @@ import logging
from django.conf import settings
from django.utils import timezone
from drf_spectacular.extensions import OpenApiAuthenticationExtension
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework import authentication, exceptions
from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS
from netbox.config import get_config
from users.constants import TOKEN_PREFIX
from users.models import Token
from utilities.request import get_client_ip
V1_KEYWORD = 'Token'
V2_KEYWORD = 'Bearer'
class TokenAuthentication(BaseAuthentication):
class TokenAuthentication(authentication.TokenAuthentication):
"""
A custom authentication scheme which enforces Token expiration times and source IP restrictions.
"""
model = Token
def authenticate(self, request):
# Authorization header is not present; ignore
if not (auth := get_authorization_header(request).split()):
return
# Unrecognized header; ignore
if auth[0].lower() not in (V1_KEYWORD.lower().encode(), V2_KEYWORD.lower().encode()):
return
# Check for extraneous token content
if len(auth) != 2:
raise exceptions.AuthenticationFailed(
'Invalid authorization header: Must be in the form "Bearer <key>.<token>" or "Token <token>"'
)
# Extract the key (if v2) & token plaintext from the auth header
result = super().authenticate(request)
if result:
token = result[1]
# Enforce source IP restrictions (if any) set on the token
if token.allowed_ips:
client_ip = get_client_ip(request)
if client_ip is None:
raise exceptions.AuthenticationFailed(
"Client IP address could not be determined for validation. Check that the HTTP server is "
"correctly configured to pass the required header(s)."
)
if not token.validate_client_ip(client_ip):
raise exceptions.AuthenticationFailed(
f"Source IP {client_ip} is not permitted to authenticate using this token."
)
return result
def authenticate_credentials(self, key):
model = self.get_model()
try:
auth_value = auth[1].decode()
except UnicodeError:
raise exceptions.AuthenticationFailed("Invalid authorization header: Token contains invalid characters")
# Infer token version from presence or absence of prefix
version = 2 if auth_value.startswith(TOKEN_PREFIX) else 1
if version == 1:
key, plaintext = None, auth_value
else:
auth_value = auth_value.removeprefix(TOKEN_PREFIX)
try:
key, plaintext = auth_value.split('.', 1)
except ValueError:
raise exceptions.AuthenticationFailed(
"Invalid authorization header: Could not parse key from v2 token. Did you mean to use 'Token' "
"instead of 'Bearer'?"
)
# Look for a matching token in the database
try:
qs = Token.objects.prefetch_related('user')
if version == 1:
# Fetch v1 token by querying plaintext value directly
token = qs.get(version=version, plaintext=plaintext)
else:
# Fetch v2 token by key, then validate the plaintext
token = qs.get(version=version, key=key)
if not token.validate(plaintext):
# Key is valid but plaintext is not. Raise DoesNotExist to guard against key enumeration.
raise Token.DoesNotExist()
except Token.DoesNotExist:
raise exceptions.AuthenticationFailed(f"Invalid v{version} token")
# Enforce source IP restrictions (if any) set on the token
if token.allowed_ips:
client_ip = get_client_ip(request)
if client_ip is None:
raise exceptions.AuthenticationFailed(
"Client IP address could not be determined for validation. Check that the HTTP server is "
"correctly configured to pass the required header(s)."
)
if not token.validate_client_ip(client_ip):
raise exceptions.AuthenticationFailed(
f"Source IP {client_ip} is not permitted to authenticate using this token."
)
# Enforce the Token's expiration time, if one has been set.
if token.is_expired:
raise exceptions.AuthenticationFailed("Token expired")
token = model.objects.prefetch_related('user').get(key=key)
except model.DoesNotExist:
raise exceptions.AuthenticationFailed("Invalid token")
# Update last used, but only once per minute at most. This reduces write load on the database
if not token.last_used or (timezone.now() - token.last_used).total_seconds() > 60:
@@ -97,8 +54,11 @@ class TokenAuthentication(BaseAuthentication):
else:
Token.objects.filter(pk=token.pk).update(last_used=timezone.now())
user = token.user
# Enforce the Token's expiration time, if one has been set.
if token.is_expired:
raise exceptions.AuthenticationFailed("Token expired")
user = token.user
# When LDAP authentication is active try to load user data from LDAP directory
if 'netbox.authentication.LDAPBackend' in settings.REMOTE_AUTH_BACKEND:
from netbox.authentication import LDAPBackend
@@ -164,20 +124,6 @@ class TokenPermissions(DjangoObjectPermissions):
return super().has_object_permission(request, view, obj)
class TokenWritePermission(BasePermission):
"""
Verify the token has write_enabled for unsafe methods, without requiring specific model permissions.
Used for custom actions that accept user data but don't map to standard CRUD operations.
"""
def has_permission(self, request, view):
if not isinstance(request.auth, Token):
raise exceptions.PermissionDenied(
"TokenWritePermission requires token authentication."
)
return bool(request.method in SAFE_METHODS or request.auth.write_enabled)
class IsAuthenticatedOrLoginNotRequired(BasePermission):
"""
Returns True if the user is authenticated or LOGIN_REQUIRED is False.
@@ -186,17 +132,3 @@ class IsAuthenticatedOrLoginNotRequired(BasePermission):
if not settings.LOGIN_REQUIRED:
return True
return request.user.is_authenticated
class TokenScheme(OpenApiAuthenticationExtension):
target_class = 'netbox.api.authentication.TokenAuthentication'
name = 'tokenAuth'
match_subclasses = True
def get_security_definition(self, auto_schema):
return {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization',
'description': '`Token <token>` (v1) or `Bearer <key>.<token>` (v2)',
}
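Per the header formats documented above (`Token <token>` for v1, `Bearer <key>.<token>` for v2), a client request would look roughly like this; the token values and URL are placeholders, and a real v2 value additionally starts with the key prefix that is not reproduced here:

```python
# Placeholder credentials and URL; substitute values issued by the target NetBox instance.
import requests

v1_headers = {'Authorization': 'Token 0123456789abcdef0123456789abcdef01234567'}
v2_headers = {'Authorization': 'Bearer <prefixed-key>.<token>'}  # v2 values begin with a prefix not shown here

response = requests.get('https://netbox.example.com/api/dcim/sites/', headers=v1_headers)
response.raise_for_status()
```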

View File

@@ -184,13 +184,14 @@ class RemoteUserBackend(_RemoteUserBackend):
else:
user.groups.clear()
logger.debug(f"Stripping user {user} from Groups")
# Evaluate superuser status
user.is_superuser = self._is_superuser(user)
logger.debug(f"User {user} is Superuser: {user.is_superuser}")
logger.debug(f"User {user} should be Superuser: {self._is_superuser(user)}")
user.is_staff = self._is_staff(user)
logger.debug(f"User {user} is Staff: {user.is_staff}")
logger.debug(f"User {user} should be Staff: {self._is_staff(user)}")
user.save()
return user
@@ -250,8 +251,19 @@ class RemoteUserBackend(_RemoteUserBackend):
return bool(result)
def _is_staff(self, user):
# Retain for pre-v4.5 compatibility
return user.is_superuser
logger = logging.getLogger('netbox.auth.RemoteUserBackend')
staff_groups = settings.REMOTE_AUTH_STAFF_GROUPS
logger.debug(f"Superuser Groups: {staff_groups}")
staff_users = settings.REMOTE_AUTH_STAFF_USERS
logger.debug(f"Staff Users :{staff_users}")
user_groups = set()
for g in user.groups.all():
user_groups.add(g.name)
logger.debug(f"User {user.username} is in Groups:{user_groups}")
result = user.username in staff_users or (
set(user_groups) & set(staff_groups))
logger.debug(f"User {user.username} in Staff Users :{result}")
return bool(result)
def configure_user(self, request, user):
logger = logging.getLogger('netbox.auth.RemoteUserBackend')

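The `_is_staff()` logic shown above reads two settings, `REMOTE_AUTH_STAFF_USERS` and `REMOTE_AUTH_STAFF_GROUPS` (also registered in the settings module further down). A hypothetical configuration snippet, with invented usernames and group names:

```python
# configuration.py excerpt with invented values
REMOTE_AUTH_STAFF_USERS = ['alice']           # these usernames always receive is_staff
REMOTE_AUTH_STAFF_GROUPS = ['netbox-staff']   # membership in any of these groups also grants is_staff
```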
View File

@@ -68,16 +68,6 @@ REDIS = {
# https://docs.djangoproject.com/en/stable/ref/settings/#std:setting-SECRET_KEY
SECRET_KEY = ''
# Define a mapping of cryptographic peppers to use when hashing API tokens. A minimum of one pepper is required to
# enable v2 API tokens (NetBox v4.5+). Define peppers as a mapping of numeric ID to pepper value, as shown below. Each
# pepper must be at least 50 characters in length.
#
# API_TOKEN_PEPPERS = {
# 1: "<random string>",
# 2: "<random string>",
# }
API_TOKEN_PEPPERS = {}
#########################
# #
@@ -91,6 +81,9 @@ ADMINS = [
# ('John Doe', 'jdoe@example.com'),
]
# Permit the retrieval of API tokens after their creation.
ALLOW_TOKEN_RETRIEVAL = False
# Enable any desired validators for local account passwords below. For a list of included validators, please see the
# Django documentation at https://docs.djangoproject.com/en/stable/topics/auth/passwords/#password-validation.
AUTH_PASSWORD_VALIDATORS = [

View File

@@ -43,9 +43,7 @@ SECRET_KEY = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
DEFAULT_PERMISSIONS = {}
API_TOKEN_PEPPERS = {
1: 'TEST-VALUE-DO-NOT-USE-TEST-VALUE-DO-NOT-USE-TEST-VALUE-DO-NOT-USE',
}
ALLOW_TOKEN_RETRIEVAL = True
LOGGING = {
'version': 1,

View File

@@ -28,6 +28,7 @@ def preferences(request):
user_preferences = request.user.config if request.user.is_authenticated else {}
return {
'preferences': user_preferences,
'htmx_navigation': user_preferences.get('ui.htmx_navigation', False) == 'true'
}
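Because user preference values are stored as strings (the `ui.htmx_navigation` preference offers `''` and `'true'` as choices further below), the context processor compares against the literal `'true'`; a minimal illustration:

```python
# Minimal illustration of the string comparison performed above.
prefs = {'ui.htmx_navigation': 'true'}
assert prefs.get('ui.htmx_navigation', False) == 'true'        # enabled

prefs = {}
assert not (prefs.get('ui.htmx_navigation', False) == 'true')  # disabled / unset
```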

View File

@@ -1,7 +1,7 @@
import strawberry
from django.conf import settings
from strawberry_django.optimizer import DjangoOptimizerExtension
from strawberry.extensions import MaxAliasesLimiter
from strawberry.extensions import MaxAliasesLimiter # , SchemaExtension
from strawberry.schema.config import StrawberryConfig
from circuits.graphql.schema import CircuitsQuery
@@ -16,17 +16,9 @@ from virtualization.graphql.schema import VirtualizationQuery
from vpn.graphql.schema import VPNQuery
from wireless.graphql.schema import WirelessQuery
__all__ = (
'Query',
'QueryV1',
'QueryV2',
'schema_v1',
'schema_v2',
)
@strawberry.type
class QueryV1(
class Query(
UsersQuery,
CircuitsQuery,
CoreQuery,
@@ -39,44 +31,11 @@ class QueryV1(
WirelessQuery,
*registry['plugins']['graphql_schemas'], # Append plugin schemas
):
"""Query class for GraphQL API v1"""
pass
@strawberry.type
class QueryV2(
UsersQuery,
CircuitsQuery,
CoreQuery,
DCIMQuery,
ExtrasQuery,
IPAMQuery,
TenancyQuery,
VirtualizationQuery,
VPNQuery,
WirelessQuery,
*registry['plugins']['graphql_schemas'], # Append plugin schemas
):
"""Query class for GraphQL API v2"""
pass
# Expose a default Query class for the configured default GraphQL version
class Query(QueryV2 if settings.GRAPHQL_DEFAULT_VERSION == 2 else QueryV1):
pass
# Generate schemas for both versions of the GraphQL API
schema_v1 = strawberry.Schema(
query=QueryV1,
config=StrawberryConfig(auto_camel_case=False),
extensions=[
DjangoOptimizerExtension(prefetch_custom_queryset=True),
MaxAliasesLimiter(max_alias_count=settings.GRAPHQL_MAX_ALIASES),
]
)
schema_v2 = strawberry.Schema(
query=QueryV2,
schema = strawberry.Schema(
query=Query,
config=StrawberryConfig(auto_camel_case=False),
extensions=[
DjangoOptimizerExtension(prefetch_custom_queryset=True),

View File

@@ -1,16 +0,0 @@
from django.conf import settings
from netbox.graphql.schema import schema_v1, schema_v2
__all__ = (
'get_default_schema',
)
def get_default_schema():
"""
Returns the GraphQL schema corresponding to the value of the NETBOX_GRAPHQL_DEFAULT_SCHEMA setting.
"""
if settings.GRAPHQL_DEFAULT_VERSION == 2:
return schema_v2
return schema_v1

View File

@@ -50,15 +50,21 @@ class NetBoxFeatureSet(
# Base model classes
#
class BaseModel(models.Model):
class ChangeLoggedModel(ChangeLoggingMixin, CustomValidationMixin, EventRulesMixin, models.Model):
"""
A global base model for all NetBox objects.
This class provides some important overrides to Django's default functionality, such as
- Overriding the default manager to use RestrictedQuerySet
- Extending `clean()` to validate GenericForeignKey fields
Base model for ancillary models; provides limited functionality for models which don't
support NetBox's full feature set.
"""
objects = RestrictedQuerySet.as_manager()
class Meta:
abstract = True
class NetBoxModel(NetBoxFeatureSet, models.Model):
"""
Base model for most object types. Suitable for use by plugins.
"""
objects = RestrictedQuerySet.as_manager()
class Meta:
@@ -97,25 +103,6 @@ class BaseModel(models.Model):
setattr(self, field.name, obj)
class ChangeLoggedModel(ChangeLoggingMixin, CustomValidationMixin, EventRulesMixin, BaseModel):
"""
Base model for ancillary models; provides limited functionality for models which don't
support NetBox's full feature set.
"""
class Meta:
abstract = True
class NetBoxModel(NetBoxFeatureSet, BaseModel):
"""
Base model for most object types. Suitable for use by plugins.
"""
class Meta:
abstract = True
#
# NetBox internal base models
#
@@ -153,13 +140,11 @@ class NestedGroupModel(NetBoxFeatureSet, MPTTModel):
)
name = models.CharField(
verbose_name=_('name'),
max_length=100,
db_collation='ci_natural_sort',
max_length=100
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
db_collation='case_insensitive',
max_length=100
)
description = models.CharField(
verbose_name=_('description'),
@@ -192,7 +177,7 @@ class NestedGroupModel(NetBoxFeatureSet, MPTTModel):
})
class OrganizationalModel(NetBoxModel):
class OrganizationalModel(NetBoxFeatureSet, models.Model):
"""
Organizational models are those which are used solely to categorize and qualify other objects, and do not convey
any real information about the infrastructure being modeled (for example, functional device roles). Organizational
@@ -204,14 +189,12 @@ class OrganizationalModel(NetBoxModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True,
db_collation='ci_natural_sort',
unique=True
)
slug = models.SlugField(
verbose_name=_('slug'),
max_length=100,
unique=True,
db_collation='case_insensitive',
unique=True
)
description = models.CharField(
verbose_name=_('description'),
@@ -219,6 +202,8 @@ class OrganizationalModel(NetBoxModel):
blank=True
)
objects = RestrictedQuerySet.as_manager()
class Meta:
abstract = True
ordering = ('name',)
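Since `NetBoxModel` is described above as the base class "suitable for use by plugins", a plugin model would normally subclass it to pick up the full NetBox feature set; a minimal, hypothetical example (the model and field names are invented):

```python
# Hypothetical plugin model; names are invented.
from django.db import models
from netbox.models import NetBoxModel


class WidgetTracker(NetBoxModel):
    name = models.CharField(max_length=100)

    class Meta:
        ordering = ('name',)

    def __str__(self):
        return self.name
```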

View File

@@ -3,12 +3,12 @@ from collections import OrderedDict
from django.apps import apps
from django.urls.exceptions import NoReverseMatch
from drf_spectacular.utils import extend_schema
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework.reverse import reverse
from rest_framework.views import APIView
from netbox.registry import registry
from utilities.api import IsSuperuser
@extend_schema(exclude=True)
@@ -16,7 +16,7 @@ class InstalledPluginsAPIView(APIView):
"""
API view for listing all installed plugins
"""
permission_classes = [IsSuperuser]
permission_classes = [permissions.IsAdminUser]
_ignore_model_permissions = True
schema = None

View File

@@ -26,6 +26,16 @@ def get_csv_delimiters():
PREFERENCES = {
# User interface
'ui.htmx_navigation': UserPreference(
label=_('HTMX Navigation'),
choices=(
('', _('Disabled')),
('true', _('Enabled')),
),
description=_('Enable dynamic UI navigation'),
default=False,
warning=_('Experimental feature')
),
'locale.language': UserPreference(
label=_('Language'),
choices=(

View File

@@ -20,7 +20,6 @@ from netbox.plugins import PluginConfig
from netbox.registry import registry
import storages.utils # type: ignore
from utilities.release import load_release_data
from utilities.security import validate_peppers
from utilities.string import trailing_slash
from .monkey import get_unique_validators
@@ -44,9 +43,9 @@ VERSION = RELEASE.full_version # Retained for backward compatibility
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Validate Python version
if sys.version_info < (3, 12):
if sys.version_info < (3, 10):
raise RuntimeError(
f"NetBox requires Python 3.12 or later. (Currently installed: Python {platform.python_version()})"
f"NetBox requires Python 3.10 or later. (Currently installed: Python {platform.python_version()})"
)
#
@@ -76,8 +75,8 @@ elif hasattr(configuration, 'DATABASE') and hasattr(configuration, 'DATABASES'):
# Set static config parameters
ADMINS = getattr(configuration, 'ADMINS', [])
ALLOW_TOKEN_RETRIEVAL = getattr(configuration, 'ALLOW_TOKEN_RETRIEVAL', False)
ALLOWED_HOSTS = getattr(configuration, 'ALLOWED_HOSTS') # Required
API_TOKEN_PEPPERS = getattr(configuration, 'API_TOKEN_PEPPERS', {})
AUTH_PASSWORD_VALIDATORS = getattr(configuration, 'AUTH_PASSWORD_VALIDATORS', [
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
@@ -137,7 +136,6 @@ EVENTS_PIPELINE = getattr(configuration, 'EVENTS_PIPELINE', [
EXEMPT_VIEW_PERMISSIONS = getattr(configuration, 'EXEMPT_VIEW_PERMISSIONS', [])
FIELD_CHOICES = getattr(configuration, 'FIELD_CHOICES', {})
FILE_UPLOAD_MAX_MEMORY_SIZE = getattr(configuration, 'FILE_UPLOAD_MAX_MEMORY_SIZE', 2621440)
GRAPHQL_DEFAULT_VERSION = getattr(configuration, 'GRAPHQL_DEFAULT_VERSION', 1)
GRAPHQL_MAX_ALIASES = getattr(configuration, 'GRAPHQL_MAX_ALIASES', 10)
HOSTNAME = getattr(configuration, 'HOSTNAME', platform.node())
HTTP_PROXIES = getattr(configuration, 'HTTP_PROXIES', {})
@@ -176,6 +174,8 @@ REMOTE_AUTH_SUPERUSERS = getattr(configuration, 'REMOTE_AUTH_SUPERUSERS', [])
REMOTE_AUTH_USER_EMAIL = getattr(configuration, 'REMOTE_AUTH_USER_EMAIL', 'HTTP_REMOTE_USER_EMAIL')
REMOTE_AUTH_USER_FIRST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_FIRST_NAME', 'HTTP_REMOTE_USER_FIRST_NAME')
REMOTE_AUTH_USER_LAST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_LAST_NAME', 'HTTP_REMOTE_USER_LAST_NAME')
REMOTE_AUTH_STAFF_GROUPS = getattr(configuration, 'REMOTE_AUTH_STAFF_GROUPS', [])
REMOTE_AUTH_STAFF_USERS = getattr(configuration, 'REMOTE_AUTH_STAFF_USERS', [])
# Required by extras/migrations/0109_script_models.py
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)
@@ -229,12 +229,6 @@ if len(SECRET_KEY) < 50:
f" python {BASE_DIR}/generate_secret_key.py"
)
# Validate API token peppers
if API_TOKEN_PEPPERS:
validate_peppers(API_TOKEN_PEPPERS)
else:
warnings.warn("API_TOKEN_PEPPERS is not defined. v2 API tokens cannot be used.")
# Validate update repo URL and timeout
if RELEASE_CHECK_URL:
try:

View File

@@ -270,7 +270,7 @@ class ActionsColumn(tables.Column):
if not (self.actions or self.extra_buttons):
return ''
# Skip dummy records (e.g. available VLANs or IP ranges replacing individual IPs)
if not isinstance(record, model) or not getattr(record, 'pk', None):
if type(record) is not model or not getattr(record, 'pk', None):
return ''
if request := getattr(table, 'context', {}).get('request'):

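The two checks shown above differ when a dummy record is implemented as a subclass of the table's model: `isinstance()` accepts subclasses, while an exact `type()` comparison does not. A small standalone illustration:

```python
# Standalone illustration: isinstance() accepts subclasses, an exact type() comparison does not.
class IPAddress:
    pass


class AvailableIP(IPAddress):
    """Hypothetical stand-in used as a dummy record."""


record = AvailableIP()

print(isinstance(record, IPAddress))  # True  -> the record passes the isinstance() check
print(type(record) is IPAddress)      # False -> the record is rejected by the exact type check
```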
View File

@@ -8,7 +8,6 @@ from rest_framework.test import APIClient
from core.models import ObjectType
from dcim.models import Rack, Site
from users.constants import TOKEN_PREFIX
from users.models import Group, ObjectPermission, Token, User
from utilities.testing import TestCase
from utilities.testing.api import APITestCase
@@ -17,159 +16,67 @@ from utilities.testing.api import APITestCase
class TokenAuthenticationTestCase(APITestCase):
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_no_token(self):
def test_token_authentication(self):
url = reverse('dcim-api:site-list')
# Request without a token should return a 403
response = self.client.get(reverse('dcim-api:site-list'))
response = self.client.get(url)
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v1_token_valid(self):
# Create a v1 token
token = Token.objects.create(version=1, user=self.user)
# Valid token should return a 200
header = f'Token {token.token}'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 200, response.data)
token = Token.objects.create(user=self.user)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 200)
# Check that the token's last_used time has been updated
token.refresh_from_db()
self.assertIsNotNone(token.last_used)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v1_token_invalid(self):
# Invalid token should return a 403
header = 'Token XXXXXXXXXX'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.data['detail'], "Invalid v1 token")
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v2_token_valid(self):
# Create a v2 token
token = Token.objects.create(version=2, user=self.user)
# Valid token should return a 200
header = f'Bearer {TOKEN_PREFIX}{token.key}.{token.token}'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 200, response.data)
# Check that the token's last_used time has been updated
token.refresh_from_db()
self.assertIsNotNone(token.last_used)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v2_token_invalid(self):
# Invalid token should return a 403
header = f'Bearer {TOKEN_PREFIX}XXXXXX.XXXXXXXXXX'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.data['detail'], "Invalid v2 token")
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_expiration(self):
url = reverse('dcim-api:site-list')
# Create v1 & v2 tokens
future = datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc)
token1 = Token.objects.create(version=1, user=self.user, expires=future)
token2 = Token.objects.create(version=2, user=self.user, expires=future)
# Request with a non-expired token should succeed
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.token}')
self.assertEqual(response.status_code, 200)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}')
# Request with a non-expired token should succeed
token = Token.objects.create(user=self.user)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 200)
# Request with an expired token should fail
past = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
token1.expires = past
token1.save()
token2.expires = past
token2.save()
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.key}')
self.assertEqual(response.status_code, 403)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}')
token.expires = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
token.save()
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_write_enabled(self):
url = reverse('dcim-api:site-list')
data = [
{
'name': 'Site 1',
'slug': 'site-1',
},
{
'name': 'Site 2',
'slug': 'site-2',
},
]
self.add_permissions('dcim.view_site', 'dcim.add_site')
data = {
'name': 'Site 1',
'slug': 'site-1',
}
# Create v1 & v2 tokens
token1 = Token.objects.create(version=1, user=self.user, write_enabled=False)
token2 = Token.objects.create(version=2, user=self.user, write_enabled=False)
token1_header = f'Token {token1.token}'
token2_header = f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}'
# GET request with a write-disabled token should succeed
response = self.client.get(url, HTTP_AUTHORIZATION=token1_header)
self.assertEqual(response.status_code, 200)
response = self.client.get(url, HTTP_AUTHORIZATION=token2_header)
self.assertEqual(response.status_code, 200)
# POST request with a write-disabled token should fail
response = self.client.post(url, data[0], format='json', HTTP_AUTHORIZATION=token1_header)
self.assertEqual(response.status_code, 403)
response = self.client.post(url, data[1], format='json', HTTP_AUTHORIZATION=token2_header)
# Request with a write-disabled token should fail
token = Token.objects.create(user=self.user, write_enabled=False)
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 403)
# POST request with a write-enabled token should succeed
token1.write_enabled = True
token1.save()
token2.write_enabled = True
token2.save()
response = self.client.post(url, data[0], format='json', HTTP_AUTHORIZATION=token1_header)
self.assertEqual(response.status_code, 201)
response = self.client.post(url, data[1], format='json', HTTP_AUTHORIZATION=token2_header)
self.assertEqual(response.status_code, 201)
# Request with a write-enabled token should succeed
token.write_enabled = True
token.save()
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 201)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_allowed_ips(self):
url = reverse('dcim-api:site-list')
# Create v1 & v2 tokens
token1 = Token.objects.create(version=1, user=self.user, allowed_ips=['192.0.2.0/24'])
token2 = Token.objects.create(version=2, user=self.user, allowed_ips=['192.0.2.0/24'])
# Request from a non-allowed client IP should fail
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Token {token1.token}',
REMOTE_ADDR='127.0.0.1'
)
self.assertEqual(response.status_code, 403)
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}',
REMOTE_ADDR='127.0.0.1'
)
token = Token.objects.create(user=self.user, allowed_ips=['192.0.2.0/24'])
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='127.0.0.1')
self.assertEqual(response.status_code, 403)
# Request from an allowed client IP should succeed
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Token {token1.token}',
REMOTE_ADDR='192.0.2.1'
)
self.assertEqual(response.status_code, 200)
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}',
REMOTE_ADDR='192.0.2.1'
)
# Request from an allowed client IP should succeed
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='192.0.2.1')
self.assertEqual(response.status_code, 200)
@@ -520,7 +427,7 @@ class ObjectPermissionAPIViewTestCase(TestCase):
"""
self.user = User.objects.create(username='testuser')
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Bearer {TOKEN_PREFIX}{self.token.key}.{self.token.token}'}
self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)}
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object(self):

View File

@@ -6,8 +6,7 @@ from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, Spec
from account.views import LoginView, LogoutView
from netbox.api.views import APIRootView, StatusView
from netbox.graphql.schema import schema_v1, schema_v2
from netbox.graphql.utils import get_default_schema
from netbox.graphql.schema import schema
from netbox.graphql.views import NetBoxGraphQLView
from netbox.plugins.urls import plugin_patterns, plugin_api_patterns
from netbox.views import HomeView, MediaView, StaticMediaFailureView, SearchView, htmx
@@ -41,7 +40,7 @@ _patterns = [
# HTMX views
path('htmx/object-selector/', htmx.ObjectSelectorView.as_view(), name='htmx_object_selector'),
# REST API
# API
path('api/', APIRootView.as_view(), name='api-root'),
path('api/circuits/', include('circuits.api.urls')),
path('api/core/', include('core.api.urls')),
@@ -55,7 +54,6 @@ _patterns = [
path('api/wireless/', include('wireless.api.urls')),
path('api/status/', StatusView.as_view(), name='api-status'),
# REST API schema
path(
"api/schema/",
cache_page(timeout=86400, key_prefix=f"api_schema_{settings.RELEASE.version}")(
@@ -66,10 +64,8 @@ _patterns = [
path('api/schema/swagger-ui/', SpectacularSwaggerView.as_view(url_name='schema'), name='api_docs'),
path('api/schema/redoc/', SpectacularRedocView.as_view(url_name='schema'), name='api_redocs'),
# GraphQL API
path('graphql/', NetBoxGraphQLView.as_view(schema=get_default_schema()), name='graphql'),
path('graphql/v1/', NetBoxGraphQLView.as_view(schema=schema_v1), name='graphql_v1'),
path('graphql/v2/', NetBoxGraphQLView.as_view(schema=schema_v2), name='graphql_v2'),
# GraphQL
path('graphql/', NetBoxGraphQLView.as_view(schema=schema), name='graphql'),
# Serving static media in Django to pipe it through LoginRequiredMiddleware
path('media/<path:path>', MediaView.as_view(), name='media'),

View File

@@ -47,9 +47,9 @@ class HomeView(ConditionalLoginRequiredMixin, View):
))
dashboard = get_default_dashboard(config=DEFAULT_DASHBOARD).get_layout()
# Check whether a new release is available. (Only for superusers.)
# Check whether a new release is available. (Only for staff/superusers.)
new_release = None
if request.user.is_superuser:
if request.user.is_staff or request.user.is_superuser:
latest_release = cache.get('latest_release')
if latest_release:
release_version, release_url = latest_release

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Some files were not shown because too many files have changed in this diff.