Skip to content

Commit

Permalink
Merge pull request #110 from tonioo/feature/unique_filtername_in_filt…
Browse files Browse the repository at this point in the history
…erset

Ensure name uniqueness in filter set
  • Loading branch information
tonioo authored Apr 2, 2024
2 parents 5c6e483 + 77c1ba6 commit ce06706
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 39 deletions.
109 changes: 71 additions & 38 deletions sievelib/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
from sievelib import commands


class FiltersSet(object):
class FilterAlreadyExists(Exception):
pass


class FiltersSet:
"""A set of filters."""

def __init__(
self,
name,
filter_name_pretext="# Filter: ",
filter_desc_pretext="# Description: ",
name: str,
filter_name_pretext: str = "# Filter: ",
filter_desc_pretext: str = "# Description: ",
):
"""Represents a set of one or more filters
"""
Represents a set of one or more filters.
:param name: the filterset's name
:param filter_name_pretext: the text that is used to mark a filter name
Expand Down Expand Up @@ -86,7 +91,7 @@ def from_parser_result(self, parser):
]
cpt += 1

def require(self, name):
def require(self, name: str):
"""Add a new extension to the requirements list
:param name: the extension's name
Expand Down Expand Up @@ -271,24 +276,38 @@ def _unicode_filter_name(self, name):
"""Convert name to unicode if necessary."""
return name.decode("utf-8") if isinstance(name, bytes) else name

def addfilter(self, name, conditions, actions, matchtype="anyof"):
def filter_exists(self, name: str) -> bool:
"""Check if a filter with name already exists."""
for existing_filter in self.filters:
if existing_filter["name"] == name:
return True
return False

def addfilter(
self, name: str, conditions: list, actions: list, matchtype: str = "anyof"
):
"""Add a new filter to this filters set
:param name: the filter's name
:param conditions: the list of conditions
:param actions: the list of actions
:param matchtype: "anyof" or "allof"
"""
name = self._unicode_filter_name(name)
if self.filter_exists(name):
raise FilterAlreadyExists
ifcontrol = self.__create_filter(conditions, actions, matchtype)
self.filters += [
{
"name": self._unicode_filter_name(name),
"name": name,
"content": ifcontrol,
"enabled": True,
}
]

def updatefilter(self, oldname, newname, conditions, actions, matchtype="anyof"):
def updatefilter(
self, oldname: str, newname: str, conditions, actions, matchtype: str = "anyof"
) -> bool:
"""Update a specific filter
Instead of removing and re-creating the filter, we update the
Expand All @@ -300,18 +319,26 @@ def updatefilter(self, oldname, newname, conditions, actions, matchtype="anyof")
:param actions: the list of actions
:param matchtype: "anyof" or "allof"
"""
filter_def = None
oldname = self._unicode_filter_name(oldname)
newname = self._unicode_filter_name(newname)
for f in self.filters:
if f["name"] == oldname:
f["name"] = newname
f["content"] = self.__create_filter(conditions, actions, matchtype)
if not f["enabled"]:
return self.disablefilter(newname)
return True
return False
filter_def = f
break
if not filter_def:
return False
newname = self._unicode_filter_name(newname)
if newname != oldname and self.filter_exists(newname):
raise FilterAlreadyExists
filter_def["name"] = newname
filter_def["content"] = self.__create_filter(conditions, actions, matchtype)
if not filter_def["enabled"]:
return self.disablefilter(newname)
return True

def replacefilter(self, oldname, sieve_filter, newname=None, description=None):
def replacefilter(
self, oldname: str, sieve_filter, newname: str = None, description: str = None
):
"""replace a specific sieve_filter
Instead of removing and re-creating the sieve_filter, we update the
Expand All @@ -322,22 +349,28 @@ def replacefilter(self, oldname, sieve_filter, newname=None, description=None):
:param sieve_filter: the sieve_filter object as get from
FiltersSet.getfilter()
"""
filter_def = None
oldname = self._unicode_filter_name(oldname)
newname = self._unicode_filter_name(newname)
if newname is None:
newname = oldname
for f in self.filters:
if f["name"] == oldname:
f["name"] = newname
f["content"] = sieve_filter
if description is not None:
f["description"] = description
if not f["enabled"]:
return self.disablefilter(newname)
return True
return False
filter_def = f
break
if not filter_def:
return False
if newname is None:
newname = oldname
newname = self._unicode_filter_name(newname)
if newname != oldname and self.filter_exists(newname):
raise FilterAlreadyExists
filter_def["name"] = newname
filter_def["content"] = sieve_filter
if description is not None:
filter_def["description"] = description
if not filter_def["enabled"]:
return self.disablefilter(newname)
return True

def getfilter(self, name):
def getfilter(self, name: str):
"""Search for a specific filter
:param name: the filter's name
Expand All @@ -351,7 +384,7 @@ def getfilter(self, name):
return f["content"]
return None

