-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore[Security]: restrict libs to allow specific functionalities (#1429)
* chore[Security]: restrict libs to allow specific functionalities * remove: extra lib handling * fix: ruff errors * fix: error message for bad import * fix: add io library in the blacklist
- Loading branch information
1 parent
437f949
commit 27cb449
Showing
12 changed files
with
731 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
class BaseRestrictedModule: | ||
def _wrap_function(self, func): | ||
def wrapper(*args, **kwargs): | ||
# Check for any suspicious arguments that might be used for importing | ||
for arg in args + tuple(kwargs.values()): | ||
if isinstance(arg, str) and any( | ||
module in arg.lower() | ||
for module in ["io", "os", "subprocess", "sys", "importlib"] | ||
): | ||
raise SecurityError( | ||
f"Potential security risk: '{arg}' is not allowed" | ||
) | ||
return func(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
def _wrap_class(self, cls): | ||
class WrappedClass(cls): | ||
def __getattribute__(self, name): | ||
attr = super().__getattribute__(name) | ||
return self._wrap_function(self, attr) if callable(attr) else attr | ||
|
||
return WrappedClass | ||
|
||
|
||
class SecurityError(Exception): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import base64 | ||
|
||
from .base_restricted_module import BaseRestrictedModule | ||
|
||
|
||
class RestrictedBase64(BaseRestrictedModule): | ||
def __init__(self): | ||
self.allowed_functions = [ | ||
"b64encode", # Safe function to encode data into base64 | ||
"b64decode", # Safe function to decode base64 encoded data | ||
] | ||
|
||
# Bind the allowed functions to the object | ||
for func in self.allowed_functions: | ||
if hasattr(base64, func): | ||
setattr(self, func, self._wrap_function(getattr(base64, func))) | ||
|
||
def __getattr__(self, name): | ||
if name not in self.allowed_functions: | ||
raise AttributeError(f"'{name}' is not allowed in RestrictedBase64") | ||
return getattr(base64, name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import datetime | ||
|
||
from .base_restricted_module import BaseRestrictedModule | ||
|
||
|
||
class RestrictedDatetime(BaseRestrictedModule): | ||
def __init__(self): | ||
self.allowed_attributes = [ | ||
# Classes | ||
"date", | ||
"time", | ||
"datetime", | ||
"timedelta", | ||
"tzinfo", | ||
"timezone", | ||
# Constants | ||
"MINYEAR", | ||
"MAXYEAR", | ||
# Time zone constants | ||
"UTC", | ||
# Functions | ||
"now", | ||
"utcnow", | ||
"today", | ||
"fromtimestamp", | ||
"utcfromtimestamp", | ||
"fromordinal", | ||
"combine", | ||
"strptime", | ||
# Timedelta operations | ||
"timedelta", | ||
# Date operations | ||
"weekday", | ||
"isoweekday", | ||
"isocalendar", | ||
"isoformat", | ||
"ctime", | ||
"strftime", | ||
"year", | ||
"month", | ||
"day", | ||
"hour", | ||
"minute", | ||
"second", | ||
"microsecond", | ||
# Time operations | ||
"replace", | ||
"tzname", | ||
"dst", | ||
"utcoffset", | ||
# Comparison methods | ||
"min", | ||
"max", | ||
] | ||
|
||
for attr in self.allowed_attributes: | ||
if hasattr(datetime, attr): | ||
setattr(self, attr, self._wrap_function(getattr(datetime, attr))) | ||
|
||
def __getattr__(self, name): | ||
if name not in self.allowed_attributes: | ||
raise AttributeError(f"'{name}' is not allowed in RestrictedDatetime") | ||
|
||
return getattr(datetime, name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import json | ||
|
||
from .base_restricted_module import BaseRestrictedModule | ||
|
||
|
||
class RestrictedJson(BaseRestrictedModule): | ||
def __init__(self): | ||
self.allowed_functions = [ | ||
"load", | ||
"loads", | ||
"dump", | ||
"dumps", | ||
] | ||
|
||
# Bind the allowed functions to the object | ||
for func in self.allowed_functions: | ||
if hasattr(json, func): | ||
setattr(self, func, self._wrap_function(getattr(json, func))) | ||
|
||
def __getattr__(self, name): | ||
if name not in self.allowed_functions: | ||
raise AttributeError(f"'{name}' is not allowed in RestrictedJson") | ||
return getattr(json, name) |
Oops, something went wrong.