Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config tests #678

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a6676e9
Rewrite of MainConfig.py to use singleton
hickey Nov 7, 2022
71e7420
Added PyTest modules to requirements.txt
hickey Nov 7, 2022
7caeaa7
Initial set of tests for MainConfig
hickey Nov 7, 2022
43d0dbe
Handle readonly values better in MainConfig
hickey Nov 8, 2022
c27bb73
Removed FTS_FED_PASSWORD as not documented
hickey Nov 8, 2022
ffef445
Added processing of YAML config
hickey Nov 8, 2022
d71dbdb
Added testing of YAML config
hickey Nov 8, 2022
cb5d54b
Fixed MainConfig to handle bool env vars
hickey Nov 8, 2022
2c31620
Fixed default paths based on common variable
hickey Nov 9, 2022
11f30e4
Added test of YAML config with env var overrides
hickey Nov 9, 2022
1c8cc11
Added tests to validate config as attributes
hickey Nov 9, 2022
95cc19b
Mass update of code accessing MainConfig
hickey Nov 10, 2022
da375f9
Added AllowCLIIPs to new config architecture
hickey Nov 16, 2022
f1a9565
Added tests for dealing with list vars (i.e. AllowCLIIPs)
hickey Nov 16, 2022
15ed052
Moved constants to top of MainConfig.py for readability
hickey Nov 16, 2022
3d1aeaa
Moved nodeID into new config structure
hickey Nov 16, 2022
7d74c67
Added reset() to reinit config for testing
hickey Nov 16, 2022
f71429e
Added version and CLIIP to config structure
hickey Nov 16, 2022
f94a5d2
Refactored ip var to private
hickey Nov 16, 2022
01b5104
Fixed calls for config information
hickey Nov 16, 2022
f28c6e2
Added tests for config vars with dict access
hickey Nov 16, 2022
1f885e7
Updated comments to assist other devs
hickey Nov 16, 2022
34518fc
Clean up from a bad merge
hickey Nov 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions FreeTAKServer/controllers/ClientReceptionHandler.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#######################################################
#
#
# ClientReceptionHandler.py
# Python implementation of the Class ClientReceptionHandler
# Generated by Enterprise Architect
# Created on: 19-May-2020 7:17:21 PM
# Original author: Natha Paquette
#
#
#######################################################
import time
import socket
Expand All @@ -18,12 +18,15 @@
from FreeTAKServer.controllers.configuration.LoggingConstants import LoggingConstants
from defusedxml import ElementTree as etree

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

loggingConstants = LoggingConstants(log_name="FTS_ClientReceptionHandler")
logger = CreateLoggerController("FTS_ClientReceptionHandler", logging_constants=loggingConstants).getLogger()
from FreeTAKServer.controllers.configuration.ClientReceptionLoggingConstants import ClientReceptionLoggingConstants

loggingConstants = ClientReceptionLoggingConstants()
BUFF_SIZE = MainConfig.DataReceptionBuffer
BUFF_SIZE = config.DataReceptionBuffer

class ClientReceptionHandler:
def __init__(self):
Expand Down Expand Up @@ -64,7 +67,7 @@ def monitorForData(self, queue):
sock.settimeout(0.001)
try:
xmlstring = self.recv_until(sock).decode()
if xmlstring == b'' or xmlstring is None:
if xmlstring == b'' or xmlstring is None:
self.returnReceivedData(client, b'', queue)
logger.debug("empty string sent, standard disconnect")
continue
Expand Down Expand Up @@ -113,7 +116,7 @@ def returnReceivedData(self, clientInformation, data, queue):
def recv_until(self, client):
start_receive_time = time.time()
message = client.recv(BUFF_SIZE)
while time.time() - start_receive_time <= MainConfig.MaxReceptionTime:
while time.time() - start_receive_time <= config.MaxReceptionTime:
try:
message = message + client.recv(BUFF_SIZE)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
from FreeTAKServer.controllers.XMLCoTController import XMLCoTController
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

class SendSensorDroneController:
def __init__(self, json):
tempObject = event.DroneSensor()
object = SendSensorDrone()
object.setModelObject(tempObject)
object.modelObject = self._serializeJsonToModel(object.modelObject, json)
if not MainConfig.OptimizeAPI:
if not config.OptimizeAPI:
DatabaseController().create_CoT(object.modelObject)
object.setXmlString(XMLCoTController().serialize_model_to_CoT(object.modelObject))
self.setCoTObject(object)
Expand Down
6 changes: 4 additions & 2 deletions FreeTAKServer/controllers/ServerStatusController.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
import typing

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