def get_filter_matchtype(self, name):
def get_filter_matchtype(self, name: str) -> str:
"""Retrieve matchtype of the given filter."""
flt = self.getfilter(name)
if not flt:
Expand All @@ -361,7 +394,7 @@ def get_filter_matchtype(self, name):
return node.__class__.__name__.lower().replace("command", "")
return None

def get_filter_conditions(self, name):
def get_filter_conditions(self, name: str) -> list:
"""Retrieve conditions of the given filter."""
flt = self.getfilter(name)
if not flt:
Expand Down Expand Up @@ -401,18 +434,18 @@ def get_filter_conditions(self, name):
conditions.append(args)
return conditions

def get_filter_actions(self, name):
def get_filter_actions(self, name: str) -> list:
"""Retrieve actions of the given filter."""
flt = self.getfilter(name)
if not flt:
return None
actions = []
actions: list = []
for node in flt.walk():
if isinstance(node, commands.ActionCommand):
actions.append(node.args_as_tuple())
return actions

def removefilter(self, name):
def removefilter(self, name: str) -> bool:
"""Remove a specific filter
:param name: the filter's name
Expand All @@ -424,7 +457,7 @@ def removefilter(self, name):
return True
return False

def enablefilter(self, name):
def enablefilter(self, name: str) -> bool:
"""Enable a filter
Just removes the "if false" test surrouding this filter.
Expand All @@ -442,7 +475,7 @@ def enablefilter(self, name):
return True
return False # raise NotFound

def is_filter_disabled(self, name):
def is_filter_disabled(self, name: str) -> bool:
"""Tells if the filter is currently disabled or not
:param name: the filter's name
Expand All @@ -453,7 +486,7 @@ def is_filter_disabled(self, name):
return self.__isdisabled(f["content"])
return True

def disablefilter(self, name):
def disablefilter(self, name: str) -> bool:
"""Disable a filter
Instead of commenting the filter, we just surround it with a
Expand All @@ -475,7 +508,7 @@ def disablefilter(self, name):
return True
return False

def movefilter(self, name, direction):
def movefilter(self, name: str, direction: str) -> bool:
"""Moves the filter up or down
:param name: the filter's name
Expand Down
115 changes: 114 additions & 1 deletion sievelib/tests/test_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import io

from sievelib.factory import FiltersSet
from sievelib.factory import FilterAlreadyExists, FiltersSet
from .. import parser


Expand All @@ -10,6 +10,119 @@ class FactoryTestCase(unittest.TestCase):
def setUp(self):
self.fs = FiltersSet("test")

def test_add_duplicate_filter(self):
"""Try to add the same filter name twice, should fail."""
self.fs.addfilter(
"ruleX",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Toto"),
],
)
with self.assertRaises(FilterAlreadyExists):
self.fs.addfilter(
"ruleX",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Toto"),
],
)

def test_updatefilter(self):
self.fs.addfilter(
"ruleX",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Toto"),
],
)
result = self.fs.updatefilter(
"ruleY",
"ruleX",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Tata"),
],
)
self.assertFalse(result)
result = self.fs.updatefilter(
"ruleX",
"ruleY",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Tata"),
],
)
self.assertTrue(result)
self.assertIs(self.fs.getfilter("ruleX"), None)
self.assertIsNot(self.fs.getfilter("ruleY"), None)

def test_updatefilter_duplicate(self):
self.fs.addfilter(
"ruleX",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Toto"),
],
)
self.fs.addfilter(
"ruleY",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Tota"),
],
)
with self.assertRaises(FilterAlreadyExists):
self.fs.updatefilter(
"ruleX",
"ruleY",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Toti"),
],
)

def test_replacefilter(self):
self.fs.addfilter(
"ruleX",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Toto"),
],
)
self.fs.addfilter(
"ruleY",
[
("Sender", ":is", "[email protected]"),
],
[
("fileinto", ":copy", "Tota"),
],
)
content = self.fs.getfilter("ruleX")
result = self.fs.replacefilter("ruleZ", content)
self.assertFalse(result)
result = self.fs.replacefilter("ruleY", content)
self.assertTrue(result)

def test_get_filter_conditions(self):
"""Test get_filter_conditions method."""
orig_conditions = [("Sender", ":is", "[email protected]")]
Expand Down

0 comments on commit ce06706

Please sign in to comment.