diff --git a/TODO b/TODO index 5218b1db..7ee9e64b 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,6 @@ - TODO: ACL Form Bubble/ICON Extended/Standard -- TODO: Add an Access List to an Interface Custom Fields after comments - DONE - TODO: ACL rules, look at last number and increment to next 10 - TODO: Clone for ACL Interface should include device - TODO: Inconsistent errors for add/edit (where model is using a generic page) - TODO: Check Constants across codebase for consistency. -- TODO: Test API, Forms, & Models - https://github.com/k01ek/netbox-bgp/tree/main/netbox_bgp/tests , https://github.com/DanSheps/netbox-secretstore/tree/develop/netbox_secretstore/tests & https://github.com/FlxPeters/netbox-plugin-prometheus-sd/tree/main/netbox_prometheus_sd/tests +- TODO: Test API, & Forms - https://github.com/k01ek/netbox-bgp/tree/main/netbox_bgp/tests , https://github.com/DanSheps/netbox-secretstore/tree/develop/netbox_secretstore/tests & https://github.com/FlxPeters/netbox-plugin-prometheus-sd/tree/main/netbox_prometheus_sd/tests diff --git a/netbox_acls/forms/models.py b/netbox_acls/forms/models.py index 454c8782..42450a70 100644 --- a/netbox_acls/forms/models.py +++ b/netbox_acls/forms/models.py @@ -43,13 +43,6 @@ # Sets a standard help_text value to be used by the various classes for acl index help_text_acl_rule_index = "Determines the order of the rule in the ACL processing. AKA Sequence Number." -# Sets a standard error message for ACL rules with an action of remark, but no remark set. -error_message_no_remark = "Action is set to remark, you MUST add a remark." -# Sets a standard error message for ACL rules with an action of remark, but no source_prefix is set. -error_message_action_remark_source_prefix_set = "Action is set to remark, Source Prefix CANNOT be set." -# Sets a standard error message for ACL rules with an action not set to remark, but no remark is set. -error_message_remark_without_action_remark = "CANNOT set remark unless action is set to remark." - class AccessListForm(NetBoxModelForm): """ @@ -545,35 +538,6 @@ class Meta: ), } - def clean(self): - """ - Validates form inputs before submitting: - - Check if action set to remark, but no remark set. - - Check if action set to remark, but source_prefix set. - - Check remark set, but action not set to remark. - """ - super().clean() - cleaned_data = self.cleaned_data - error_message = {} - - action = cleaned_data.get("action") - remark = cleaned_data.get("remark") - source_prefix = cleaned_data.get("source_prefix") - - if action == "remark": - # Check if action set to remark, but no remark set. - if not remark: - error_message["remark"] = [error_message_no_remark] - # Check if action set to remark, but source_prefix set. - if source_prefix: - error_message["source_prefix"] = [error_message_action_remark_source_prefix_set] - # Check remark set, but action not set to remark. - elif remark: - error_message["remark"] = [error_message_remark_without_action_remark] - - if error_message: - raise ValidationError(error_message) - class ACLExtendedRuleForm(NetBoxModelForm): """ @@ -651,45 +615,3 @@ class Meta: ), "source_ports": help_text_acl_rule_logic, } - - def clean(self): - """ - Validates form inputs before submitting: - - Check if action set to remark, but no remark set. - - Check if action set to remark, but source_prefix set. - - Check if action set to remark, but source_ports set. - - Check if action set to remark, but destination_prefix set. - - Check if action set to remark, but destination_ports set. - - Check if action set to remark, but protocol set. - - Check remark set, but action not set to remark. - """ - super().clean() - cleaned_data = self.cleaned_data - error_message = {} - - action = cleaned_data.get("action") - remark = cleaned_data.get("remark") - source_prefix = cleaned_data.get("source_prefix") - source_ports = cleaned_data.get("source_ports") - destination_prefix = cleaned_data.get("destination_prefix") - destination_ports = cleaned_data.get("destination_ports") - protocol = cleaned_data.get("protocol") - - if action == "remark": - if not remark: - error_message["remark"] = [error_message_no_remark] - if source_prefix: - error_message["source_prefix"] = [error_message_action_remark_source_prefix_set] - if source_ports: - error_message["source_ports"] = ["Action is set to remark, Source Ports CANNOT be set."] - if destination_prefix: - error_message["destination_prefix"] = ["Action is set to remark, Destination Prefix CANNOT be set."] - if destination_ports: - error_message["destination_ports"] = ["Action is set to remark, Destination Ports CANNOT be set."] - if protocol: - error_message["protocol"] = ["Action is set to remark, Protocol CANNOT be set."] - elif remark: - error_message["remark"] = [error_message_remark_without_action_remark] - - if error_message: - raise ValidationError(error_message) diff --git a/netbox_acls/migrations/0005_alter_accesslist_options.py b/netbox_acls/migrations/0005_alter_accesslist_options.py new file mode 100644 index 00000000..de5c1d4b --- /dev/null +++ b/netbox_acls/migrations/0005_alter_accesslist_options.py @@ -0,0 +1,62 @@ +# Generated by Django 5.1.8 on 2025-04-20 22:35 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("netbox_acls", "0004_netbox_acls"), + ] + + operations = [ + migrations.AlterModelOptions( + name="accesslist", + options={"ordering": ("assigned_object_type", "assigned_object_id", "name")}, + ), + migrations.AlterModelOptions( + name="aclextendedrule", + options={"ordering": ("access_list", "index")}, + ), + migrations.AlterModelOptions( + name="aclinterfaceassignment", + options={"ordering": ("assigned_object_type", "assigned_object_id", "access_list", "direction")}, + ), + migrations.AlterModelOptions( + name="aclstandardrule", + options={"ordering": ("access_list", "index")}, + ), + migrations.AlterField( + model_name="accesslist", + name="assigned_object_type", + field=models.ForeignKey( + limit_choices_to=models.Q( + models.Q( + models.Q(("app_label", "dcim"), ("model", "device")), + models.Q(("app_label", "dcim"), ("model", "virtualchassis")), + models.Q(("app_label", "virtualization"), ("model", "virtualmachine")), + _connector="OR", + ), + ), + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="contenttypes.contenttype", + ), + ), + migrations.AlterField( + model_name="aclinterfaceassignment", + name="assigned_object_type", + field=models.ForeignKey( + limit_choices_to=models.Q( + models.Q( + models.Q(("app_label", "dcim"), ("model", "interface")), + models.Q(("app_label", "virtualization"), ("model", "vminterface")), + _connector="OR", + ), + ), + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="contenttypes.contenttype", + ), + ), + ] diff --git a/netbox_acls/models/access_list_rules.py b/netbox_acls/models/access_list_rules.py index 56567204..06275102 100644 --- a/netbox_acls/models/access_list_rules.py +++ b/netbox_acls/models/access_list_rules.py @@ -2,10 +2,11 @@ Define the django models for this plugin. """ -from django.apps import apps from django.contrib.postgres.fields import ArrayField +from django.core.exceptions import ValidationError from django.db import models from django.urls import reverse +from django.utils.translation import gettext_lazy as _ from netbox.models import NetBoxModel from ..choices import ACLProtocolChoices, ACLRuleActionChoices, ACLTypeChoices @@ -17,52 +18,69 @@ "ACLExtendedRule", ) +# Error message when the action is 'remark', but no remark is provided. +ERROR_MESSAGE_NO_REMARK = _("When the action is 'remark', a remark is required.") + +# Error message when the action is 'remark', but the source_prefix is set. +ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET = _("When the action is 'remark', the Source Prefix must not be set.") + +# Error message when the action is 'remark', but the source_ports are set. +ERROR_MESSAGE_ACTION_REMARK_SOURCE_PORTS_SET = _("When the action is 'remark', Source Ports must not be set.") + +# Error message when the action is 'remark', but the destination_prefix is set. +ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PREFIX_SET = _( + "When the action is 'remark', the Destination Prefix must not be set." +) + +# Error message when the action is 'remark', but the destination_ports are set. +ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PORTS_SET = _("When the action is 'remark', Destination Ports must not be set.") + +# Error message when the action is 'remark', but the protocol is set. +ERROR_MESSAGE_ACTION_REMARK_PROTOCOL_SET = _("When the action is 'remark', Protocol must not be set.") + +# Error message when a remark is provided, but the action is not set to 'remark'. +ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK = _("A remark cannot be set unless the action is 'remark'.") + class ACLRule(NetBoxModel): """ Abstract model for ACL Rules. - Inherrited by both ACLStandardRule and ACLExtendedRule. + Inherited by both ACLStandardRule and ACLExtendedRule. """ access_list = models.ForeignKey( - on_delete=models.CASCADE, to=AccessList, - verbose_name="Access List", + on_delete=models.CASCADE, related_name="rules", + verbose_name=_("Access List"), ) index = models.PositiveIntegerField() remark = models.CharField( + verbose_name=_("Remark"), max_length=500, blank=True, ) description = models.CharField( + verbose_name=_("Description"), max_length=500, blank=True, ) action = models.CharField( - choices=ACLRuleActionChoices, + verbose_name=_("Action"), max_length=30, + choices=ACLRuleActionChoices, ) source_prefix = models.ForeignKey( - blank=True, - null=True, + to="ipam.prefix", on_delete=models.PROTECT, related_name="+", - to="ipam.Prefix", - verbose_name="Source Prefix", + verbose_name=_("Source Prefix"), + blank=True, + null=True, ) clone_fields = ("access_list", "action", "source_prefix") - - def __str__(self): - return f"{self.access_list}: Rule {self.index}" - - def get_action_color(self): - return ACLRuleActionChoices.colors.get(self.action) - - @classmethod - def get_prerequisite_models(cls): - return [apps.get_model("ipam.Prefix"), AccessList] + prerequisite_models = ("netbox_acls.AccessList",) class Meta: """ @@ -73,8 +91,24 @@ class Meta: """ abstract = True - ordering = ["access_list", "index"] - unique_together = ["access_list", "index"] + ordering = ("access_list", "index") + unique_together = ("access_list", "index") + + def __str__(self): + return f"{self.access_list}: Rule {self.index}" + + def get_absolute_url(self): + """ + The method is a Django convention; although not strictly required, + it conveniently returns the absolute URL for any particular object. + """ + return reverse( + f"plugins:{self._meta.app_label}:{self._meta.model_name}", + args=[self.pk], + ) + + def get_action_color(self): + return ACLRuleActionChoices.colors.get(self.action) class ACLStandardRule(ACLRule): @@ -83,24 +117,13 @@ class ACLStandardRule(ACLRule): """ access_list = models.ForeignKey( - on_delete=models.CASCADE, to=AccessList, - verbose_name="Standard Access List", - limit_choices_to={"type": ACLTypeChoices.TYPE_STANDARD}, + on_delete=models.CASCADE, related_name="aclstandardrules", + limit_choices_to={"type": ACLTypeChoices.TYPE_STANDARD}, + verbose_name=_("Standard Access List"), ) - def get_absolute_url(self): - """ - The method is a Django convention; although not strictly required, - it conveniently returns the absolute URL for any particular object. - """ - return reverse("plugins:netbox_acls:aclstandardrule", args=[self.pk]) - - @classmethod - def get_prerequisite_models(cls): - return [AccessList] - class Meta(ACLRule.Meta): """ Define the model properties adding to or overriding the inherited class: @@ -109,62 +132,85 @@ class Meta(ACLRule.Meta): - verbose name plural (for displaying in the GUI) """ - verbose_name = "ACL Standard Rule" - verbose_name_plural = "ACL Standard Rules" + verbose_name = _("ACL Standard Rule") + verbose_name_plural = _("ACL Standard Rules") + + def clean(self): + """ + Validate the ACL Standard Rule inputs. + + If the action is 'remark', then the remark field must be provided (non-empty), + and the source_prefix field must be empty. + Conversely, if the remark field is provided, the action must be set to 'remark'. + """ + + super().clean() + errors = {} + + # Validate that only the remark field is filled + if self.action == ACLRuleActionChoices.ACTION_REMARK: + if not self.remark: + errors["remark"] = ERROR_MESSAGE_NO_REMARK + if self.source_prefix: + errors["source_prefix"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET + # Validate that the action is "remark", when the remark field is provided + elif self.remark: + errors["remark"] = ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK + + if errors: + raise ValidationError(errors) class ACLExtendedRule(ACLRule): """ Inherits ACLRule. - Add ACLExtendedRule specific fields: source_ports, desintation_prefix, destination_ports, and protocol + Add ACLExtendedRule specific fields: source_ports, destination_prefix, destination_ports, and protocol """ access_list = models.ForeignKey( - on_delete=models.CASCADE, to=AccessList, - verbose_name="Extended Access List", - limit_choices_to={"type": "extended"}, + on_delete=models.CASCADE, related_name="aclextendedrules", + limit_choices_to={"type": "extended"}, + verbose_name=_("Extended Access List"), ) source_ports = ArrayField( base_field=models.PositiveIntegerField(), + verbose_name=_("Source Ports"), blank=True, null=True, - verbose_name="Soure Ports", ) destination_prefix = models.ForeignKey( - blank=True, - null=True, + to="ipam.prefix", on_delete=models.PROTECT, related_name="+", - to="ipam.Prefix", - verbose_name="Destination Prefix", + verbose_name=_("Destination Prefix"), + blank=True, + null=True, ) destination_ports = ArrayField( base_field=models.PositiveIntegerField(), + verbose_name=_("Destination Ports"), blank=True, null=True, - verbose_name="Destination Ports", ) protocol = models.CharField( - blank=True, - choices=ACLProtocolChoices, + verbose_name=_("Protocol"), max_length=30, + choices=ACLProtocolChoices, + blank=True, ) - def get_absolute_url(self): - """ - The method is a Django convention; although not strictly required, - it conveniently returns the absolute URL for any particular object. - """ - return reverse("plugins:netbox_acls:aclextendedrule", args=[self.pk]) - - def get_protocol_color(self): - return ACLProtocolChoices.colors.get(self.protocol) - - @classmethod - def get_prerequisite_models(cls): - return [apps.get_model("ipam.Prefix"), AccessList] + clone_fields = ( + "access_list", + "action", + "source_prefix", + "source_ports", + "destination_prefix", + "destination_ports", + "protocol", + ) + prerequisite_models = ("netbox_acls.AccessList",) class Meta(ACLRule.Meta): """ @@ -174,5 +220,46 @@ class Meta(ACLRule.Meta): - verbose name plural (for displaying in the GUI) """ - verbose_name = "ACL Extended Rule" - verbose_name_plural = "ACL Extended Rules" + verbose_name = _("ACL Extended Rule") + verbose_name_plural = _("ACL Extended Rules") + + def clean(self): + """ + Validate the ACL Extended Rule inputs. + + When the action is 'remark', the remark field must be provided (non-empty), + and the following fields must be empty: + - source_prefix + - source_ports + - destination_prefix + - destination_ports + - protocol + + Conversely, if a remark is provided, the action must be set to 'remark'. + """ + super().clean() + errors = {} + + # Validate that only the remark field is filled + if self.action == ACLRuleActionChoices.ACTION_REMARK: + if not self.remark: + errors["remark"] = ERROR_MESSAGE_NO_REMARK + if self.source_prefix: + errors["source_prefix"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET + if self.source_ports: + errors["source_ports"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PORTS_SET + if self.destination_prefix: + errors["destination_prefix"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PREFIX_SET + if self.destination_ports: + errors["destination_ports"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PORTS_SET + if self.protocol: + errors["protocol"] = ERROR_MESSAGE_ACTION_REMARK_PROTOCOL_SET + # Validate that the action is "remark", when the remark field is provided + elif self.remark: + errors["remark"] = ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK + + if errors: + raise ValidationError(errors) + + def get_protocol_color(self): + return ACLProtocolChoices.colors.get(self.protocol) diff --git a/netbox_acls/models/access_lists.py b/netbox_acls/models/access_lists.py index 39a55f4a..b0af0d3d 100644 --- a/netbox_acls/models/access_lists.py +++ b/netbox_acls/models/access_lists.py @@ -4,10 +4,11 @@ from dcim.models import Device, Interface, VirtualChassis from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation -from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from django.core.validators import RegexValidator from django.db import models from django.urls import reverse +from django.utils.translation import gettext_lazy as _ from netbox.models import NetBoxModel from virtualization.models import VirtualMachine, VMInterface @@ -22,7 +23,7 @@ alphanumeric_plus = RegexValidator( r"^[a-zA-Z0-9-_]+$", - "Only alphanumeric, hyphens, and underscores characters are allowed.", + _("Only alphanumeric, hyphens, and underscores characters are allowed."), ) @@ -32,13 +33,16 @@ class AccessList(NetBoxModel): """ name = models.CharField( + verbose_name=_("Name"), max_length=500, validators=[alphanumeric_plus], ) assigned_object_type = models.ForeignKey( - to=ContentType, - limit_choices_to=ACL_HOST_ASSIGNMENT_MODELS, + to="contenttypes.ContentType", on_delete=models.PROTECT, + related_name="+", + limit_choices_to=ACL_HOST_ASSIGNMENT_MODELS, + verbose_name=_("Assigned Object Type"), ) assigned_object_id = models.PositiveBigIntegerField() assigned_object = GenericForeignKey( @@ -46,29 +50,30 @@ class AccessList(NetBoxModel): fk_field="assigned_object_id", ) type = models.CharField( + verbose_name=_("Type"), max_length=30, choices=ACLTypeChoices, ) default_action = models.CharField( - default=ACLActionChoices.ACTION_DENY, + verbose_name=_("Default Action"), max_length=30, + default=ACLActionChoices.ACTION_DENY, choices=ACLActionChoices, - verbose_name="Default Action", ) comments = models.TextField( blank=True, ) clone_fields = ( - "type", "default_action", + "type", ) class Meta: - unique_together = ["assigned_object_type", "assigned_object_id", "name"] - ordering = ["assigned_object_type", "assigned_object_id", "name"] - verbose_name = "Access List" - verbose_name_plural = "Access Lists" + unique_together = ("assigned_object_type", "assigned_object_id", "name") + ordering = ("assigned_object_type", "assigned_object_id", "name") + verbose_name = _("Access List") + verbose_name_plural = _("Access Lists") def __str__(self): return self.name @@ -95,18 +100,21 @@ class ACLInterfaceAssignment(NetBoxModel): """ access_list = models.ForeignKey( - on_delete=models.CASCADE, to=AccessList, - verbose_name="Access List", + on_delete=models.CASCADE, + verbose_name=_("Access List"), ) direction = models.CharField( + verbose_name=_("Direction"), max_length=30, choices=ACLAssignmentDirectionChoices, ) assigned_object_type = models.ForeignKey( - to=ContentType, - limit_choices_to=ACL_INTERFACE_ASSIGNMENT_MODELS, + to="contenttypes.ContentType", on_delete=models.PROTECT, + related_name="+", + limit_choices_to=ACL_INTERFACE_ASSIGNMENT_MODELS, + verbose_name=_("Assigned Object Type"), ) assigned_object_id = models.PositiveBigIntegerField() assigned_object = GenericForeignKey( @@ -121,20 +129,20 @@ class ACLInterfaceAssignment(NetBoxModel): prerequisite_models = ("netbox_acls.AccessList",) class Meta: - unique_together = [ + unique_together = ( "assigned_object_type", "assigned_object_id", "access_list", "direction", - ] - ordering = [ + ) + ordering = ( "assigned_object_type", "assigned_object_id", "access_list", "direction", - ] - verbose_name = "ACL Interface Assignment" - verbose_name_plural = "ACL Interface Assignments" + ) + verbose_name = _("ACL Interface Assignment") + verbose_name_plural = _("ACL Interface Assignments") def __str__(self): return f"{self.access_list}: Interface {self.assigned_object}" @@ -149,6 +157,20 @@ def get_absolute_url(self): args=[self.pk], ) + def save(self, *args, **kwargs): + """Saves the current instance to the database.""" + # Ensure the assigned interface's host matches the host assigned to the access list. + if self.assigned_object.parent_object != self.access_list.assigned_object: + raise ValidationError( + { + "assigned_object": _( + "The assigned interface must belong to the same device or virtual machine as the access list." + ) + }, + ) + + super().save(*args, **kwargs) + def get_direction_color(self): return ACLAssignmentDirectionChoices.colors.get(self.direction) diff --git a/netbox_acls/tests/models/__init__.py b/netbox_acls/tests/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/netbox_acls/tests/models/base.py b/netbox_acls/tests/models/base.py new file mode 100644 index 00000000..55d5d0b5 --- /dev/null +++ b/netbox_acls/tests/models/base.py @@ -0,0 +1,117 @@ +from dcim.models import ( + Device, + DeviceRole, + DeviceType, + Manufacturer, + Site, + VirtualChassis, +) +from django.test import TestCase +from ipam.models import Prefix +from virtualization.models import Cluster, ClusterType, VirtualMachine + + +class BaseTestCase(TestCase): + """ + Base test case for netbox_acls models. + """ + + @classmethod + def setUpTestData(cls): + """ + Create base data to test using including + - 1 of each of the following: test site, manufacturer, device type + device role, cluster type, cluster, virtual chassis, and + virtual machine + - 2 of each Device, prefix + """ + + # Sites + site = Site.objects.create( + name="Site 1", + slug="site-1", + ) + + # Device Types + manufacturer = Manufacturer.objects.create( + name="Manufacturer 1", + slug="manufacturer-1", + ) + device_type = DeviceType.objects.create( + manufacturer=manufacturer, + model="Device Type 1", + ) + + # Device Roles + device_role = DeviceRole.objects.create( + name="Device Role 1", + slug="device-role-1", + ) + + # Devices + cls.device1 = Device.objects.create( + name="Device 1", + site=site, + device_type=device_type, + role=device_role, + ) + cls.device2 = Device.objects.create( + name="Device 2", + site=site, + device_type=device_type, + role=device_role, + ) + + # Virtual Chassis + cls.virtual_chassis1 = VirtualChassis.objects.create( + name="Virtual Chassis 1", + ) + + # Virtual Chassis Members + cls.virtual_chassis_member1 = Device.objects.create( + name="VC Device", + site=site, + device_type=device_type, + role=device_role, + virtual_chassis=cls.virtual_chassis1, + vc_position=1, + ) + + # Virtualization Cluster Type + cluster_type = ClusterType.objects.create( + name="Cluster Type 1", + ) + + # Virtualization Cluster + cluster = Cluster.objects.create( + name="Cluster 1", + type=cluster_type, + ) + + # Virtualization Cluster Member + cls.cluster_member1 = Device.objects.create( + name="Cluster Device", + site=site, + device_type=device_type, + role=device_role, + ) + + # Virtual Machine + cls.virtual_machine1 = VirtualMachine.objects.create( + name="VirtualMachine 1", + status="active", + cluster=cluster, + ) + cls.virtual_machine2 = VirtualMachine.objects.create( + name="VirtualMachine 2", + status="active", + cluster=cluster, + ) + + # Prefix + cls.prefix1 = Prefix.objects.create( + prefix="10.1.0.0/16", + ) + cls.prefix2 = Prefix.objects.create( + prefix="10.2.0.0/16", + ) diff --git a/netbox_acls/tests/models/test_accesslists.py b/netbox_acls/tests/models/test_accesslists.py new file mode 100644 index 00000000..579bd719 --- /dev/null +++ b/netbox_acls/tests/models/test_accesslists.py @@ -0,0 +1,269 @@ +from itertools import cycle + +from dcim.models import Device, VirtualChassis +from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError +from ipam.models import Prefix +from virtualization.models import VirtualMachine + +from netbox_acls.models import AccessList + +from .base import BaseTestCase + + +class TestAccessList(BaseTestCase): + """ + Test AccessList model. + """ + + common_acl_params = { + "type": "extended", + "default_action": "permit", + } + + def test_accesslist_standard_creation(self): + """ + Test that AccessList Standard creation passes validation. + """ + acl_name = "Test-ACL-Standard-Type" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.device1, + type="standard", + default_action="deny", + ) + + self.assertTrue(isinstance(created_acl, AccessList), True) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "standard") + self.assertEqual(created_acl.default_action, "deny") + self.assertEqual(isinstance(created_acl.assigned_object, Device), True) + self.assertEqual(created_acl.assigned_object, self.device1) + + def test_accesslist_extended_creation(self): + """ + Test that AccessList Extended creation passes validation. + """ + acl_name = "Test-ACL-Extended-Type" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.device2, + type="extended", + default_action="permit", + ) + + self.assertTrue(isinstance(created_acl, AccessList)) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "extended") + self.assertEqual(created_acl.default_action, "permit") + self.assertEqual(isinstance(created_acl.assigned_object, Device), True) + self.assertEqual(created_acl.assigned_object, self.device2) + + def test_accesslist_creation_with_virtual_chassis(self): + """ + Test that AccessList creation with an assigned virtual chassis passes validation. + """ + acl_name = "Test-ACL-with-Virtual-Machine" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.virtual_chassis1, + **self.common_acl_params, + ) + + self.assertTrue(isinstance(created_acl, AccessList)) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "extended") + self.assertEqual(created_acl.default_action, "permit") + self.assertEqual(isinstance(created_acl.assigned_object, VirtualChassis), True) + self.assertEqual(created_acl.assigned_object, self.virtual_chassis1) + + def test_accesslist_creation_with_virtual_machine(self): + """ + Test that AccessList creation with an assigned virtual machine passes validation. + """ + acl_name = "Test-ACL-with-Virtual-Machine" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.virtual_machine1, + **self.common_acl_params, + ) + + self.assertTrue(isinstance(created_acl, AccessList)) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "extended") + self.assertEqual(created_acl.default_action, "permit") + self.assertEqual(isinstance(created_acl.assigned_object, VirtualMachine), True) + self.assertEqual(created_acl.assigned_object, self.virtual_machine1) + + def test_wrong_assigned_object_type_fail(self): + """ + Test that AccessList cannot be assigned to an object type other than Device, VirtualChassis, VirtualMachine, + or Cluster. + """ + acl_bad_gfk = AccessList( + name="TestACL_Wrong_GFK", + assigned_object_type=ContentType.objects.get_for_model(Prefix), + assigned_object_id=self.prefix1.id, + **self.common_acl_params, + ) + with self.assertRaises(ValidationError): + acl_bad_gfk.full_clean() + + def test_alphanumeric_plus_success(self): + """ + Test that AccessList names with alphanumeric characters, '_', or '-' pass validation. + """ + acl_good_name = AccessList( + name="Test-ACL-Good_Name-1", + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=self.device1.id, + **self.common_acl_params, + ) + acl_good_name.full_clean() + + def test_duplicate_name_success(self): + """ + Test that AccessList names can be non-unique if associated with different devices. + """ + # Device + device_acl = AccessList( + name="GOOD-DUPLICATE-ACL", + assigned_object=self.device1, + **self.common_acl_params, + ) + device_acl.full_clean() + + # Virtual Chassis + vc_acl = AccessList( + name="GOOD-DUPLICATE-ACL", + assigned_object=self.virtual_chassis1, + **self.common_acl_params, + ) + vc_acl.full_clean() + + # Virtual Machine + vm_acl = AccessList( + name="GOOD-DUPLICATE-ACL", + assigned_object=self.virtual_machine1, + **self.common_acl_params, + ) + vm_acl.full_clean() + + def test_alphanumeric_plus_fail(self): + """ + Test that AccessList names with non-alphanumeric (excluding '_' and '-') characters fail validation. + """ + non_alphanumeric_plus_chars = " !@#$%^&*()[]{};:,./<>?\|~=+" + + for i, char in enumerate(non_alphanumeric_plus_chars, start=1): + bad_acl_name = AccessList( + name=f"Test-ACL-bad_name_{i}_{char}", + assigned_object=self.device1, + comments=f'ACL with "{char}" in name', + **self.common_acl_params, + ) + with self.assertRaises(ValidationError): + bad_acl_name.full_clean() + + def test_duplicate_name_per_device_fail(self): + """ + Test that AccessList names must be unique per device. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(Device), + "assigned_object_id": self.device1.id, + **self.common_acl_params, + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() + + def test_duplicate_name_per_virtual_chassis_fail(self): + """ + Test that AccessList names must be unique per virtual chassis. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(VirtualChassis), + "assigned_object_id": self.virtual_chassis1.id, + **self.common_acl_params, + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() + + def test_duplicate_name_per_virtual_machine_fail(self): + """ + Test that AccessList names must be unique per virtual machine. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(VirtualMachine), + "assigned_object_id": self.virtual_machine1.id, + **self.common_acl_params, + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() + + def test_valid_acl_choices(self): + """ + Test that AccessList action choices using VALID choices. + """ + valid_acl_default_action_choices = ["permit", "deny"] + valid_acl_types = ["standard", "extended"] + if len(valid_acl_default_action_choices) > len(valid_acl_types): + valid_acl_choices = list(zip(valid_acl_default_action_choices, cycle(valid_acl_types))) + elif len(valid_acl_default_action_choices) < len(valid_acl_types): + valid_acl_choices = list(zip(cycle(valid_acl_default_action_choices), valid_acl_types)) + else: + valid_acl_choices = list(zip(valid_acl_default_action_choices, valid_acl_types)) + + for default_action, acl_type in valid_acl_choices: + valid_acl_choice = AccessList( + name=f"TestACL_Valid_Choice_{default_action}_{acl_type}", + assigned_object=self.device1, + type=acl_type, + default_action=default_action, + comments=f"VALID ACL CHOICES USED: {default_action=} {acl_type=}", + ) + valid_acl_choice.full_clean() + + def test_invalid_acl_choices(self): + """ + Test that AccessList action choices using INVALID choices. + """ + valid_acl_types = ["standard", "extended"] + invalid_acl_default_action_choice = "log" + invalid_acl_default_action = AccessList( + name=f"TestACL_Valid_Choice_{invalid_acl_default_action_choice}_{valid_acl_types[0]}", + assigned_object=self.device1, + type=valid_acl_types[0], + default_action=invalid_acl_default_action_choice, + comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_default_action_choice}'", + ) + with self.assertRaises(ValidationError): + invalid_acl_default_action.full_clean() + + valid_acl_default_action_choices = ["permit", "deny"] + invalid_acl_type = "super-dupper-extended" + invalid_acl_type = AccessList( + name=f"TestACL_Valid_Choice_{valid_acl_default_action_choices[0]}_{invalid_acl_type}", + assigned_object=self.device1, + type=invalid_acl_type, + default_action=valid_acl_default_action_choices[0], + comments=f"INVALID ACL DEFAULT CHOICE USED: type='{invalid_acl_type}'", + ) + with self.assertRaises(ValidationError): + invalid_acl_type.full_clean() diff --git a/netbox_acls/tests/models/test_aclinterfaceassignments.py b/netbox_acls/tests/models/test_aclinterfaceassignments.py new file mode 100644 index 00000000..bd2cb0f9 --- /dev/null +++ b/netbox_acls/tests/models/test_aclinterfaceassignments.py @@ -0,0 +1,190 @@ +from dcim.models import Interface +from django.core.exceptions import ValidationError +from virtualization.models import VMInterface + +from netbox_acls.models import AccessList, ACLInterfaceAssignment + +from .base import BaseTestCase + + +class TestACLInterfaceAssignment(BaseTestCase): + """ + Test ACLInterfaceAssignment model. + """ + + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + + interface_type = "1000baset" + + # Device Interfaces + cls.device_interface1 = Interface.objects.create( + name="Interface 1", + device=cls.device1, + type=interface_type, + ) + cls.device_interface2 = Interface.objects.create( + name="Interface 2", + device=cls.device1, + type=interface_type, + ) + + # Virtual Machine Interfaces + cls.vm_interface1 = VMInterface.objects.create( + name="Interface 1", + virtual_machine=cls.virtual_machine1, + ) + cls.vm_interface2 = VMInterface.objects.create( + name="Interface 2", + virtual_machine=cls.virtual_machine1, + ) + + def test_acl_interface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host + and not already assigned to the interface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + device_acl.save() + acl_device_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=self.device_interface1, + ) + acl_device_interface.full_clean() + + def test_acl_interface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the ACL is not + assigned to the parent host. + """ + device_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + device_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=self.vm_interface1, + ) + with self.assertRaises(ValidationError): + acl_vm_interface.full_clean() + acl_vm_interface.save() + + def test_acl_vminterface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host + and not already assigned to the vminterface and direction. + """ + vm_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.virtual_machine1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + vm_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=vm_acl, + direction="ingress", + assigned_object=self.vm_interface1, + ) + acl_vm_interface.full_clean() + + def test_duplicate_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation + if the ACL already is assigned to the same interface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + device_acl.save() + acl_device_interface1 = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=self.device_interface1, + ) + acl_device_interface1.full_clean() + acl_device_interface1.save() + acl_device_interface2 = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=self.device_interface1, + ) + with self.assertRaises(ValidationError): + acl_device_interface2.full_clean() + + def test_acl_already_assigned_fail(self): + """ + Test that ACLInterfaceAssignment fails validation + if the interface already has an ACL assigned in the same direction. + """ + pass + # TODO: test_acl_already_assigned_fail - VM & Device + + def test_valid_acl_interface_assignment_choices(self): + """ + Test that ACLInterfaceAssignment action choices using VALID choices. + """ + valid_acl_assignment_direction_choices = ["ingress", "egress"] + + test_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + test_acl.save() + + for direction_choice in valid_acl_assignment_direction_choices: + valid_acl_assignment = ACLInterfaceAssignment( + access_list=test_acl, + direction=direction_choice, + assigned_object=self.device_interface1, + comments=f"VALID ACL ASSIGNMENT CHOICES USED: direction={direction_choice}", + ) + valid_acl_assignment.full_clean() + + def test_invalid_acl_choices(self): + """ + Test that ACLInterfaceAssignment action choices using INVALID choices. + """ + invalid_acl_assignment_direction_choice = "both" + + test_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + test_acl.save() + + invalid_acl_assignment_direction = ACLInterfaceAssignment( + access_list=test_acl, + direction=invalid_acl_assignment_direction_choice, + assigned_object=self.device_interface1, + comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_assignment_direction_choice}'", + ) + with self.assertRaises(ValidationError): + invalid_acl_assignment_direction.full_clean() diff --git a/netbox_acls/tests/models/test_extendedrules.py b/netbox_acls/tests/models/test_extendedrules.py new file mode 100644 index 00000000..b84813b1 --- /dev/null +++ b/netbox_acls/tests/models/test_extendedrules.py @@ -0,0 +1,535 @@ +from django.core.exceptions import ValidationError + +from netbox_acls.choices import ACLProtocolChoices, ACLTypeChoices +from netbox_acls.models import AccessList, ACLExtendedRule + +from .base import BaseTestCase + + +class TestACLExtendedRule(BaseTestCase): + """ + Test ACLExtendedRule model. + """ + + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + + cls.acl_type = ACLTypeChoices.TYPE_EXTENDED + cls.default_action = "deny" + cls.protocol = ACLProtocolChoices.PROTOCOL_TCP + + # AccessLists + cls.extended_acl1 = AccessList.objects.create( + name="EXTENDED_ACL", + assigned_object=cls.device1, + type=cls.acl_type, + default_action=cls.default_action, + comments="EXTENDED_ACL", + ) + cls.extended_acl2 = AccessList.objects.create( + name="EXTENDED_ACL", + assigned_object=cls.virtual_machine1, + type=cls.acl_type, + default_action=cls.default_action, + comments="EXTENDED_ACL", + ) + + def test_acl_extended_rule_creation_success(self): + """ + Test that ACLExtendedRule creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="permit", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description=( + "Created rule with any source prefix, any source port, " + "any destination prefix, any destination port, and any protocol." + ), + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 10) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual( + created_rule.description, + ( + "Created rule with any source prefix, any source port, " + "any destination prefix, any destination port, and any protocol." + ), + ) + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_source_prefix_creation_success(self): + """ + Test that ACLExtendedRule with source prefix creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=20, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Created rule with source prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 20) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual(created_rule.description, "Created rule with source prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_source_ports_creation_success(self): + """ + Test that ACLExtendedRule with source ports creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=30, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=[22, 443], + destination_prefix=None, + destination_ports=None, + protocol=self.protocol, + description="Created rule with source ports", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 30) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, [22, 443]) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, self.protocol) + self.assertEqual(created_rule.description, "Created rule with source ports") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_destination_prefix_creation_success(self): + """ + Test that ACLExtendedRule with destination prefix creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=40, + action="permit", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=None, + protocol=None, + description="Created rule with destination prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 40) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, self.prefix1) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual(created_rule.description, "Created rule with destination prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_destination_ports_creation_success(self): + """ + Test that ACLExtendedRule with destination ports creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=50, + action="permit", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=[22, 443], + protocol=self.protocol, + description="Created rule with destination ports", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 50) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, self.prefix1) + self.assertEqual(created_rule.destination_ports, [22, 443]) + self.assertEqual(created_rule.protocol, self.protocol) + self.assertEqual(created_rule.description, "Created rule with destination ports") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_icmp_protocol_creation_success(self): + """ + Test that ACLExtendedRule with ICMP protocol creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=60, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=None, + destination_prefix=self.prefix2, + destination_ports=None, + protocol=ACLProtocolChoices.PROTOCOL_ICMP, + description="Created rule with ICMP protocol", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 60) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, self.prefix2) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, ACLProtocolChoices.PROTOCOL_ICMP) + self.assertEqual(created_rule.description, "Created rule with ICMP protocol") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_complete_params_creation_success(self): + """ + Test that ACLExtendedRule with complete parameters creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=70, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=[4000, 5000], + destination_prefix=self.prefix2, + destination_ports=[22, 443], + protocol=self.protocol, + description="Created rule with complete parameters", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 70) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, [4000, 5000]) + self.assertEqual(created_rule.destination_prefix, self.prefix2) + self.assertEqual(created_rule.destination_ports, [22, 443]) + self.assertEqual(created_rule.protocol, self.protocol) + self.assertEqual(created_rule.description, "Created rule with complete parameters") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_remark_creation_success(self): + """ + Test that ACLExtendedRule with remark creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=80, + action="remark", + remark="Test remark", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Created rule with remark", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 80) + self.assertEqual(created_rule.action, "remark") + self.assertEqual(created_rule.remark, "Test remark") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual(created_rule.description, "Created rule with remark") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_access_list_standard_to_acl_extended_rule_assignment_fail(self): + """ + Test that Standard Access List cannot be assigned to ACLExtendedRule. + """ + standard_acl1 = AccessList.objects.create( + name="STANDARD_ACL", + assigned_object=self.device1, + type=ACLTypeChoices.TYPE_STANDARD, + default_action=self.default_action, + comments="STANDARD_ACL", + ) + extended_rule = ACLExtendedRule( + access_list=standard_acl1, + index=80, + action="remark", + remark="Test remark", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Created rule with remark", + ) + with self.assertRaises(ValidationError): + extended_rule.full_clean() + + def test_duplicate_index_per_acl_fail(self): + """ + Test that the rule index must be unique per AccessList. + """ + params = { + "access_list": self.extended_acl1, + "index": 10, + "action": "permit", + } + rule_1 = ACLExtendedRule(**params) + rule_1.full_clean() + rule_1.save() + rule_2 = ACLExtendedRule(**params) + with self.assertRaises(ValidationError): + rule_2.full_clean() + + def test_acl_extended_rule_action_permit_with_remark_fail(self): + """ + Test that ACLExtendedRule with action 'permit' and remark fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="permit", + remark="Remark", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'permit' and remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_no_remark_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and without remark fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'remark' and without remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_source_prefix_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and source prefix fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=self.prefix1, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'remark' and source prefix", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_source_ports_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and source ports fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=self.prefix1, + source_ports=[80, 443], + destination_prefix=None, + destination_ports=None, + protocol=ACLProtocolChoices.PROTOCOL_TCP, + description="Invalid rule with action 'remark' and source ports", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_destination_prefix_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and destination prefix fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'remark' and destination prefix", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_destination_ports_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and destination ports fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=[80, 443], + protocol=ACLProtocolChoices.PROTOCOL_TCP, + description="Invalid rule with action 'remark' and destination ports", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_protocol_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and protocol fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=ACLProtocolChoices.PROTOCOL_ICMP, + description="Invalid rule with action 'remark' and ICMP protocol", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_valid_acl_rule_action_choices(self): + """ + Test ACLExtendedRule action choices using VALID choices. + """ + valid_acl_rule_action_choices = ["deny", "permit", "remark"] + + for action_choice in valid_acl_rule_action_choices: + valid_acl_rule_action = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action=action_choice, + remark="Remark" if action_choice == "remark" else None, + description=f"VALID ACL RULE ACTION CHOICES USED: action={action_choice}", + ) + valid_acl_rule_action.full_clean() + + def test_invalid_acl_rule_action_choices(self): + """ + Test ACLExtendedRule action choices using INVALID choices. + """ + invalid_acl_rule_action_choice = "both" + + invalid_acl_rule_action = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action=invalid_acl_rule_action_choice, + description=f"INVALID ACL RULE ACTION CHOICES USED: action={invalid_acl_rule_action_choice}", + ) + + with self.assertRaises(ValidationError): + invalid_acl_rule_action.full_clean() + + def test_valid_acl_rule_protocol_choices(self): + """ + Test ACLExtendedRule protocol choices using VALID choices. + """ + valid_acl_rule_protocol_choices = ["icmp", "tcp", "udp"] + + for protocol_choice in valid_acl_rule_protocol_choices: + valid_acl_rule_protocol = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action=self.default_action, + protocol=protocol_choice, + description=f"VALID ACL RULE PROTOCOL CHOICES USED: protocol={protocol_choice}", + ) + valid_acl_rule_protocol.full_clean() + + def test_invalid_acl_rule_protocol_choices(self): + """ + Test ACLExtendedRule protocol choices using INVALID choices. + """ + invalid_acl_rule_protocol_choice = "ethernet" + + invalid_acl_rule_protocol = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + protocol=invalid_acl_rule_protocol_choice, + description=f"INVALID ACL RULE PROTOCOL CHOICES USED: protocol={invalid_acl_rule_protocol_choice}", + ) + + with self.assertRaises(ValidationError): + invalid_acl_rule_protocol.full_clean() diff --git a/netbox_acls/tests/models/test_standardrules.py b/netbox_acls/tests/models/test_standardrules.py new file mode 100644 index 00000000..dcad4968 --- /dev/null +++ b/netbox_acls/tests/models/test_standardrules.py @@ -0,0 +1,222 @@ +from django.core.exceptions import ValidationError + +from netbox_acls.choices import ACLTypeChoices +from netbox_acls.models import AccessList, ACLStandardRule + +from .base import BaseTestCase + + +class TestACLStandardRule(BaseTestCase): + """ + Test ACLStandardRule model. + """ + + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + + cls.acl_type = ACLTypeChoices.TYPE_STANDARD + cls.default_action = "deny" + + # AccessLists + cls.standard_acl1 = AccessList.objects.create( + name="STANDARD_ACL", + assigned_object=cls.device1, + type=cls.acl_type, + default_action=cls.default_action, + comments="STANDARD_ACL", + ) + cls.standard_acl2 = AccessList.objects.create( + name="STANDARD_ACL", + assigned_object=cls.virtual_machine1, + type=cls.acl_type, + default_action=cls.default_action, + comments="STANDARD_ACL", + ) + + def test_acl_standard_rule_creation_success(self): + """ + Test that ACLStandardRule creation passes validation. + """ + created_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="permit", + remark="", + source_prefix=None, + description="Created rule with any source prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLStandardRule), True) + self.assertEqual(created_rule.index, 10) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.description, "Created rule with any source prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_standard_rule_source_prefix_creation_success(self): + """ + Test that ACLStandardRule with source prefix creation passes validation. + """ + created_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=20, + action="permit", + remark="", + source_prefix=self.prefix1, + description="Created rule with source prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLStandardRule), True) + self.assertEqual(created_rule.index, 20) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.description, "Created rule with source prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_standard_rule_remark_creation_success(self): + """ + Test that ACLStandardRule with remark creation passes validation. + """ + created_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=30, + action="remark", + remark="Test remark", + source_prefix=None, + description="Created rule with remark", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLStandardRule), True) + self.assertEqual(created_rule.index, 30) + self.assertEqual(created_rule.action, "remark") + self.assertEqual(created_rule.remark, "Test remark") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.description, "Created rule with remark") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_access_list_extended_to_acl_standard_rule_assignment_fail(self): + """ + Test that Extended Access List cannot be assigned to ACLStandardRule. + """ + extended_acl1 = AccessList.objects.create( + name="EXTENDED_ACL", + assigned_object=self.device1, + type=ACLTypeChoices.TYPE_EXTENDED, + default_action=self.default_action, + comments="EXTENDED_ACL", + ) + standard_rule = ACLStandardRule( + access_list=extended_acl1, + index=30, + action="remark", + remark="Test remark", + source_prefix=None, + description="Created rule with remark", + ) + with self.assertRaises(ValidationError): + standard_rule.full_clean() + + def test_duplicate_index_per_acl_fail(self): + """ + Test that the rule index must be unique per AccessList. + """ + params = { + "access_list": self.standard_acl1, + "index": 10, + "action": "permit", + } + rule_1 = ACLStandardRule(**params) + rule_1.full_clean() + rule_1.save() + rule_2 = ACLStandardRule(**params) + with self.assertRaises(ValidationError): + rule_2.full_clean() + + def test_acl_standard_rule_action_permit_with_remark_fail(self): + """ + Test that ACLStandardRule with action 'permit' and remark fails validation. + """ + invalid_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="permit", + remark="Remark", + source_prefix=None, + description="Invalid rule with action 'permit' and remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_standard_rule_action_remark_with_no_remark_fail(self): + """ + Test that ACLStandardRule with action 'remark' and without remark fails validation. + """ + invalid_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + description="Invalid rule with action 'remark' and without remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_standard_rule_action_remark_with_source_prefix_fail(self): + """ + Test that ACLStandardRule with action 'remark' and source prefix fails validation. + """ + invalid_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="remark", + remark="", + source_prefix=self.prefix1, + description="Invalid rule with action 'remark' and source prefix", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_valid_acl_rule_action_choices(self): + """ + Test ACLStandardRule action choices using VALID choices. + """ + valid_acl_rule_action_choices = ["deny", "permit", "remark"] + + for action_choice in valid_acl_rule_action_choices: + valid_acl_rule_action = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action=action_choice, + remark="Remark" if action_choice == "remark" else None, + description=f"VALID ACL RULE ACTION CHOICES USED: action={action_choice}", + ) + valid_acl_rule_action.full_clean() + + def test_invalid_acl_rule_action_choices(self): + """ + Test ACLStandardRule action choices using INVALID choices. + """ + invalid_acl_rule_action_choice = "both" + + invalid_acl_rule_action = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action=invalid_acl_rule_action_choice, + description=f"INVALID ACL RULE ACTION CHOICES USED: action={invalid_acl_rule_action_choice}", + ) + + with self.assertRaises(ValidationError): + invalid_acl_rule_action.full_clean()