def modifyDefaultIP(func):
def changeDefaultIP(instance, port, ip):
import socket
Expand Down Expand Up @@ -101,7 +104,6 @@

def SSLDataPackageStatusCheck(self, SSLDataPackagePort, IP):
import requests
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
try:
import socket
if IP == "0.0.0.0":
Expand All @@ -110,7 +112,7 @@
IP = s.getsockname()[0]
else:
pass
conn = requests.get(f'https://{IP}:{SSLDataPackagePort}/Alive', cert=(MainConfig.pemDir, MainConfig.unencryptedKey), verify=False)
conn = requests.get(f'https://{IP}:{SSLDataPackagePort}/Alive', cert=(config.pemDir, config.unencryptedKey), verify=False)

Check failure

Code scanning / SonarCloud

Server certificates should be verified during SSL/TLS connections High

Enable server certificate validation on this SSL/TLS connection. See more on SonarCloud
if conn.status_code == 200:
return "on"
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
from abc import ABC
from FreeTAKServer.controllers.serializers import xml_serializer

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

loggingConstants = LoggingConstants(log_name="FTS_SendCoTAbstract")
logger = CreateLoggerController("FTS_SendCoTAbstract", logging_constants=loggingConstants).getLogger()


class SendCoTAbstractController(ABC):
Event = event

def fill_object(self, object, tempObject, RawCoT, addToDB=MainConfig.SaveCoTToDB):
def fill_object(self, object, tempObject, RawCoT, addToDB=config.SaveCoTToDB):
try:
object.modelObject = self.create_model_object(tempObject, RawCoT.xmlString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from FreeTAKServer.controllers.CreateLoggerController import CreateLoggerController
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

loggingConstants = LoggingConstants()
logger = CreateLoggerController("SendOtherController").getLogger()


class SendOtherController(SendCoTAbstractController):
def __init__(self, RawCoT=None, addToDB=MainConfig.SaveCoTToDB):
def __init__(self, RawCoT=None, addToDB=config.SaveCoTToDB):
if type(RawCoT != bytes):
pass
else:
Expand Down
61 changes: 32 additions & 29 deletions FreeTAKServer/controllers/certificate_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
subprocess.run(["pip3", "install", "requests"], capture_output=True)
import hashlib

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

def _utc_time_from_datetime(date):
fmt = '%y%m%d%H%M'
Expand All @@ -36,7 +38,7 @@ def _utc_time_from_datetime(date):
return date.strftime(fmt)


def revoke_certificate(username, revoked_file=None, ca_pem = MainConfig.CA, ca_key = MainConfig.CAkey, crl_file = MainConfig.CRLFile, user_cert_dir=MainConfig.certsPath, crl_path=MainConfig.CRLFile):
def revoke_certificate(username, revoked_file=None, ca_pem = config.CA, ca_key = config.CAkey, crl_file = config.CRLFile, user_cert_dir=config.certsPath, crl_path=config.CRLFile):
"""
Function to create/update a CRL with revoked user certificates
:param ca_pem: The path to your CA PEM file
Expand All @@ -53,6 +55,7 @@ def revoke_certificate(username, revoked_file=None, ca_pem = MainConfig.CA, ca_k
import json
from OpenSSL import crypto
from datetime import datetime

data = {}
certificate = crypto.load_certificate(crypto.FILETYPE_PEM, open(ca_pem, mode="rb").read())
private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, open(ca_key, mode="r").read())
Expand All @@ -66,7 +69,7 @@ def revoke_certificate(username, revoked_file=None, ca_pem = MainConfig.CA, ca_k

for cert in os.listdir(user_cert_dir):
if cert.lower() == f"{username.lower()}.pem":
with open(MainConfig.certsPath+'/'+cert, 'rb') as cert:
with open(config.certsPath+'/'+cert, 'rb') as cert:
revoked_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert.read())
data[str(revoked_cert.get_serial_number())] = username
break
Expand Down Expand Up @@ -131,7 +134,7 @@ def send_data_package(server: str, dp_name: str = "user.zip") -> bool:
return False

def generate_standard_zip(server_address: str = None, server_filename: str = "server.p12", user_filename: str = "Client.p12",
cert_password: str = MainConfig.password, ssl_port: str = "8089") -> None:
cert_password: str = config.password, ssl_port: str = "8089") -> None:
"""
A Function to generate a Client connection Data Package (DP) from a server and user p12 file in the current
working directory.
Expand Down Expand Up @@ -168,31 +171,31 @@ def generate_standard_zip(server_address: str = None, server_filename: str = "se
<Contents>
<Content ignore="false" zipEntry="cert/fts.pref"/>
<Content ignore="false" zipEntry="cert/{{ server_filename }}"/>
<Content ignore="false" zipEntry="cert/{{ user_filename }}"/>
<Content ignore="false" zipEntry="cert/{{ user_filename }}"/>
</Contents>
</MissionPackageManifest>
""")

username = user_filename[:-4]
random_id = uuid.uuid4()
parentfolder = "80b828699e074a239066d454a76284eb"
if MainConfig.UserConnectionIP == "0.0.0.0":
if config.UserConnectionIP == "0.0.0.0":
hostname = socket.gethostname()
server_address = socket.gethostbyname(hostname)
else:
server_address = MainConfig.UserConnectionIP
server_address = config.UserConnectionIP
pref = pref_file_template.render(server=server_address, server_filename=server_filename,
user_filename=user_filename, cert_password=cert_password,
port=str(MainConfig.SSLCoTServicePort))
port=str(config.SSLCoTServicePort))
man = manifest_file_template.render(uid=random_id, server=server_address, server_filename=server_filename,
user_filename=user_filename)
with open('fts.pref', 'w') as pref_file:
pref_file.write(pref)
with open('manifest.xml', 'w') as manifest_file:
manifest_file.write(man)
copyfile(MainConfig.p12Dir, server_filename)
copyfile(pathlib.Path(MainConfig.certsPath, user_filename), pathlib.Path(user_filename))
zipf = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(MainConfig.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
copyfile(config.p12Dir, server_filename)
copyfile(pathlib.Path(config.certsPath, user_filename), pathlib.Path(user_filename))
zipf = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(config.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
zipf.write('fts.pref')
zipf.write('manifest.xml')
zipf.write(user_filename)
Expand All @@ -202,7 +205,7 @@ def generate_standard_zip(server_address: str = None, server_filename: str = "se
os.remove('manifest.xml')

def generate_wintak_zip(server_address: str = None, server_filename: str = "server.p12", user_filename: str = "Client.p12",
cert_password: str = MainConfig.password, ssl_port: str = "8089") -> None:
cert_password: str = config.password, ssl_port: str = "8089") -> None:
"""
A Function to generate a Client connection Data Package (DP) from a server and user p12 file in the current
working directory.
Expand Down Expand Up @@ -239,7 +242,7 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
<Contents>
<Content ignore="false" zipEntry="{{ folder }}/fts.pref"/>
<Content ignore="false" zipEntry="{{ folder }}/{{ server_filename }}"/>
<Content ignore="false" zipEntry="{{ folder }}/{{ user_filename }}"/>
<Content ignore="false" zipEntry="{{ folder }}/{{ user_filename }}"/>
</Contents>
</MissionPackageManifest>
""")
Expand All @@ -259,14 +262,14 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
new_uid = uuid.uuid4()
folder = "5c2bfcae3d98c9f4d262172df99ebac5"
parentfolder = "80b828699e074a239066d454a76284eb"
if MainConfig.UserConnectionIP == "0.0.0.0":
if config.UserConnectionIP == "0.0.0.0":
hostname = socket.gethostname()
server_address = socket.gethostbyname(hostname)
else:
server_address = MainConfig.UserConnectionIP
server_address = config.UserConnectionIP
pref = pref_file_template.render(server=server_address, server_filename=server_filename,
user_filename=user_filename, cert_password=cert_password,
port=str(MainConfig.SSLCoTServicePort))
port=str(config.SSLCoTServicePort))
man = manifest_file_template.render(uid=random_id, server=server_address, server_filename=server_filename,
user_filename=user_filename, folder=folder)
man_parent = manifest_file_parent_template.render(uid=new_uid, server=server_address,
Expand All @@ -280,8 +283,8 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
pref_file.write(pref)
with open('./MANIFEST/manifest.xml', 'w') as manifest_file:
manifest_file.write(man)
copyfile(MainConfig.p12Dir, "./" + folder + "/" + server_filename)
copyfile(pathlib.Path(MainConfig.certsPath, user_filename), pathlib.Path(folder, user_filename))
copyfile(config.p12Dir, "./" + folder + "/" + server_filename)
copyfile(pathlib.Path(config.certsPath, user_filename), pathlib.Path(folder, user_filename))
zipf = zipfile.ZipFile(f"{username}.zip", 'w', zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk('./' + folder):
for file in files:
Expand All @@ -300,7 +303,7 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
with open('./MANIFEST/manifest.xml', 'w') as manifest_parent:
manifest_parent.write(man_parent)
copyfile(f"{username}.zip", pathlib.Path(parentfolder, f"{username}.zip"))
zipp = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(MainConfig.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
zipp = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(config.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk('./' + parentfolder):
for file in files:
name = str(pathlib.PurePath(pathlib.Path(root), pathlib.Path(file)))
Expand All @@ -315,14 +318,14 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv


class AtakOfTheCerts:
def __init__(self, pwd: str = MainConfig.password) -> None:
def __init__(self, pwd: str = config.password) -> None:
"""
:param pwd: String based password used to secure the p12 files generated, defaults to MainConfig.password
"""
self.key = crypto.PKey()
self.CERTPWD = pwd
self.cakeypath = MainConfig.CAkey
self.capempath = MainConfig.CA
self.cakeypath = config.CAkey
self.capempath = config.CA

def __enter__(self):
return self
Expand Down Expand Up @@ -366,7 +369,7 @@ def generate_ca(self, expiry_time_secs: int = 31536000) -> None:
crl = crypto.CRL()
crl.sign(cert, ca_key, b"sha256")

with open(MainConfig.CRLFile, 'wb') as f:
with open(config.CRLFile, 'wb') as f:
f.write(crl.export(cert=cert, key=ca_key, digest=b"sha256"))

delete = 0
Expand Down Expand Up @@ -401,7 +404,7 @@ def _generate_key(self, keypath: str) -> None:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key))
f.close()

def _generate_certificate(self, common_name: str, p12path: str, pempath: str = MainConfig.pemDir,
def _generate_certificate(self, common_name: str, p12path: str, pempath: str = config.pemDir,
expiry_time_secs: int = 31536000) -> None:
"""
Create a certificate and p12 file
Expand Down Expand Up @@ -447,9 +450,9 @@ def bake(self, common_name: str, cert: str = "user", expiry_time_secs: int = 315
:param cert: Type of cert being created "user" or "server"
:param expiry_time_secs: length of time in seconds that the certificate is valid for, defaults to 1 year
"""
keypath = pathlib.Path(MainConfig.certsPath,f"{common_name}.key")
pempath = pathlib.Path(MainConfig.certsPath,f"{common_name}.pem")
p12path = pathlib.Path(MainConfig.certsPath,f"{common_name}.p12")
keypath = pathlib.Path(config.certsPath,f"{common_name}.key")
pempath = pathlib.Path(config.certsPath,f"{common_name}.pem")
p12path = pathlib.Path(config.certsPath,f"{common_name}.p12")
self._generate_key(keypath)
self._generate_certificate(common_name=common_name, pempath=pempath, p12path=p12path, expiry_time_secs=expiry_time_secs)
if cert.lower() == "server":
Expand All @@ -472,9 +475,9 @@ def copy_server_certs(server_name: str = "server") -> None:
return None
if not os.path.exists(dest + "/Certs"):
os.makedirs(dest + "/Certs")"""
copyfile("./" + server_name + ".key", MainConfig.keyDir)
copyfile("./" + server_name + ".key", MainConfig.unencryptedKey)
copyfile("./" + server_name + ".pem", MainConfig.pemDir)
copyfile("./" + server_name + ".key", config.keyDir)
copyfile("./" + server_name + ".key", config.unencryptedKey)
copyfile("./" + server_name + ".pem", config.pemDir)

def generate_auto_certs(self, ip: str, copy: bool = False, expiry_time_secs: int = 31536000, wintak_zip=False) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import pathlib
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

class DataPackageServerConstants:
def __init__(self):
# http server config
Expand All @@ -15,7 +18,7 @@ def __init__(self):
self.HTTPDEBUG = False
self.HTTPMETHODS = ['POST', 'GET', 'PUT']
self.IP = "0.0.0.0"
self.versionInfo = MainConfig.version
self.versionInfo = config.version
self.NodeID = 'FTS'
self.VERSIONJSON = '{"version":"2","type":"ServerConfig", "data":{"version": "%s", "api": "2","hostname":"%s"}, "nodeId":"%s"}' % (
self.versionInfo, "0.0.0.0", self.NodeID)
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

class DatabaseConfiguration:
DataBasePath = MainConfig.DBFilePath
if MainConfig.DataBaseType == "SQLite":
DataBasePath = config.DBFilePath
if config.DataBaseType == "SQLite":
DataBaseType = str('sqlite:///')
elif MainConfig.DataBaseType == "MySQL":
elif config.DataBaseType == "MySQL":
DataBaseType = str('mysql://')
DataBaseConnectionString = str(DataBaseType+DataBasePath)
Loading