diff --git a/FreeTAKServer/controllers/ClientReceptionHandler.py b/FreeTAKServer/controllers/ClientReceptionHandler.py
index da21d8bb..aa59e3c9 100644
--- a/FreeTAKServer/controllers/ClientReceptionHandler.py
+++ b/FreeTAKServer/controllers/ClientReceptionHandler.py
@@ -1,11 +1,11 @@
#######################################################
-#
+#
# ClientReceptionHandler.py
# Python implementation of the Class ClientReceptionHandler
# Generated by Enterprise Architect
# Created on: 19-May-2020 7:17:21 PM
# Original author: Natha Paquette
-#
+#
#######################################################
import time
import socket
@@ -18,12 +18,15 @@
from FreeTAKServer.controllers.configuration.LoggingConstants import LoggingConstants
from defusedxml import ElementTree as etree
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loggingConstants = LoggingConstants(log_name="FTS_ClientReceptionHandler")
logger = CreateLoggerController("FTS_ClientReceptionHandler", logging_constants=loggingConstants).getLogger()
from FreeTAKServer.controllers.configuration.ClientReceptionLoggingConstants import ClientReceptionLoggingConstants
loggingConstants = ClientReceptionLoggingConstants()
-BUFF_SIZE = MainConfig.DataReceptionBuffer
+BUFF_SIZE = config.DataReceptionBuffer
class ClientReceptionHandler:
def __init__(self):
@@ -64,7 +67,7 @@ def monitorForData(self, queue):
sock.settimeout(0.001)
try:
xmlstring = self.recv_until(sock).decode()
- if xmlstring == b'' or xmlstring is None:
+ if xmlstring == b'' or xmlstring is None:
self.returnReceivedData(client, b'', queue)
logger.debug("empty string sent, standard disconnect")
continue
@@ -113,7 +116,7 @@ def returnReceivedData(self, clientInformation, data, queue):
def recv_until(self, client):
start_receive_time = time.time()
message = client.recv(BUFF_SIZE)
- while time.time() - start_receive_time <= MainConfig.MaxReceptionTime:
+ while time.time() - start_receive_time <= config.MaxReceptionTime:
try:
message = message + client.recv(BUFF_SIZE)
except Exception as e:
diff --git a/FreeTAKServer/controllers/RestMessageControllers/SendSensorDroneController.py b/FreeTAKServer/controllers/RestMessageControllers/SendSensorDroneController.py
index 7fe5be38..f39ea5ee 100644
--- a/FreeTAKServer/controllers/RestMessageControllers/SendSensorDroneController.py
+++ b/FreeTAKServer/controllers/RestMessageControllers/SendSensorDroneController.py
@@ -7,13 +7,16 @@
from FreeTAKServer.controllers.XMLCoTController import XMLCoTController
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class SendSensorDroneController:
def __init__(self, json):
tempObject = event.DroneSensor()
object = SendSensorDrone()
object.setModelObject(tempObject)
object.modelObject = self._serializeJsonToModel(object.modelObject, json)
- if not MainConfig.OptimizeAPI:
+ if not config.OptimizeAPI:
DatabaseController().create_CoT(object.modelObject)
object.setXmlString(XMLCoTController().serialize_model_to_CoT(object.modelObject))
self.setCoTObject(object)
diff --git a/FreeTAKServer/controllers/ServerStatusController.py b/FreeTAKServer/controllers/ServerStatusController.py
index ceda995b..37ed8936 100644
--- a/FreeTAKServer/controllers/ServerStatusController.py
+++ b/FreeTAKServer/controllers/ServerStatusController.py
@@ -2,6 +2,9 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
import typing
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
def modifyDefaultIP(func):
def changeDefaultIP(instance, port, ip):
import socket
@@ -101,7 +104,6 @@ def TCPCoTStatusCheck(self, TCPCoTPort, IP):
def SSLDataPackageStatusCheck(self, SSLDataPackagePort, IP):
import requests
- from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
try:
import socket
if IP == "0.0.0.0":
@@ -110,7 +112,7 @@ def SSLDataPackageStatusCheck(self, SSLDataPackagePort, IP):
IP = s.getsockname()[0]
else:
pass
- conn = requests.get(f'https://{IP}:{SSLDataPackagePort}/Alive', cert=(MainConfig.pemDir, MainConfig.unencryptedKey), verify=False)
+ conn = requests.get(f'https://{IP}:{SSLDataPackagePort}/Alive', cert=(config.pemDir, config.unencryptedKey), verify=False)
if conn.status_code == 200:
return "on"
else:
diff --git a/FreeTAKServer/controllers/SpecificCoTControllers/SendCoTAbstractController.py b/FreeTAKServer/controllers/SpecificCoTControllers/SendCoTAbstractController.py
index 6f265dab..bc6476af 100644
--- a/FreeTAKServer/controllers/SpecificCoTControllers/SendCoTAbstractController.py
+++ b/FreeTAKServer/controllers/SpecificCoTControllers/SendCoTAbstractController.py
@@ -5,14 +5,16 @@
from abc import ABC
from FreeTAKServer.controllers.serializers import xml_serializer
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loggingConstants = LoggingConstants(log_name="FTS_SendCoTAbstract")
logger = CreateLoggerController("FTS_SendCoTAbstract", logging_constants=loggingConstants).getLogger()
-
class SendCoTAbstractController(ABC):
Event = event
- def fill_object(self, object, tempObject, RawCoT, addToDB=MainConfig.SaveCoTToDB):
+ def fill_object(self, object, tempObject, RawCoT, addToDB=config.SaveCoTToDB):
try:
object.modelObject = self.create_model_object(tempObject, RawCoT.xmlString)
diff --git a/FreeTAKServer/controllers/SpecificCoTControllers/SendOtherController.py b/FreeTAKServer/controllers/SpecificCoTControllers/SendOtherController.py
index ed1a2b29..d418710c 100644
--- a/FreeTAKServer/controllers/SpecificCoTControllers/SendOtherController.py
+++ b/FreeTAKServer/controllers/SpecificCoTControllers/SendOtherController.py
@@ -5,12 +5,14 @@
from FreeTAKServer.controllers.CreateLoggerController import CreateLoggerController
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loggingConstants = LoggingConstants()
logger = CreateLoggerController("SendOtherController").getLogger()
-
class SendOtherController(SendCoTAbstractController):
- def __init__(self, RawCoT=None, addToDB=MainConfig.SaveCoTToDB):
+ def __init__(self, RawCoT=None, addToDB=config.SaveCoTToDB):
if type(RawCoT != bytes):
pass
else:
diff --git a/FreeTAKServer/controllers/certificate_generation.py b/FreeTAKServer/controllers/certificate_generation.py
index 8bd93b34..3687ffde 100644
--- a/FreeTAKServer/controllers/certificate_generation.py
+++ b/FreeTAKServer/controllers/certificate_generation.py
@@ -24,6 +24,8 @@
subprocess.run(["pip3", "install", "requests"], capture_output=True)
import hashlib
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
def _utc_time_from_datetime(date):
fmt = '%y%m%d%H%M'
@@ -36,7 +38,7 @@ def _utc_time_from_datetime(date):
return date.strftime(fmt)
-def revoke_certificate(username, revoked_file=None, ca_pem = MainConfig.CA, ca_key = MainConfig.CAkey, crl_file = MainConfig.CRLFile, user_cert_dir=MainConfig.certsPath, crl_path=MainConfig.CRLFile):
+def revoke_certificate(username, revoked_file=None, ca_pem = config.CA, ca_key = config.CAkey, crl_file = config.CRLFile, user_cert_dir=config.certsPath, crl_path=config.CRLFile):
"""
Function to create/update a CRL with revoked user certificates
:param ca_pem: The path to your CA PEM file
@@ -53,6 +55,7 @@ def revoke_certificate(username, revoked_file=None, ca_pem = MainConfig.CA, ca_k
import json
from OpenSSL import crypto
from datetime import datetime
+
data = {}
certificate = crypto.load_certificate(crypto.FILETYPE_PEM, open(ca_pem, mode="rb").read())
private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, open(ca_key, mode="r").read())
@@ -66,7 +69,7 @@ def revoke_certificate(username, revoked_file=None, ca_pem = MainConfig.CA, ca_k
for cert in os.listdir(user_cert_dir):
if cert.lower() == f"{username.lower()}.pem":
- with open(MainConfig.certsPath+'/'+cert, 'rb') as cert:
+ with open(config.certsPath+'/'+cert, 'rb') as cert:
revoked_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert.read())
data[str(revoked_cert.get_serial_number())] = username
break
@@ -131,7 +134,7 @@ def send_data_package(server: str, dp_name: str = "user.zip") -> bool:
return False
def generate_standard_zip(server_address: str = None, server_filename: str = "server.p12", user_filename: str = "Client.p12",
- cert_password: str = MainConfig.password, ssl_port: str = "8089") -> None:
+ cert_password: str = config.password, ssl_port: str = "8089") -> None:
"""
A Function to generate a Client connection Data Package (DP) from a server and user p12 file in the current
working directory.
@@ -168,7 +171,7 @@ def generate_standard_zip(server_address: str = None, server_filename: str = "se
-
+
""")
@@ -176,23 +179,23 @@ def generate_standard_zip(server_address: str = None, server_filename: str = "se
username = user_filename[:-4]
random_id = uuid.uuid4()
parentfolder = "80b828699e074a239066d454a76284eb"
- if MainConfig.UserConnectionIP == "0.0.0.0":
+ if config.UserConnectionIP == "0.0.0.0":
hostname = socket.gethostname()
server_address = socket.gethostbyname(hostname)
else:
- server_address = MainConfig.UserConnectionIP
+ server_address = config.UserConnectionIP
pref = pref_file_template.render(server=server_address, server_filename=server_filename,
user_filename=user_filename, cert_password=cert_password,
- port=str(MainConfig.SSLCoTServicePort))
+ port=str(config.SSLCoTServicePort))
man = manifest_file_template.render(uid=random_id, server=server_address, server_filename=server_filename,
user_filename=user_filename)
with open('fts.pref', 'w') as pref_file:
pref_file.write(pref)
with open('manifest.xml', 'w') as manifest_file:
manifest_file.write(man)
- copyfile(MainConfig.p12Dir, server_filename)
- copyfile(pathlib.Path(MainConfig.certsPath, user_filename), pathlib.Path(user_filename))
- zipf = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(MainConfig.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
+ copyfile(config.p12Dir, server_filename)
+ copyfile(pathlib.Path(config.certsPath, user_filename), pathlib.Path(user_filename))
+ zipf = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(config.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
zipf.write('fts.pref')
zipf.write('manifest.xml')
zipf.write(user_filename)
@@ -202,7 +205,7 @@ def generate_standard_zip(server_address: str = None, server_filename: str = "se
os.remove('manifest.xml')
def generate_wintak_zip(server_address: str = None, server_filename: str = "server.p12", user_filename: str = "Client.p12",
- cert_password: str = MainConfig.password, ssl_port: str = "8089") -> None:
+ cert_password: str = config.password, ssl_port: str = "8089") -> None:
"""
A Function to generate a Client connection Data Package (DP) from a server and user p12 file in the current
working directory.
@@ -239,7 +242,7 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
-
+
""")
@@ -259,14 +262,14 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
new_uid = uuid.uuid4()
folder = "5c2bfcae3d98c9f4d262172df99ebac5"
parentfolder = "80b828699e074a239066d454a76284eb"
- if MainConfig.UserConnectionIP == "0.0.0.0":
+ if config.UserConnectionIP == "0.0.0.0":
hostname = socket.gethostname()
server_address = socket.gethostbyname(hostname)
else:
- server_address = MainConfig.UserConnectionIP
+ server_address = config.UserConnectionIP
pref = pref_file_template.render(server=server_address, server_filename=server_filename,
user_filename=user_filename, cert_password=cert_password,
- port=str(MainConfig.SSLCoTServicePort))
+ port=str(config.SSLCoTServicePort))
man = manifest_file_template.render(uid=random_id, server=server_address, server_filename=server_filename,
user_filename=user_filename, folder=folder)
man_parent = manifest_file_parent_template.render(uid=new_uid, server=server_address,
@@ -280,8 +283,8 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
pref_file.write(pref)
with open('./MANIFEST/manifest.xml', 'w') as manifest_file:
manifest_file.write(man)
- copyfile(MainConfig.p12Dir, "./" + folder + "/" + server_filename)
- copyfile(pathlib.Path(MainConfig.certsPath, user_filename), pathlib.Path(folder, user_filename))
+ copyfile(config.p12Dir, "./" + folder + "/" + server_filename)
+ copyfile(pathlib.Path(config.certsPath, user_filename), pathlib.Path(folder, user_filename))
zipf = zipfile.ZipFile(f"{username}.zip", 'w', zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk('./' + folder):
for file in files:
@@ -300,7 +303,7 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
with open('./MANIFEST/manifest.xml', 'w') as manifest_parent:
manifest_parent.write(man_parent)
copyfile(f"{username}.zip", pathlib.Path(parentfolder, f"{username}.zip"))
- zipp = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(MainConfig.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
+ zipp = zipfile.ZipFile(str(pathlib.PurePath(pathlib.Path(config.clientPackages), pathlib.Path(f"{username}.zip"))), 'w', zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk('./' + parentfolder):
for file in files:
name = str(pathlib.PurePath(pathlib.Path(root), pathlib.Path(file)))
@@ -315,14 +318,14 @@ def generate_wintak_zip(server_address: str = None, server_filename: str = "serv
class AtakOfTheCerts:
- def __init__(self, pwd: str = MainConfig.password) -> None:
+ def __init__(self, pwd: str = config.password) -> None:
"""
:param pwd: String based password used to secure the p12 files generated, defaults to MainConfig.password
"""
self.key = crypto.PKey()
self.CERTPWD = pwd
- self.cakeypath = MainConfig.CAkey
- self.capempath = MainConfig.CA
+ self.cakeypath = config.CAkey
+ self.capempath = config.CA
def __enter__(self):
return self
@@ -366,7 +369,7 @@ def generate_ca(self, expiry_time_secs: int = 31536000) -> None:
crl = crypto.CRL()
crl.sign(cert, ca_key, b"sha256")
- with open(MainConfig.CRLFile, 'wb') as f:
+ with open(config.CRLFile, 'wb') as f:
f.write(crl.export(cert=cert, key=ca_key, digest=b"sha256"))
delete = 0
@@ -401,7 +404,7 @@ def _generate_key(self, keypath: str) -> None:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key))
f.close()
- def _generate_certificate(self, common_name: str, p12path: str, pempath: str = MainConfig.pemDir,
+ def _generate_certificate(self, common_name: str, p12path: str, pempath: str = config.pemDir,
expiry_time_secs: int = 31536000) -> None:
"""
Create a certificate and p12 file
@@ -447,9 +450,9 @@ def bake(self, common_name: str, cert: str = "user", expiry_time_secs: int = 315
:param cert: Type of cert being created "user" or "server"
:param expiry_time_secs: length of time in seconds that the certificate is valid for, defaults to 1 year
"""
- keypath = pathlib.Path(MainConfig.certsPath,f"{common_name}.key")
- pempath = pathlib.Path(MainConfig.certsPath,f"{common_name}.pem")
- p12path = pathlib.Path(MainConfig.certsPath,f"{common_name}.p12")
+ keypath = pathlib.Path(config.certsPath,f"{common_name}.key")
+ pempath = pathlib.Path(config.certsPath,f"{common_name}.pem")
+ p12path = pathlib.Path(config.certsPath,f"{common_name}.p12")
self._generate_key(keypath)
self._generate_certificate(common_name=common_name, pempath=pempath, p12path=p12path, expiry_time_secs=expiry_time_secs)
if cert.lower() == "server":
@@ -472,9 +475,9 @@ def copy_server_certs(server_name: str = "server") -> None:
return None
if not os.path.exists(dest + "/Certs"):
os.makedirs(dest + "/Certs")"""
- copyfile("./" + server_name + ".key", MainConfig.keyDir)
- copyfile("./" + server_name + ".key", MainConfig.unencryptedKey)
- copyfile("./" + server_name + ".pem", MainConfig.pemDir)
+ copyfile("./" + server_name + ".key", config.keyDir)
+ copyfile("./" + server_name + ".key", config.unencryptedKey)
+ copyfile("./" + server_name + ".pem", config.pemDir)
def generate_auto_certs(self, ip: str, copy: bool = False, expiry_time_secs: int = 31536000, wintak_zip=False) -> None:
"""
diff --git a/FreeTAKServer/controllers/configuration/DataPackageServerConstants.py b/FreeTAKServer/controllers/configuration/DataPackageServerConstants.py
index 330d5965..a41b39c2 100644
--- a/FreeTAKServer/controllers/configuration/DataPackageServerConstants.py
+++ b/FreeTAKServer/controllers/configuration/DataPackageServerConstants.py
@@ -2,6 +2,9 @@
import pathlib
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class DataPackageServerConstants:
def __init__(self):
# http server config
@@ -15,7 +18,7 @@ def __init__(self):
self.HTTPDEBUG = False
self.HTTPMETHODS = ['POST', 'GET', 'PUT']
self.IP = "0.0.0.0"
- self.versionInfo = MainConfig.version
+ self.versionInfo = config.version
self.NodeID = 'FTS'
self.VERSIONJSON = '{"version":"2","type":"ServerConfig", "data":{"version": "%s", "api": "2","hostname":"%s"}, "nodeId":"%s"}' % (
self.versionInfo, "0.0.0.0", self.NodeID)
diff --git a/FreeTAKServer/controllers/configuration/DatabaseConfiguration.py b/FreeTAKServer/controllers/configuration/DatabaseConfiguration.py
index 5359a5a3..7f701914 100644
--- a/FreeTAKServer/controllers/configuration/DatabaseConfiguration.py
+++ b/FreeTAKServer/controllers/configuration/DatabaseConfiguration.py
@@ -1,9 +1,12 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class DatabaseConfiguration:
- DataBasePath = MainConfig.DBFilePath
- if MainConfig.DataBaseType == "SQLite":
+ DataBasePath = config.DBFilePath
+ if config.DataBaseType == "SQLite":
DataBaseType = str('sqlite:///')
- elif MainConfig.DataBaseType == "MySQL":
+ elif config.DataBaseType == "MySQL":
DataBaseType = str('mysql://')
DataBaseConnectionString = str(DataBaseType+DataBasePath)
diff --git a/FreeTAKServer/controllers/configuration/MainConfig.py b/FreeTAKServer/controllers/configuration/MainConfig.py
index 35d1f1b4..d1d5a5e9 100644
--- a/FreeTAKServer/controllers/configuration/MainConfig.py
+++ b/FreeTAKServer/controllers/configuration/MainConfig.py
@@ -1,12 +1,17 @@
import os
-import random
import re
+import yaml
+currentPath = os.path.dirname(os.path.abspath(__file__))
from pathlib import Path
-from string import ascii_letters, digits, punctuation
from uuid import uuid4
-import yaml
-
+# the version information of the server (recommended to leave as default)
+FTS_VERSION = 'FreeTAKServer-1.9.9.6 Public'
+API_VERSION = '1.9.5'
+# TODO Need to find a better way to determine python version at runtime
+PYTHON_VERSION = 'python3.8'
+USERPATH = '/usr/local/lib/'
+MAINPATH = fr'{USERPATH}{PYTHON_VERSION}/dist-packages/FreeTAKServer'
class MainConfig:
"""
@@ -14,199 +19,312 @@ class MainConfig:
should need to be changed
"""
- # the version information of the server (recommended to leave as default)
- version = 'FreeTAKServer-1.9.9.6 Public'
-
- yaml_path = str(os.environ.get('FTS_CONFIG_PATH', '/opt/FTSConfig.yaml'))
- python_version = 'python3.8'
- userpath = '/usr/local/lib/'
+ _instance = None
+ _values = {}
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
- ip = s.getsockname()[0]
+ _ip = s.getsockname()[0]
s.close()
except:
- ip = "0.0.0.0"
-
- ## The following are *all* the attributes supported by this class
- ## and their default values
-
- ## Network settings ##
- # Client ports
- CoTServicePort = 8087
- SSLCoTServicePort = 8089
-
- # this needs to be changed for private data packages to work
- DataPackageServiceDefaultIP = ip
-
- # User Connection package IP needs to be set to the IP which is used when creating the connection in your tak device
- UserConnectionIP = ip
-
- # API IP and port
- APIIP = '0.0.0.0'
- APIPort = 19023
-
- # Federation port
- FederationPort = 9000
-
- # allowed ip's to access CLI commands
- AllowedCLIIPs = ['127.0.0.1']
-
- # IP for CLI to access
- CLIIP = '127.0.0.1'
-
- ## API Settings ##
- APIVersion = "1.9.5"
- nodeID = f"FreeTAKServer-{uuid4()}"
- OptimizeAPI = True
- DataReceptionBuffer = 1024
- MaxReceptionTime = 4
-
- # number of milliseconds to wait between each iteration of main loop
- # decreasing will increase CPU usage and server performance
- # increasing will decrease CPU usage and server performance
- MainLoopDelay = 100
-
- # set to None if you don't want a message sent
- ConnectionMessage = f'Welcome to FreeTAKServer {version}. The Parrot is not dead. It’s just resting'
-
- ## Database Settings ##
- # whether or not to save CoT's to the DB
- SaveCoTToDB = True
-
- # this should be set before startup
- DBFilePath = '/opt/FTSDataBase.db'
- DataBaseType = 'SQLite'
-
- ## Paths ##
- MainPath = Path(fr'{userpath}{python_version}/dist-packages/FreeTAKServer')
- ExCheckMainPath = Path(fr'{MainPath}/ExCheck')
- ExCheckFilePath = Path(fr'{MainPath}/ExCheck/template')
- ExCheckChecklistFilePath = Path(fr'{MainPath}/ExCheck/checklist')
- DataPackageFilePath = Path(fr'{MainPath}/FreeTAKServerDataPackageFolder')
- LogFilePath = Path(fr'{MainPath}/Logs')
- certsPath = Path(fr'{MainPath}/certs')
- # location to backup client packages
- clientPackages = str(Path(fr'{MainPath}/certs/ClientPackages'))
-
- ## Certificate Settings ##
- keyDir = Path(fr'{certsPath}/server.key')
- pemDir = Path(fr'{certsPath}/server.pem') # or crt
- testPem = ''
- testKey = ''
- unencryptedKey = Path(fr'{certsPath}/server.key.unencrypted')
- p12Dir = Path(fr'{certsPath}/server.p12')
- CA = Path(fr'{certsPath}/ca.pem')
- CAkey = Path(fr'{certsPath}/ca.key')
- CRLFile = Path(fr"{certsPath}/FTS_CRL.json")
-
- ## Federation Settings ##
- federationCert = Path(fr'{certsPath}/server.pem')
- federationKey = Path(fr'{certsPath}/server.key')
- federationKeyPassword = 'defaultpass'
-
- ## Secrets ##
- password = 'supersecret'
- websocketkey = "YourWebsocketKey"
- SecretKey = ''.join(random.choice(ascii_letters + digits + punctuation) for i in range(18))
-
- # Overlay the settings from the YAML config if it exists
- if os.path.exists(yaml_path):
+ _ip = "0.0.0.0"
+
+ _node_id = str(uuid4())
+
+ # All available config vars should be defined here
+ # currently only 'default', 'type' and 'readonly' are recognized
+ _defaults = {
+ 'version': {'default': FTS_VERSION, 'type': str, 'readonly': True},
+ 'APIVersion': {'default': API_VERSION, 'type': str, 'readonly': True},
+ 'SecretKey': {'default': 'vnkdjnfjknfl1232#', 'type': str},
+ 'nodeID': {'default': f'FreeTAKServer-{_node_id}', 'type': str},
+ 'OptimizeAPI': {'default': True, 'type': bool},
+ 'DataReceptionBuffer': {'default': 1024, 'type': int},
+ 'MaxReceptionTime': {'default': 4, 'type': int},
+ # number of milliseconds to wait between each iteration of main loop
+ # decreasing will increase CPU usage and server performance
+ # increasing will decrease CPU usage and server performance
+ 'MainLoopDelay': {'default': 100, 'type': int},
+ # this is the port to which clients will connect
+ 'CoTServicePort': {'default': 8087, 'type': int},
+ 'SSLCoTServicePort': {'default': 8089, 'type': int},
+ # this needs to be changed for private data packages to work
+ 'DataPackageServiceDefaultIP': {'default': _ip, 'type': str},
+ # User Connection package IP needs to be set to the IP which is used when creating the connection in your tak device
+ 'UserConnectionIP': {'default': _ip, 'type': str},
+ # api port
+ 'APIPort': {'default': 19023, 'type': int},
+ # Federation port
+ 'FederationPort': {'default': 9000, 'type': int},
+ # api IP
+ 'APIIP': {'default': '0.0.0.0', 'type': str},
+ # IP for CLI to access
+ 'CLIIP': {'default': '127.0.0.1', 'type': str},
+ 'AllowCLIIPs': {'default': ['127.0.0.1'], 'type': list},
+ # whether or not to save CoT's to the DB
+ 'SaveCoTToDB': {'default': True, 'type': bool},
+ # this should be set before startup
+ 'DBFilePath': {'default': r'/opt/FTSDataBase.db', 'type': str},
+ 'MainPath': {'default': Path(MAINPATH), 'type': str},
+ 'certsPath': {'default': Path(fr'{MAINPATH}/certs'), 'type': str},
+ 'ExCheckMainPath': {'default': Path(fr'{MAINPATH}/ExCheck'), 'type': str},
+ 'ExCheckFilePath': {'default': Path(fr'{MAINPATH}/ExCheck/template'), 'type': str},
+ 'ExCheckChecklistFilePath': {'default': Path(fr'{MAINPATH}/ExCheck/checklist'), 'type': str},
+ 'DataPackageFilePath': {'default': Path(fr'{MAINPATH}/FreeTAKServerDataPackageFolder'), 'type': str},
+ 'LogFilePath': {'default': Path(fr"{MAINPATH}/Logs"), 'type': str},
+ 'federationKeyPassword': {'default': 'defaultpass', 'type': str},
+ 'keyDir': {'default': Path(fr'{MAINPATH}/certs/server.key'), 'type': str},
+ 'pemDir': {'default': Path(fr'{MAINPATH}/certs/server.pem'), 'type': str},
+ 'testPem': {'default': Path(fr'{MAINPATH}/certs/server.key'), 'type': str},
+ 'testKey': {'default': Path(fr'{MAINPATH}/certs/server.pem'), 'type': str},
+ 'unencryptedKey': {'default': Path(fr'{MAINPATH}/certs/server.key.unencrypted'), 'type': str},
+ 'p12Dir': {'default': Path(fr'{MAINPATH}/certs/server.p12'), 'type': str},
+ 'CA': {'default': Path(fr'{MAINPATH}/certs/ca.pem'), 'type': str},
+ 'CAkey': {'default': Path(fr'{MAINPATH}/certs/ca.key'), 'type': str},
+ 'federationCert': {'default': Path(fr'{MAINPATH}/certs/server.pem'), 'type': str},
+ 'federationKey': {'default': Path(fr'{MAINPATH}/certs/server.key'), 'type': str},
+ 'federationKeyPassword': {'default': 'defaultpass', 'type': str},
+ 'password': {'default': 'supersecret', 'type': str},
+ 'websocketkey': {'default': "YourWebsocketKey", 'type': str},
+ 'CRLFile': {'default': Path(fr"{MAINPATH}/certs/FTS_CRL.json"), 'type': str},
+ # set to None if you don't want a message sent
+ 'ConnectionMessage': {'default': f'Welcome to FreeTAKServer {FTS_VERSION}. The Parrot is not dead. It’s just resting', 'type': str},
+ 'DataBaseType': {'default': "SQLite", 'type': str},
+ # location to backup client packages
+ 'clientPackages': {'default': Path(fr'{MAINPATH}/certs/clientPackages'), 'type': str},
+ }
+
+ # This structure maps environmental vars to config vars
+ _env_vars = {
+ 'FTS_SECRET_KEY': 'SecretKey',
+ 'FTS_NODE_ID': 'nodeID',
+ 'FTS_OPTIMIZE_API': 'OptimizeAPI',
+ 'FTS_DATA_RECEPTION_BUFFER': 'DataReceptionBuffer',
+ 'FTS_MAX_RECEPTION_TIME': 'MaxReceptionTime',
+ 'FTS_MAINLOOP_DELAY': 'MainLoopDelay',
+ 'FTS_COT_PORT': 'CoTServicePort',
+ 'FTS_SSLCOT_PORT': 'SSLCoTServicePort',
+ 'FTS_DP_ADDRESS': 'DataPackageServiceDefaultIP',
+ 'FTS_USER_ADDRESS': 'UserConnectionIP',
+ 'FTS_API_PORT': 'APIPort',
+ 'FTS_CLI_WHITELIST': 'AllowCLIIPs',
+ 'FTS_FED_PORT': 'FederationPort',
+ 'FTS_API_ADDRESS': 'APIIP',
+ 'FTS_COT_TO_DB': 'SaveCoTToDB',
+ 'FTS_DB_PATH': 'DBFilePath',
+ 'FTS_MAINPATH': 'MainPath',
+ 'FTS_CERTS_PATH': 'certsPath',
+ 'FTS_EXCHECK_PATH': 'ExCheckMainPath',
+ 'FTS_EXCHECK_TEMPLATE_PATH': 'ExCheckFilePath',
+ 'FTS_EXCHECK_CHECKLIST_PATH': 'ExCheckChecklistFilePath',
+ 'FTS_DATAPACKAGE_PATH': 'DataPackageFilePath',
+ 'FTS_LOGFILE_PATH': 'LogFilePath',
+ 'FTS_FED_PASSWORD': 'federationKeyPassword',
+ 'FTS_SERVER_KEYDIR': 'keyDir',
+ 'FTS_SERVER_PEMDIR': 'pemDir',
+ 'FTS_TESTCLIENT_PEMDIR': 'testPem',
+ 'FTS_TESTCLIENT_KEYDIR': 'testKey',
+ 'FTS_UNENCRYPTED_KEYDIR': 'unencryptedKey',
+ 'FTS_SERVER_P12DIR': 'p12Dir',
+ 'FTS_CADIR': 'CA',
+ 'FTS_CAKEYDIR': 'CAkey',
+ 'FTS_FEDERATION_CERTDIR': 'federationCert',
+ 'FTS_FEDERATION_KEYDIR': 'federationKey',
+ 'FTS_FEDERATION_KEYPASS': 'federationKeyPassword',
+ 'FTS_CLIENT_CERT_PASSWORD': 'password',
+ 'FTS_WEBSOCKET_KEY': 'websocketkey',
+ 'FTS_CRLDIR': 'CRLFile',
+ 'FTS_CONNECTION_MESSAGE': 'ConnectionMessage',
+ 'FTS_DATABASE_TYPE': 'DataBaseType',
+ 'FTS_CLIENT_PACKAGES': 'clientPackages',
+ }
+
+ # This is a simple representation of the YAML config schema with
+ # mappings to config var
+ _yaml_keys = {
+ 'System': {
+ 'FTS_NODE_ID': 'nodeID',
+ 'FTS_MAINLOOP_DELAY': 'MainLoopDelay',
+ 'FTS_CONNECTION_MESSAGE': 'ConnectionMessage',
+ 'FTS_DATABASE_TYPE': 'DataBaseType',
+ 'FTS_OPTIMIZE_API': 'OptimizeAPI',
+ 'FTS_SECRET_KEY': 'SecretKey',
+ 'FTS_DATA_RECEPTION_BUFFER': 'DataReceptionBuffer',
+ 'FTS_MAX_RECEPTION_TIME': 'MaxReceptionTime',
+ },
+ 'Addresses': {
+ 'FTS_COT_PORT': 'CoTServicePort',
+ 'FTS_SSLCOT_PORT': 'SSLCoTServicePort',
+ 'FTS_DP_ADDRESS': 'DataPackageServiceDefaultIP',
+ 'FTS_USER_ADDRESS': 'UserConnectionIP',
+ 'FTS_API_PORT': 'APIPort',
+ 'FTS_FED_PORT': 'FederationPort',
+ 'FTS_API_ADDRESS': 'APIIP',
+ 'FTS_CLI_WHITELIST': 'AllowCLIIPs',
+ },
+ 'Filesystem': {
+ 'FTS_COT_TO_DB': 'SaveCoTToDB',
+ 'FTS_DB_PATH': 'DBFilePath',
+ 'FTS_MAINPATH': 'MainPath',
+ 'FTS_CERTS_PATH': 'certsPath',
+ 'FTS_EXCHECK_PATH': 'ExCheckMainPath',
+ 'FTS_EXCHECK_TEMPLATE_PATH': 'ExCheckFilePath',
+ 'FTS_EXCHECK_CHECKLIST_PATH': 'ExCheckChecklistFilePath',
+ 'FTS_DATAPACKAGE_PATH': 'DataPackageFilePath',
+ 'FTS_LOGFILE_PATH': 'LogFilePath',
+ 'FTS_CLIENT_PACKAGES': 'clientPackages',
+ },
+ 'Certs': {
+ 'FTS_SERVER_KEYDIR': 'keyDir',
+ 'FTS_SERVER_PEMDIR': 'pemDir',
+ 'FTS_TESTCLIENT_PEMDIR': 'testPem',
+ 'FTS_TESTCLIENT_KEYDIR': 'testKey',
+ 'FTS_UNENCRYPTED_KEYDIR': 'unencryptedKey',
+ 'FTS_SERVER_P12DIR': 'p12Dir',
+ 'FTS_CADIR': 'CA',
+ 'FTS_CAKEYDIR': 'CAkey',
+ 'FTS_FEDERATION_CERTDIR': 'federationCert',
+ 'FTS_FEDERATION_KEYDIR': 'federationKey',
+ 'FTS_FEDERATION_KEYPASS': 'federationKeyPassword',
+ 'FTS_CLIENT_CERT_PASSWORD': 'password',
+ 'FTS_WEBSOCKET_KEY': 'websocketkey',
+ 'FTS_CRLDIR': 'CRLFile',
+ }
+ }
+
+ def __init__(self):
+ raise RuntimeError('Call instance() instead')
+
+ # instance() is the normal entry point to get access to config information.
+ #
+ # Generally it is called without arguments in the rest of the FTS code
+ # and the return value is a config object where the config vars can be
+ # read or written using the attribute syntax (e.g. config.var for read and
+ # config.var = value for write), the dictionary syntax (e.g. config['var']
+ # for read and config['var'] = value for write) or the get() and set() methods.
+ #
+ # The only time that instance() is called with a parameter is the first
+ # time it is accessed so that the YAML config file can be read in and
+ # parsed. Further calls to instance() with the YAML configuration param
+ # may reset values of current config vars to their start values.
+ @classmethod
+ def instance(cls, config_file=None):
+ if cls._instance is None:
+ cls._instance = cls.__new__(cls)
+ # Put any initialization here.
+
+ # preload the defaults into the _values table
+
+ for var_name, metadata in cls._defaults.items():
+ cls._instance.set(var_name, value=metadata['default'],
+ override_ro=True)
+
+ # if config_file not specified, check env or use default location
+ if config_file == None:
+ config_file = str(os.environ.get('FTS_CONFIG_PATH', '/opt/FTSConfig.yaml'))
+
+ # overlay the yaml config if found
+ if os.path.exists(config_file):
+ cls._instance.read_yaml_config(config_file)
+
+ # finally overlay any configuration specified in the env
+ cls._instance.import_env_config()
+
+ return cls._instance
+
+ # reset() is really only used for testing so that MainConfig can
+ # be reinitialized between tests. It normally should not be used
+ # by anything else.
+ @classmethod
+ def reset(cls):
+ # here we need to reinitialze all the private vars
+ cls._instance = None
+ cls._values = {}
+ cls._node_id = str(uuid4())
+
+ def set(self, name, value=None, override_ro=False):
+ # add the value to the values table using the correct type
+ if not self._readonly(name) or override_ro:
+ self._values[name] = self._var_type(name)(value)
+
+ def get(self, name):
+ if name in self._values:
+ return self._values[name]
+ else:
+ raise RuntimeError(f'MainConfig unknown setting name: {name}')
+
+ # read_yaml_config() will parse a YAML config and apply to the current
+ # config vars. This should only be called from instance() and only
+ # at the startup of the FTS server.
+ def read_yaml_config(self, yaml_path):
try:
- yamlConfig = yaml.safe_load(open(yaml_path).read())
+ content = open(yaml_path).read()
+ yamlConfig = yaml.safe_load(content)
except OSError as e:
- raise RuntimeError(f"{yaml_path} exists, but can not be read")
-
- if yamlConfig.get("System"):
- MainLoopDelay = int(yamlConfig["System"].get("FTS_MAINLOOP_DELAY", MainLoopDelay))
- ConnectionMessage = yamlConfig["System"].get("FTS_CONNECTION_MESSAGE", ConnectionMessage)
- DataBaseType = yamlConfig["System"].get("FTS_DATABASE_TYPE", DataBaseType)
- OptimizeAPI = bool(yamlConfig["System"].get("FTS_OPTIMIZE_API", OptimizeAPI))
- SecretKey = yamlConfig["System"].get("FTS_SECRET_KEY", SecretKey)
- DataReceptionBuffer = int(yamlConfig["System"].get("FTS_DATA_RECEPTION_BUFFER", DataReceptionBuffer))
- MaxReceptionTime = int(yamlConfig["System"].get("FTS_MAX_RECEPTION_TIME", MaxReceptionTime))
- nodeID = yamlConfig["System"].get("FTS_NODE_ID", nodeID)
-
- if yamlConfig.get("Addresses"):
- CoTServicePort = int(yamlConfig["Addresses"].get('FTS_COT_PORT', CoTServicePort))
- SSLCoTServicePort = int(yamlConfig["Addresses"].get('FTS_SSLCOT_PORT', SSLCoTServicePort))
- DataPackageServiceDefaultIP = yamlConfig["Addresses"].get('FTS_DP_ADDRESS', DataPackageServiceDefaultIP)
- UserConnectionIP = yamlConfig["Addresses"].get("FTS_USER_ADDRESS", UserConnectionIP)
- APIPort = int(yamlConfig["Addresses"].get("FTS_API_PORT", APIPort))
- APIIP = yamlConfig["Addresses"].get("FTS_API_ADDRESS", APIIP)
- FederationPort = int(yamlConfig["Addresses"].get("FTS_FED_PORT", FederationPort))
- AllowedCLIIPs = yamlConfig["Addresses"].get("FTS_CLI_WHITELIST", AllowedCLIIPs)
- CLIIP = yamlConfig["Addresses"].get("FTS_CLI_IP", CLIIP)
-
- if yamlConfig.get("FileSystem"):
- DBFilePath = yamlConfig["FileSystem"].get("FTS_DB_PATH", DBFilePath)
- SaveCoTToDB = bool(yamlConfig["FileSystem"].get("FTS_COT_TO_DB", SaveCoTToDB))
- MainPath = yamlConfig["FileSystem"].get("FTS_MAINPATH", MainPath)
- certsPath = yamlConfig["FileSystem"].get("FTS_CERTS_PATH", certsPath)
- ExCheckMainPath = yamlConfig["FileSystem"].get("FTS_EXCHECK_PATH", ExCheckMainPath)
- ExCheckFilePath = yamlConfig["FileSystem"].get("FTS_EXCHECK_TEMPLATE_PATH", ExCheckFilePath)
- ExCheckChecklistFilePath = yamlConfig["FileSystem"].get("FTS_EXCHECK_CHECKLIST_PATH", ExCheckChecklistFilePath)
- DataPackageFilePath = yamlConfig["FileSystem"].get("FTS_DATAPACKAGE_PATH", DataPackageFilePath)
- LogFilePath = yamlConfig["FileSystem"].get("FTS_LOGFILE_PATH", LogFilePath)
-
- if yamlConfig.get("Certs"):
- keyDir = yamlConfig["Certs"].get("FTS_SERVER_KEYDIR", keyDir)
- pemDir = yamlConfig["Certs"].get("FTS_SERVER_PEMDIR", pemDir)
- testPem = yamlConfig["Certs"].get("FTS_TESTCLIENT_PEMDIR", testPem)
- testKey = yamlConfig["Certs"].get("FTS_TESTCLIENT_KEYDIR", testKey)
- unencryptedKey = yamlConfig["Certs"].get("FTS_UNENCRYPTED_KEYDIR", unencryptedKey)
- p12Dir = yamlConfig["Certs"].get("FTS_SERVER_P12DIR", p12Dir)
- CA = yamlConfig["Certs"].get("FTS_CADIR", CA)
- CAkey = yamlConfig["Certs"].get("FTS_CAKEYDIR", CAkey)
- federationCert = yamlConfig["Certs"].get("FTS_FEDERATION_CERTDIR", federationCert)
- federationKey = yamlConfig["Certs"].get("FTS_FEDERATION_KEYDIR", federationKey)
- federationKeyPassword = yamlConfig["Certs"].get("FTS_FEDERATION_KEYPASS", federationKeyPassword)
- password = yamlConfig["Certs"].get("FTS_CLIENT_CERT_PASSWORD", password)
- websocketkey = yamlConfig["Certs"].get("FTS_WEBSOCKET_KEY", websocketkey)
- CRLFile = yamlConfig["Certs"].get("FTS_CRLDIR", CRLFile)
-
- # Allow env vars to modify configuration
- MainLoopDelay = int(os.environ.get('FTS_MAINLOOP_DELAY', MainLoopDelay))
- ConnectionMessage = os.environ.get("FTS_CONNECTION_MESSAGE", ConnectionMessage)
- DataBaseType = os.environ.get("FTS_DATABASE_TYPE", DataBaseType)
- OptimizeAPI = bool(os.environ.get("FTS_OPTIMIZE_API", OptimizeAPI))
- SecretKey = os.environ.get("FTS_SECRET_KEY", SecretKey)
- DataReceptionBuffer = int(os.environ.get("FTS_DATA_RECEPTION_BUFFER", DataReceptionBuffer))
- MaxReceptionTime = int(os.environ.get("FTS_MAX_RECEPTION_TIME", MaxReceptionTime))
- nodeID = os.environ.get("FTS_NODE_ID", nodeID)
- CoTServicePort = int(os.environ.get('FTS_COT_PORT', CoTServicePort))
- SSLCoTServicePort = int(os.environ.get('FTS_SSLCOT_PORT', SSLCoTServicePort))
- DataPackageServiceDefaultIP = os.environ.get('FTS_DP_ADDRESS', DataPackageServiceDefaultIP)
- UserConnectionIP = os.environ.get("FTS_USER_ADDRESS", UserConnectionIP)
- APIPort = int(os.environ.get("FTS_API_PORT", APIPort))
- APIIP = os.environ.get("FTS_API_ADDRESS", APIIP)
- FederationPort = int(os.environ.get("FTS_FED_PORT", FederationPort))
- AllowedCLIIPs = re.split(r'[,:]', os.environ.get("FTS_CLI_WHITELIST", "")) or AllowedCLIIPs
- CLIIP = os.environ.get("FTS_CLI_IP", CLIIP)
- DBFilePath = os.environ.get("FTS_DB_PATH", DBFilePath)
- SaveCoTToDB = bool(os.environ.get("FTS_COT_TO_DB", SaveCoTToDB))
- MainPath = os.environ.get("FTS_MAINPATH", MainPath)
- certsPath = os.environ.get("FTS_CERTS_PATH", certsPath)
- ExCheckMainPath = os.environ.get("FTS_EXCHECK_PATH", ExCheckMainPath)
- ExCheckFilePath = os.environ.get("FTS_EXCHECK_TEMPLATE_PATH", ExCheckFilePath)
- ExCheckChecklistFilePath = os.environ.get("FTS_EXCHECK_CHECKLIST_PATH", ExCheckChecklistFilePath)
- DataPackageFilePath = os.environ.get("FTS_DATAPACKAGE_PATH", DataPackageFilePath)
- LogFilePath = os.environ.get("FTS_LOGFILE_PATH", LogFilePath)
- keyDir = os.environ.get("FTS_SERVER_KEYDIR", keyDir)
- pemDir = os.environ.get("FTS_SERVER_PEMDIR", pemDir)
- testPem = os.environ.get("FTS_TESTCLIENT_PEMDIR", testPem)
- testKey = os.environ.get("FTS_TESTCLIENT_KEYDIR", testKey)
- unencryptedKey = os.environ.get("FTS_UNENCRYPTED_KEYDIR", unencryptedKey)
- p12Dir = os.environ.get("FTS_SERVER_P12DIR", p12Dir)
- CA = os.environ.get("FTS_CADIR", CA)
- CAkey = os.environ.get("FTS_CAKEYDIR", CAkey)
- federationCert = os.environ.get("FTS_FEDERATION_CERTDIR", federationCert)
- federationKey = os.environ.get("FTS_FEDERATION_KEYDIR", federationKey)
- federationKeyPassword = os.environ.get("FTS_FEDERATION_KEYPASS", federationKeyPassword)
- password = os.environ.get("FTS_CLIENT_CERT_PASSWORD", password)
- websocketkey = os.environ.get("FTS_WEBSOCKET_KEY", websocketkey)
- CRLFile = os.environ.get("FTS_CRLDIR", CRLFile)
+ raise e
+
+ # walk through the _yaml_keys struct looking for values in yamlConfig
+ for sect in MainConfig._yaml_keys:
+ if sect in yamlConfig:
+ for attr, var_name in MainConfig._yaml_keys[sect].items():
+ if attr in yamlConfig[sect]:
+ # found a setting we can update the config
+ self.set(var_name, value=yamlConfig[sect][attr])
+
+ # import_env_config() will inspect the current environment and detect
+ # configuration values. Detected values will then be applied to the
+ # current config vars. This should only be called from instance() and
+ # only at the startup of the FTS server.
+ def import_env_config(self):
+ # Walk through all the registered env vars and check to see if the
+ # env var is defined in the environment
+ for env_var, config_var in self._env_vars.items():
+ if env_var in os.environ:
+ env_value = os.environ[env_var]
+
+ # Handle boolean types
+ if self._var_type(config_var) == bool:
+ # bools are actually specified as a string
+ if env_value.upper() in ('1', 'TRUE', 'YES', 'Y'):
+ env_value = True
+ else:
+ env_value = False
+ # Handle lists and split the value apart
+ elif self._var_type(config_var) == list:
+ env_value = re.split(r':|,', env_value)
+
+ self[config_var] = env_value
+
+ # dump_values() is used for debugging and inspecting the current
+ # settings of config vars
+ def dump_values(self):
+ for var_name, value in self._values.items():
+ print(f'{var_name} = {value}')
+
+ # test if the config var should allow being set
+ def _readonly(self, name):
+ if 'readonly' in MainConfig._defaults[name] and MainConfig._defaults[name]['readonly']:
+ return True
+ return False
+
+ # helper function to return the type of a config var
+ def _var_type(self, name):
+ return MainConfig._defaults[name]['type']
+
+ # Attribute access magic methods
+ def __getattr__(self, name):
+ return self.get(name)
+
+ def __setattr__(self, name, value):
+ self.set(name, value)
+
+ # Dictionary access magic methods
+ def __getitem__(self, name):
+ return self.get(name)
+
+ def __setitem__(self, name, value):
+ self.set(name, value)
first_start = True
diff --git a/FreeTAKServer/controllers/configuration/OrchestratorConstants.py b/FreeTAKServer/controllers/configuration/OrchestratorConstants.py
index 997b59db..a05f74f8 100644
--- a/FreeTAKServer/controllers/configuration/OrchestratorConstants.py
+++ b/FreeTAKServer/controllers/configuration/OrchestratorConstants.py
@@ -1,10 +1,13 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class OrchestratorConstants:
def __init__(self):
self.CoTIP = "0.0.0.0"
- self.COTPORT = MainConfig.CoTServicePort
+ self.COTPORT = config.CoTServicePort
self.APIPORTARG = '-APIPort'
self.COTPORTARG = '-CoTPort'
self.IPARG = "-IP"
@@ -18,4 +21,4 @@ def __init__(self):
self.HEALTHCHECK = 'HealthCheck'
self.LOCALHOST = '127.0.0.1'
- self.DEFAULTCONNECTIONGEOCHATOBJ = MainConfig.ConnectionMessage
\ No newline at end of file
+ self.DEFAULTCONNECTIONGEOCHATOBJ = config.ConnectionMessage
\ No newline at end of file
diff --git a/FreeTAKServer/controllers/configuration_wizard.py b/FreeTAKServer/controllers/configuration_wizard.py
index 13f32d17..4076b41a 100644
--- a/FreeTAKServer/controllers/configuration_wizard.py
+++ b/FreeTAKServer/controllers/configuration_wizard.py
@@ -5,6 +5,10 @@
yaml = YAML()
import pathlib
+# TODO This code may have problems with the rewrite of MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
# TODO: refactor this
def get_user_input(*, question: str, default: str = None, options: list = None):
input_string = question
@@ -37,16 +41,16 @@ def add_to_config(path: List[str], data: str, source: dict):
def ask_user_for_config():
use_yaml = get_user_input(question="would you like to use a yaml config file, \n if yes you will be prompted for further configuration options", default="yes")
if use_yaml == "yes":
- yaml_path = get_user_input(question="where would you like to save the yaml config", default=MainConfig.yaml_path)
+ yaml_path = get_user_input(question="where would you like to save the yaml config", default=config.yaml_path)
yaml_config = yaml.load(default_yaml_file)
- ip = get_user_input(question="enter ip", default= MainConfig.ip)
+ ip = get_user_input(question="enter ip", default= config.ip)
add_to_config(data=ip, path=["Addresses", "FTS_DP_ADDRESS"], source=yaml_config)
add_to_config(data=ip, path=["Addresses", "FTS_USER_ADDRESS"], source=yaml_config)
while True:
database = get_user_input(question="enter the preferred database type (MySQL is highly experimental if you're not sure leave default)", default="SQLite")
if database == "SQLite":
- database_path = get_user_input(question="enter the preferred database path", default=MainConfig.DBFilePath)
+ database_path = get_user_input(question="enter the preferred database path", default=config.DBFilePath)
break
elif database == "MySQL":
username = get_user_input(question="please enter MySQL username")
@@ -54,17 +58,17 @@ def ask_user_for_config():
db_ip = get_user_input(question="please enter MySQL server IP")
db_name = get_user_input(question="please enter MySQL DataBase name")
database_path = f"{username}:{password}@{db_ip}/{db_name}"
- MainConfig.DataBaseType = "MySQL"
+ config.DataBaseType = "MySQL"
add_to_config(data=database, path=["System", "FTS_DATABASE_TYPE"], source=yaml_config)
break
else:
print('invalid database type')
- MainConfig.DBFilePath = database_path
+ config.DBFilePath = database_path
add_to_config(data=database_path, path=["FileSystem", "FTS_DB_PATH"], source=yaml_config)
- main_path = get_user_input(question="enter the preferred main_path", default=MainConfig.MainPath)
- MainConfig.MainPath = main_path
+ main_path = get_user_input(question="enter the preferred main_path", default=config.MainPath)
+ config.MainPath = main_path
add_to_config(path=["FileSystem", "FTS_MAINPATH"], data= main_path, source= yaml_config)
- log_path = get_user_input(question="enter the preferred log file path", default=MainConfig.LogFilePath)
+ log_path = get_user_input(question="enter the preferred log file path", default=config.LogFilePath)
add_to_config(path=["FileSystem", "FTS_LOGFILE_PATH"], data=log_path, source=yaml_config)
with open(pathlib.PurePath(pathlib.Path(__file__).parent.resolve().parent,
pathlib.Path("controllers/configuration/MainConfig.py")), mode="r+") as file:
@@ -74,13 +78,13 @@ def ask_user_for_config():
with open(pathlib.PurePath(pathlib.Path(__file__).parent.resolve().parent,
pathlib.Path("controllers/configuration/MainConfig.py")), mode="w+") as file:
file.writelines(data)
- if yaml_path != MainConfig.yaml_path:
+ if yaml_path != config.yaml_path:
file_path = (pathlib.PurePath(pathlib.Path(__file__).parent.resolve().parent, pathlib.Path("controllers/configuration/MainConfig.py")))
file = open(file_path, mode="r+")
data = file.readlines()
data[13] = f' yaml_path = "{yaml_path}"'
file.close()
- MainConfig.yaml_path = yaml_path
+ config.yaml_path = yaml_path
file = open(yaml_path, mode="w+")
yaml.dump(yaml_config, file)
file.close()
@@ -105,7 +109,7 @@ def ask_user_for_config():
default_yaml_file = f"""
System:
#FTS_DATABASE_TYPE: SQLite
- FTS_CONNECTION_MESSAGE: Welcome to FreeTAKServer {MainConfig.version}. The Parrot is not dead. It’s just resting
+ FTS_CONNECTION_MESSAGE: Welcome to FreeTAKServer {config.version}. The Parrot is not dead. It’s just resting
#FTS_OPTIMIZE_API: True
#FTS_MAINLOOP_DELAY: 1
Addresses:
diff --git a/FreeTAKServer/controllers/services/DataPackageServer.py b/FreeTAKServer/controllers/services/DataPackageServer.py
index 6cf43ac1..899387cd 100644
--- a/FreeTAKServer/controllers/services/DataPackageServer.py
+++ b/FreeTAKServer/controllers/services/DataPackageServer.py
@@ -25,6 +25,9 @@
from flask import Flask, request, send_file
from flask.logging import default_handler
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
dbController = DatabaseController()
loggingConstants = LoggingConstants(log_name="FTS-DataPackage_Service")
@@ -43,7 +46,7 @@
app = Flask(__name__) # create the Flask app
app.config['SQLALCHEMY_DATABASE_URI'] = DatabaseConfiguration().DataBaseConnectionString
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
-app.config["SECRET_KEY"] = MainConfig.SecretKey
+app.config["SECRET_KEY"] = config.SecretKey
cors = CORS(app, resources={r"/*": {"origins": "*"}})
app.config['CORS_HEADERS'] = 'Content-Type'
@@ -51,13 +54,16 @@
# TODO: verify session life cycle in dbController doesnt break this logic
dbController.session = db.session
file_dir = os.path.dirname(os.path.realpath(__file__))
-dp_directory = MainConfig.DataPackageFilePath
+dp_directory = config.DataPackageFilePath
-Path(MainConfig.ExCheckMainPath).mkdir(parents=True, exist_ok=True)
+if not os.path.exists(config.ExCheckMainPath):
+ os.mkdir(config.ExCheckMainPath)
-Path(MainConfig.ExCheckChecklistFilePath).mkdir(parents=True, exist_ok=True)
+if not os.path.exists(config.ExCheckChecklistFilePath):
+ os.mkdir(config.ExCheckChecklistFilePath)
-Path(MainConfig.ExCheckFilePath).mkdir(parents=True, exist_ok=True)
+if not os.path.exists(config.ExCheckFilePath):
+ os.mkdir(config.ExCheckFilePath)
# Set up logging
"""if not Path(log.LOGDIRECTORY).exists():
print(f"Creating directory at {log.LOGDIRECTORY}")
@@ -228,9 +234,9 @@ def specificPackage():
if request.method == 'GET' and request.args.get('uid') != None:
data = request.data
taskuid = request.args.get('uid')
- for file in listdir(MainConfig.ExCheckChecklistFilePath):
+ for file in listdir(config.ExCheckChecklistFilePath):
try:
- xml = etree.parse(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(file)))).getroot()
+ xml = etree.parse(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(file)))).getroot()
except Exception as e:
logger.error(e)
tasks = xml.find('checklistTasks')
@@ -240,9 +246,9 @@ def specificPackage():
return etree.tostring(task)
else:
pass
- for file in listdir(MainConfig.ExCheckFilePath):
+ for file in listdir(config.ExCheckFilePath):
try:
- xml = etree.parse(str(PurePath(Path(MainConfig.ExCheckFilePath), Path(file)))).getroot()
+ xml = etree.parse(str(PurePath(Path(config.ExCheckFilePath), Path(file)))).getroot()
if xml.find("checklistDetails").find('uid').text == str(taskuid):
return etree.tostring(xml)
except Exception as e:
@@ -260,7 +266,7 @@ def specificPackage():
return send_file(str(path))
else:
obj = dbController.query_ExCheck(verbose=True, query=f'hash = "{hash}"')
- data = etree.parse(str(PurePath(Path(MainConfig.ExCheckFilePath), Path(obj[0].data.filename))))
+ data = etree.parse(str(PurePath(Path(config.ExCheckFilePath), Path(obj[0].data.filename))))
data.getroot().find('checklistTasks').find("checklistTask").find("uid").text = data.getroot().find(
'checklistTasks').find("checklistTask").find("checklistUid").text
output = etree.tostring(data)
@@ -297,8 +303,6 @@ def home():
from flask import Flask, request
from FreeTAKServer.controllers.ExCheckControllers.templateToJsonSerializer import templateSerializer
from FreeTAKServer.controllers.DatabaseControllers.DatabaseController import DatabaseController
-from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
-
@app.route('/Marti/api/missions/exchecktemplates/changes', methods=['GET'])
def check_changes():
@@ -377,7 +381,7 @@ def template():
tasks = xml.find('checklistTasks')
if not sanitize_path_input(object.data.uid):
return "invalid uid sent", 500
- path = str(PurePath(Path(MainConfig.ExCheckFilePath), Path(f'{object.data.uid}.xml')))
+ path = str(PurePath(Path(config.ExCheckFilePath), Path(f'{object.data.uid}.xml')))
with open(path, 'w+') as file:
file.write(XMI)
file.close()
@@ -428,12 +432,12 @@ def startList(subscription):
if not sanitize_path_input(subscription):
return "invalid subscription sent", 500
- with open(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(f'{uid}.xml'))), 'w+') as file:
- file.write(str(open(str(PurePath(Path(MainConfig.ExCheckFilePath), Path(f'{subscription}.xml'))), 'r').read()))
+ with open(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(f'{uid}.xml'))), 'w+') as file:
+ file.write(str(open(str(PurePath(Path(config.ExCheckFilePath), Path(f'{subscription}.xml'))), 'r').read()))
file.close()
xml = etree.parse(
- MainConfig.ExCheckChecklistFilePath + '/' + uid + '.xml').getroot()
+ config.ExCheckChecklistFilePath + '/' + uid + '.xml').getroot()
starttime = Element('startTime')
starttime.text = startTime
@@ -454,7 +458,7 @@ def startList(subscription):
taskuid.text = str(uuid.uuid4())
with open(
- str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(uid + '.xml'))),
+ str(PurePath(Path(config.ExCheckChecklistFilePath), Path(uid + '.xml'))),
'w+') as file:
y = etree.tostring(xml)
file.write(etree.tostring(xml).decode())
@@ -467,7 +471,7 @@ def startList(subscription):
callsign=request.args.get('callsign'), name=request.args.get('name'), uid=uid,
filename=f'{uid}.xml', template=excheckobj)
- return str(open(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(uid + '.xml'))), 'r').read()), 200
+ return str(open(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(uid + '.xml'))), 'r').read()), 200
@app.route('/Marti/api/excheck/checklist/', methods=["POST"])
def update_checklist():
@@ -485,17 +489,17 @@ def update_checklist():
if not sanitize_path_input(uid):
return "uid", 500
- with open(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(f'{uid}.xml'))), 'wb+') as file:
+ with open(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(f'{uid}.xml'))), 'wb+') as file:
file.write(request.data)
file.close()
- return str(open(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(uid + '.xml'))), 'r').read()), 200
+ return str(open(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(uid + '.xml'))), 'r').read()), 200
@app.route('/Marti/api/excheck/checklist/')
def accesschecklist(checklistid):
if not sanitize_path_input(checklistid):
return "invalid checklistid sent", 500
- return str(open(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(checklistid + '.xml'))),
+ return str(open(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(checklistid + '.xml'))),
'r').read())
@@ -514,7 +518,7 @@ def updatetemplate(checklistid, taskid):
if not sanitize_path_input(checklistid):
return "invalid checklistid sent", 500
xml = etree.parse(
- str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(checklistid + '.xml')))).getroot()
+ str(PurePath(Path(config.ExCheckChecklistFilePath), Path(checklistid + '.xml')))).getroot()
updatedTask = etree.fromstring(data)
tasks = xml.find('checklistTasks')
index = 0
@@ -527,7 +531,7 @@ def updatetemplate(checklistid, taskid):
else:
pass
with open(
- str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(checklistid + '.xml'))), 'w+') as file:
+ str(PurePath(Path(config.ExCheckChecklistFilePath), Path(checklistid + '.xml'))), 'w+') as file:
file.write(etree.tostring(xml).decode())
file.close()
@@ -544,7 +548,7 @@ def updatetemplate(checklistid, taskid):
object.detail.mission.MissionChanges.MissionChange.type.setINTAG("ADD_CONTENT")
object.detail.mission.MissionChanges.MissionChange.contentResource.filename.setINTAG(taskid + '.xml')
object.detail.mission.MissionChanges.MissionChange.contentResource.hash.setINTAG(str(hashlib.sha256(
- str(open(MainConfig.ExCheckChecklistFilePath + '/' + checklistid + '.xml', 'r')).encode()).hexdigest()))
+ str(open(config.ExCheckChecklistFilePath + '/' + checklistid + '.xml', 'r')).encode()).hexdigest()))
object.detail.mission.MissionChanges.MissionChange.contentResource.keywords.setINTAG('Task')
object.detail.mission.MissionChanges.MissionChange.contentResource.name.setINTAG(taskid)
object.detail.mission.MissionChanges.MissionChange.contentResource.size.setINTAG(str(len(data)))
@@ -597,10 +601,10 @@ def activechecklists():
checklists = Checklists.Checklist()
rootxml = Element('checklists')
- for file in listdir(MainConfig.ExCheckChecklistFilePath):
+ for file in listdir(config.ExCheckChecklistFilePath):
try:
checklist = Element('checklist')
- xmldetails = etree.parse(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(file)))).getroot().find(
+ xmldetails = etree.parse(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(file)))).getroot().find(
'checklistDetails')
checklist.append(xmldetails)
checklist.append(Element('checklistColumns'))
diff --git a/FreeTAKServer/controllers/services/FTS.py b/FreeTAKServer/controllers/services/FTS.py
index 4ed30cb2..8e18bf83 100644
--- a/FreeTAKServer/controllers/services/FTS.py
+++ b/FreeTAKServer/controllers/services/FTS.py
@@ -41,6 +41,9 @@
from FreeTAKServer.model.Enumerations.connectionTypes import ConnectionTypes
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loggingConstants = LoggingConstants(log_name="FTS_FTSCore")
logger = CreateLoggerController("FTS_FTSCore", logging_constants=loggingConstants).getLogger()
@@ -533,7 +536,7 @@ def receive_data_froCoT_service_thread(self, recv_pipe, clientArray, send_pipe):
try:
# TODO: change 'add' 'remove' 'update' and 'get' to an enumeration
try:
- data = recv_pipe.get(timeout=MainConfig.MainLoopDelay/1000)
+ data = recv_pipe.get(timeout=config.MainLoopDelay/1000)
except queue.Empty:
return self.user_dict
if data:
@@ -675,7 +678,7 @@ def checkPipes(self):
try:
for service_name, pipe in self.FilterGroup.get_sources().items():
try:
- data = AddDataToCoTList().recv(pipe, timeout=MainConfig.MainLoopDelay / 1000)
+ data = AddDataToCoTList().recv(pipe, timeout=config.MainLoopDelay / 1000)
except Exception as e:
logger.error('get pipe data failed ' + str(e))
continue
@@ -880,7 +883,7 @@ def empty(self, timeout=None):
default='True')
parser.add_argument('-UI', type=str, help="set to true if you would like to start UI on server startup")
args = parser.parse_args()
- if MainConfig.first_start:
+ if config.first_start:
ask_user_for_config()
else:
pass
diff --git a/FreeTAKServer/controllers/services/RestAPI.py b/FreeTAKServer/controllers/services/RestAPI.py
index 2f4ae398..d7dff159 100644
--- a/FreeTAKServer/controllers/services/RestAPI.py
+++ b/FreeTAKServer/controllers/services/RestAPI.py
@@ -75,7 +75,7 @@
socketio.init_app(app, cors_allowed_origins="*")
APIPipe = None
CommandPipe = None
-app.config["SECRET_KEY"] = MainConfig.SecretKey
+app.config["SECRET_KEY"] = config.SecretKey
eventDict = {}
@@ -120,12 +120,12 @@ def sessions():
@socketio.on('connect')
def connection():
- emit('connectUpdate', json.dumps({"starttime": str(StartTime), "version": str(MainConfig.version)}))
+ emit('connectUpdate', json.dumps({"starttime": str(StartTime), "version": str(config.version)}))
@socketio.on('authenticate')
def authenticate(token):
- if json.loads(token)["Authenticate"] == MainConfig.websocketkey:
+ if json.loads(token)["Authenticate"] == config.websocketkey:
emit('authentication', json.dumps({'successful': 'True'}))
session.authenticated = True # pylint: disable=assigning-non-slot; this is necessary to save a client's state
else:
@@ -314,7 +314,7 @@ def updateSystemUserRest():
def updateSystemUser(jsondata):
""" this function updates an existing system user entry in the database. User id must be provided if user with specified id doesn't
exist operation will return an error
-
+
Args:
jsondata: dict
@@ -384,14 +384,14 @@ def addSystemUser(jsondata):
from defusedxml import ElementTree as etree
import shutil
import os
- dp_directory = str(PurePath(Path(MainConfig.DataPackageFilePath)))
- openfile = open(str(PurePath(Path(str(MainConfig.clientPackages), cert_name + '.zip'))),
+ dp_directory = str(PurePath(Path(config.DataPackageFilePath)))
+ openfile = open(str(PurePath(Path(str(config.clientPackages), cert_name + '.zip'))),
mode='rb')
file_hash = str(hashlib.sha256(openfile.read()).hexdigest())
openfile.close()
newDirectory = str(PurePath(Path(dp_directory), Path(file_hash)))
os.mkdir(newDirectory)
- shutil.copy(str(PurePath(Path(str(MainConfig.clientPackages), cert_name + '.zip'))),
+ shutil.copy(str(PurePath(Path(str(config.clientPackages), cert_name + '.zip'))),
str(PurePath(Path(newDirectory), Path(cert_name + '.zip'))))
fileSize = Path(str(newDirectory), cert_name + '.zip').stat().st_size
dbController.create_datapackage(uid=user_id, Name=cert_name + '.zip', Hash=file_hash,
@@ -421,7 +421,7 @@ def addSystemUser(jsondata):
cot.xmlString = clientXML.encode()
newCoT = SendOtherController(cot, addToDB=False)
APIPipe.put(newCoT.getObject())
-
+
else:
# in the event no certificate is to be generated simply create a system user
dbController.create_systemUser(name=systemuser["Name"], group=systemuser["Group"],
@@ -432,9 +432,9 @@ def addSystemUser(jsondata):
errors.append(f"operation failed for user {systemuser['Name']} with error {str(e)}")
else:
errors.append(f"operation failed for user missing name parameter with error {str(e)}")
-
+
if len(errors) == 0:
- return "all users created", 201
+ return "all users created", 201
elif len(errors) 0,
((((Point.lon * -1) - lon_abs) * 111302.62) + (((Point.lat * -1) - lat_abs) * 110574.61)) <= radius + 10),
-
+
and_(
Point.lon < 0,
Point.lat >= 0,
@@ -828,7 +828,7 @@ def getGeoObject():
((((lon_abs * -1) - Point.lon) * 111302.62) + ((lat_abs - Point.lat) * 110574.61)) <= radius + 10,
((((lon_abs * -1) - Point.lon) * 111302.62) + ((lat_abs - Point.lat) * 110574.61)) > 0)
),
-
+
),
and_(
Point.lon >= 0,
@@ -1214,7 +1214,7 @@ def ConnectionMessage():
@app.route("/APIUser", methods=[restMethods.GET, restMethods.POST, restMethods.DELETE])
def APIUser():
- if request.remote_addr in MainConfig.AllowedCLIIPs:
+ if request.remote_addr in config.AllowedCLIIPs:
try:
if request.method == restMethods.POST:
json = request.get_json()
@@ -1259,7 +1259,7 @@ def URLGET():
@app.route("/Clients", methods=[restMethods.GET])
def Clients():
try:
- if request.remote_addr in MainConfig.AllowedCLIIPs:
+ if request.remote_addr in config.AllowedCLIIPs:
CommandPipe.put([functionNames.Clients])
out = CommandPipe.get()
returnValue = []
@@ -1346,6 +1346,9 @@ def FederationTable():
@app.route('/ManageKML/postKML', methods=[restMethods.POST])
@auth.login_required()
def create_kml():
+ # Make a connection to the MainConfig object
+ config = MainConfig.instance()
+
try:
from pykml.factory import KML_ElementMaker as KML
from pykml import parser
@@ -1355,7 +1358,7 @@ def create_kml():
from zipfile import ZipFile
from lxml.etree import SubElement, Element # pylint: disable=no-name-in-module
from geopy import Nominatim
- dp_directory = str(PurePath(Path(MainConfig.DataPackageFilePath)))
+ dp_directory = str(PurePath(Path(config.DataPackageFilePath)))
jsondata = request.get_json(force=True)
name = jsondata["name"]
main = parser.fromstring('')
@@ -1459,6 +1462,10 @@ def broadcast_datapackage(uid):
@auth.login_required()
def DataPackageTable():
from pathlib import Path
+
+ # Make a connection to the MainConfig object
+ config = MainConfig.instance()
+
if request.method == "GET":
output = dbController.query_datapackage()
for i in range(0, len(output)):
@@ -1478,7 +1485,7 @@ def DataPackageTable():
obj = dbController.query_datapackage(f'Hash = "{Hash}"')
print(obj)
# TODO: make this coherent with constants
- currentPath = MainConfig.DataPackageFilePath
+ currentPath = config.DataPackageFilePath
shutil.rmtree(f'{str(currentPath)}/{obj[0].Hash}')
dbController.remove_datapackage(f'Hash = "{Hash}"')
return '200', 200
@@ -1493,7 +1500,7 @@ def DataPackageTable():
from defusedxml import ElementTree as etree
import uuid
from lxml.etree import SubElement, Element # pylint: disable=no-name-in-module
- dp_directory = str(PurePath(Path(MainConfig.DataPackageFilePath)))
+ dp_directory = str(PurePath(Path(config.DataPackageFilePath)))
letters = string.ascii_letters
# uid = ''.join(random.choice(letters) for i in range(4))
# uid = 'uid-' + str(uid)
@@ -1633,11 +1640,11 @@ def excheck_table():
ExCheckArray = json.loads(jsondata)["ExCheck"]
for item in ExCheckArray["Templates"]:
templateitem = DatabaseController().query_ExCheck(f'ExCheckData.uid = "{item["uid"]}"', verbose=True)[0]
- os.remove(str(PurePath(Path(MainConfig.ExCheckFilePath), Path(templateitem.data.filename))))
+ os.remove(str(PurePath(Path(config.ExCheckFilePath), Path(templateitem.data.filename))))
DatabaseController().remove_ExCheck(f'PrimaryKey = "{templateitem.PrimaryKey}"')
for item in ExCheckArray["Checklists"]:
checklistitem = DatabaseController().query_ExCheckChecklist(f'uid = "{item["uid"]}"')[0]
- os.remove(str(PurePath(Path(MainConfig.ExCheckChecklistFilePath), Path(checklistitem.filename))))
+ os.remove(str(PurePath(Path(config.ExCheckChecklistFilePath), Path(checklistitem.filename))))
DatabaseController().remove_ExCheckChecklist(f'uid = "{item["uid"]}"')
return 'success', 200
elif request.method == "POST":
@@ -1659,7 +1666,7 @@ def excheck_table():
object.timestamp = datetime.strptime(object.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
serializer.create_DB_object(object)
xml = etree.fromstring(XMI)
- path = str(PurePath(Path(MainConfig.ExCheckFilePath), Path(f'{object.data.uid}.xml')))
+ path = str(PurePath(Path(config.ExCheckFilePath), Path(f'{object.data.uid}.xml')))
with open(path, 'w+') as file:
file.write(XMI)
file.close()
@@ -1695,7 +1702,7 @@ def excheck_table():
def check_status():
try:
- if request.remote_addr in MainConfig.AllowedCLIIPs:
+ if request.remote_addr in config.AllowedCLIIPs:
CommandPipe.put([functionNames.checkStatus])
FTSServerStatusObject = CommandPipe.get()
out = ApplyFullJsonController().serialize_model_to_json(FTSServerStatusObject)
@@ -1710,7 +1717,7 @@ def check_status():
def help():
try:
from flask import url_for
- message = {"APIVersion": str(MainConfig.APIVersion),
+ message = {"APIVersion": str(config.APIVersion),
"SupportedEndpoints": [url_for(i.endpoint, **(i.defaults or {})) for i in app.url_map.iter_rules() if
i.endpoint != 'static']
}
diff --git a/FreeTAKServer/controllers/services/SSLDataPackageService.py b/FreeTAKServer/controllers/services/SSLDataPackageService.py
index 750704e2..c7be459a 100644
--- a/FreeTAKServer/controllers/services/SSLDataPackageService.py
+++ b/FreeTAKServer/controllers/services/SSLDataPackageService.py
@@ -6,6 +6,9 @@
from FreeTAKServer.controllers.CreateLoggerController import CreateLoggerController
from FreeTAKServer.controllers.SSLSocketController import SSLSocketController
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loggingConstants = LoggingConstants(log_name="FTS-SSL_DataPackage_Service")
logger = CreateLoggerController("FTS-SSL_DataPackage_Service", logging_constants=loggingConstants).getLogger()
@@ -32,17 +35,17 @@ def startup(self, ip, port, pipe):
super().setIP(IP)
super().setHTTPPORT(HTTPPORT)
super().setPIPE(PIPE)
- #wsgi.server(eventlet.listen(('', 14533)), app) keyfile=MainConfig.keyDir,
+ #wsgi.server(eventlet.listen(('', 14533)), app) keyfile=config.keyDir,
self.SSLSocketController = SSLSocketController()
self.SSLSocketController.changeIP(IP)
self.SSLSocketController.changePort(HTTPPORT)
self.setSSL(True)
- wsgi.server(sock=wrap_ssl(listen((DataPackageServerConstants().IP, int(HTTPPORT))), keyfile=MainConfig.unencryptedKey,
- certfile=MainConfig.pemDir,
- server_side=True, ca_certs=MainConfig.CA, cert_reqs=ssl.CERT_REQUIRED), site=app)
+ wsgi.server(sock=wrap_ssl(listen((DataPackageServerConstants().IP, int(HTTPPORT))), keyfile=config.unencryptedKey,
+ certfile=config.pemDir,
+ server_side=True, ca_certs=config.CA, cert_reqs=ssl.CERT_REQUIRED), site=app)
except Exception as e:
logger.error('there has been an exception in Data Package service startup ' + str(e))
return -1
-
+
if __name__ == "__main__":
SSLDataPackageService().startup('0.0.0.0', 8443)
\ No newline at end of file
diff --git a/FreeTAKServer/controllers/services/federation/FederationClientService.py b/FreeTAKServer/controllers/services/federation/FederationClientService.py
index 0cc6ccd2..3ae7ca1c 100644
--- a/FreeTAKServer/controllers/services/federation/FederationClientService.py
+++ b/FreeTAKServer/controllers/services/federation/FederationClientService.py
@@ -1,11 +1,11 @@
#######################################################
-#
+#
# FederationClientService.py
# Python implementation of the Class FederationClientService
# Generated by Enterprise Architect
# Created on: 29-Dec-2020 8:10:38 AM
# Original author: natha
-#
+#
#######################################################
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
from FreeTAKServer.controllers.configuration.types import Types
@@ -43,6 +43,10 @@
from FreeTAKServer.controllers.configuration.LoggingConstants import LoggingConstants
from FreeTAKServer.controllers.CreateLoggerController import CreateLoggerController
+
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loggingConstants = LoggingConstants(log_name="FTS_FederationClientService")
logger = CreateLoggerController("FTS_FederationClientService", logging_constants=loggingConstants).getLogger()
@@ -89,8 +93,8 @@ def define_responsibility_chain(self):
def _create_context(self):
self.context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
- self.context.load_cert_chain(MainConfig.federationCert, MainConfig.federationKey,
- password=MainConfig.federationKeyPassword)
+ self.context.load_cert_chain(config.federationCert, config.federationKey,
+ password=config.federationKeyPassword)
self.context.set_ciphers('DEFAULT@SECLEVEL=1')
def _define_external_data_responsibility_chain(self):
@@ -249,7 +253,7 @@ def outbound_data_handler(self):
else:
pass
else:
- time.sleep(MainConfig.MainLoopDelay / 1000)
+ time.sleep(config.MainLoopDelay / 1000)
def send_command_to_core(self, serialized_data):
if self.pipe.sender_queue.full():
diff --git a/FreeTAKServer/controllers/services/federation/federation.py b/FreeTAKServer/controllers/services/federation/federation.py
index 12d3b767..2dda7185 100644
--- a/FreeTAKServer/controllers/services/federation/federation.py
+++ b/FreeTAKServer/controllers/services/federation/federation.py
@@ -32,8 +32,10 @@
logger = CreateLoggerController("FTS_FederationServerService", logging_constants=loggingConstants).getLogger()
from defusedxml import ElementTree as etree
-loggingConstants = LoggingConstants()
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+loggingConstants = LoggingConstants()
from FreeTAKServer.controllers.services.federation.federation_service_base import FederationServiceBase
@@ -75,8 +77,8 @@ def define_responsibility_chain(self):
def _create_context(self) ->None:
self.context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- self.context.load_cert_chain(MainConfig.federationCert, MainConfig.federationKey,
- password=MainConfig.federationKeyPassword)
+ self.context.load_cert_chain(config.federationCert, config.federationKey,
+ password=config.federationKeyPassword)
def _create_listener(self, ip: str, port: int)->None:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
@@ -293,7 +295,7 @@ def _receive_new_data(self, key):
except Exception as e:
self.logger.warning("exception in receiving data from federate "+str(e))
self.disconnect_client(key.data.uid)
-
+
def _accept_connection(self, sock) -> None:
try:
import uuid
diff --git a/FreeTAKServer/model/ServiceObjects/CoTServiceVariables.py b/FreeTAKServer/model/ServiceObjects/CoTServiceVariables.py
index bc585583..ed05bdb0 100644
--- a/FreeTAKServer/model/ServiceObjects/CoTServiceVariables.py
+++ b/FreeTAKServer/model/ServiceObjects/CoTServiceVariables.py
@@ -1,7 +1,10 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class CoTServiceVariables:
def __init__(self):
self.CoTServiceIP = "0.0.0.0"
- self.CoTServicePort = MainConfig.CoTServicePort
+ self.CoTServicePort = config.CoTServicePort
self.CoTServiceStatus = ""
\ No newline at end of file
diff --git a/FreeTAKServer/model/ServiceObjects/FederationServerServiceVariables.py b/FreeTAKServer/model/ServiceObjects/FederationServerServiceVariables.py
index 74752e93..d38ac7d7 100644
--- a/FreeTAKServer/model/ServiceObjects/FederationServerServiceVariables.py
+++ b/FreeTAKServer/model/ServiceObjects/FederationServerServiceVariables.py
@@ -1,8 +1,11 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class FederationServerServiceVariables:
def __init__(self):
# TODO: change to default ''
self.FederationServerServiceStatus = 'stop'
- self.FederationServerServicePort = MainConfig.FederationPort
+ self.FederationServerServicePort = config.FederationPort
self.FederationServerServiceIP = "0.0.0.0"
\ No newline at end of file
diff --git a/FreeTAKServer/model/ServiceObjects/RestAPIServiceVariables.py b/FreeTAKServer/model/ServiceObjects/RestAPIServiceVariables.py
index 42e77742..44f87b97 100644
--- a/FreeTAKServer/model/ServiceObjects/RestAPIServiceVariables.py
+++ b/FreeTAKServer/model/ServiceObjects/RestAPIServiceVariables.py
@@ -1,7 +1,10 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class RestAPIServiceVariables:
def __init__(self):
- self.RestAPIServiceIP = MainConfig.APIIP
- self.RestAPIServicePort = MainConfig.APIPort
+ self.RestAPIServiceIP = config.APIIP
+ self.RestAPIServicePort = config.APIPort
self.RestAPIServiceStatus = ""
\ No newline at end of file
diff --git a/FreeTAKServer/model/ServiceObjects/SSLCoTServiceVariables.py b/FreeTAKServer/model/ServiceObjects/SSLCoTServiceVariables.py
index ab144440..b074ba06 100644
--- a/FreeTAKServer/model/ServiceObjects/SSLCoTServiceVariables.py
+++ b/FreeTAKServer/model/ServiceObjects/SSLCoTServiceVariables.py
@@ -1,7 +1,10 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class SSLCoTServiceVariables:
def __init__(self):
self.SSLCoTServiceIP = "0.0.0.0"
- self.SSLCoTServicePort = MainConfig.SSLCoTServicePort
+ self.SSLCoTServicePort = config.SSLCoTServicePort
self.SSLCoTServiceStatus = ""
\ No newline at end of file
diff --git a/FreeTAKServer/model/ServiceObjects/SSLDataPackageVariables.py b/FreeTAKServer/model/ServiceObjects/SSLDataPackageVariables.py
index 12293d43..bc8f7980 100644
--- a/FreeTAKServer/model/ServiceObjects/SSLDataPackageVariables.py
+++ b/FreeTAKServer/model/ServiceObjects/SSLDataPackageVariables.py
@@ -1,7 +1,10 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class SSLDataPackageVariables:
def __init__(self):
- self.SSLDataPackageIP = MainConfig.DataPackageServiceDefaultIP
+ self.SSLDataPackageIP = config.DataPackageServiceDefaultIP
self.SSLDataPackagePort = 8443
self.SSLDataPackageStatus = ""
\ No newline at end of file
diff --git a/FreeTAKServer/model/ServiceObjects/TCPDataPackageServiceVariables.py b/FreeTAKServer/model/ServiceObjects/TCPDataPackageServiceVariables.py
index 319e8753..0324b477 100644
--- a/FreeTAKServer/model/ServiceObjects/TCPDataPackageServiceVariables.py
+++ b/FreeTAKServer/model/ServiceObjects/TCPDataPackageServiceVariables.py
@@ -1,7 +1,10 @@
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class TCPDataPackageServiceVariables:
def __init__(self):
- self.TCPDataPackageServiceIP = MainConfig.DataPackageServiceDefaultIP
+ self.TCPDataPackageServiceIP = config.DataPackageServiceDefaultIP
self.TCPDataPackageServicePort = 8080
self.TCPDataPackageServiceStatus = ""
\ No newline at end of file
diff --git a/FreeTAKServer/model/sockets/SSLServerSocket.py b/FreeTAKServer/model/sockets/SSLServerSocket.py
index aec673c0..e20c4dc3 100644
--- a/FreeTAKServer/model/sockets/SSLServerSocket.py
+++ b/FreeTAKServer/model/sockets/SSLServerSocket.py
@@ -1,12 +1,15 @@
from FreeTAKServer.model.sockets.MainSocket import MainSocket
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
class SSLServerSocket(MainSocket):
def __init__(self):
super().__init__()
- self.keyDir = MainConfig.keyDir
- self.pemDir = MainConfig.pemDir
- self.testKeyDir = MainConfig.testKey
- self.testPemDir = MainConfig.testPem
- self.password = MainConfig.password
- self.CA = MainConfig.CA
\ No newline at end of file
+ self.keyDir = config.keyDir
+ self.pemDir = config.pemDir
+ self.testKeyDir = config.testKey
+ self.testPemDir = config.testPem
+ self.password = config.password
+ self.CA = config.CA
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 952db6a9..a6a91648 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,8 @@
#
# installs dependencies from ./setup.py, and the package itself,
# in editable mode for development
- -e .
\ No newline at end of file
+ -e .
+
+ pytest>=7.0.0
+ pytest-pep8>=1.0.0
+ pytest-cov>=4.0.0
diff --git a/test users/api_service_test.py b/test users/api_service_test.py
index d90bbe4d..d04901cc 100644
--- a/test users/api_service_test.py
+++ b/test users/api_service_test.py
@@ -11,6 +11,9 @@
from stdlib_extensions import CustomAssertions
from common_testing_tools import TCPClient
+# Make a connection to the MainConfig object for all routines below
+config = MainConfig.instance()
+
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pool = concurrent.futures.ThreadPoolExecutor()
@@ -22,7 +25,7 @@ def get_user_uid(username, password, token):
def create_socketio_client(sio: socketio.Client, url: str):
sio.connect(url=url)
- sio.emit("authenticate", data=json.dumps({"Authenticate": MainConfig.websocketkey}))
+ sio.emit("authenticate", data=json.dumps({"Authenticate": config.websocketkey}))
return sio
@@ -36,7 +39,7 @@ def get_endpoint(auth_token: str, body: dict = {}, url: str = 'http://127.0.0.1:
params: the url parameters to be submitted with request
endpoint: the endpoint to which the request is sent
- Returns:
+ Returns:
object: response object
"""
@@ -73,25 +76,25 @@ def test_generate_system_user(self):
def test_get_system_user(self):
sio = create_socketio_client(sio=self.sio, url=self.url)
-
+
system_users = {}
-
+
@sio.event
def systemUsersUpdate(data):
nonlocal system_users
system_users = data
-
+
sio.emit("systemUsers")
-
+
start = time.time()
while system_users == {} and time.time() < start+5:
time.sleep(0.1)
for user in system_users["SystemUsers"]:
if user["Name"] == "testUser":
self.assertEqual(user["Name"], "testUser")
-
+
self.assertKeyValuePairInDict(key = "Name", var = "testUser", dictionary = system_users)
-
+
def test_system_user_credentials(self, password="testPassword", token="testToken", username="testUser"):
fts_response = get_endpoint(auth_token=token, params={"password": password, "username": username},
endpoint="/AuthenticateUser")
@@ -122,7 +125,7 @@ def setUp(self):
def test_create_address_based_kml(self):
""" this method tests the functionality of creating address based KML packages
-
+
Returns: None
"""
diff --git a/tests/test_controllers/MainConfig_test.py b/tests/test_controllers/MainConfig_test.py
new file mode 100644
index 00000000..a34aed8e
--- /dev/null
+++ b/tests/test_controllers/MainConfig_test.py
@@ -0,0 +1,153 @@
+import unittest
+import pytest
+import os
+import yaml
+
+from FreeTAKServer.controllers.configuration.MainConfig import MainConfig, USERPATH, PYTHON_VERSION
+from pathlib import PosixPath, WindowsPath
+from unittest import mock
+
+class Test_MainConfig(unittest.TestCase):
+
+ # Need to re-initialize the singleton after each test
+ def teardown_method(self, func):
+ MainConfig.reset()
+
+ def test_default_values(self):
+ config = MainConfig.instance()
+
+ for conf_var, metadata in config._defaults.items():
+ if type(metadata['default']) in (PosixPath, WindowsPath):
+ assert(config.get(conf_var) == str(metadata['default']))
+ else:
+ assert(config.get(conf_var) == metadata['default'])
+
+ # we test only a couple of vars with env override assuming rest are OK
+ @mock.patch.dict(os.environ, {'FTS_DATA_RECEPTION_BUFFER': '512'}) # int test
+ def test_env_var_int_values(self):
+ config = MainConfig.instance()
+ assert(config.DataReceptionBuffer == 512)
+
+ @mock.patch.dict(os.environ, {'FTS_OPTIMIZE_API': '0'}) # bool test
+ def test_env_var_bool_values(self):
+ config = MainConfig.instance()
+ assert(config.OptimizeAPI == False)
+
+ @mock.patch.dict(os.environ, {'FTS_SECRET_KEY': 'abc123'}) # str test
+ def test_env_var_string_values(self):
+ config = MainConfig.instance()
+ assert(config.SecretKey == 'abc123')
+
+ @mock.patch.dict(os.environ, {'FTS_CLI_WHITELIST': '127.0.0.1:10.10.10.10'})
+ def test_env_var_list_values_with_colon(self):
+ config = MainConfig.instance()
+ assert all([a == b for a, b in zip(config.AllowCLIIPs, ['127.0.0.1', '10.10.10.10'])])
+
+ @mock.patch.dict(os.environ, {'FTS_CLI_WHITELIST': '127.0.0.1,10.0.0.10'})
+ def test_env_var_list_values_with_comma(self):
+ config = MainConfig.instance()
+ assert all([a == b for a, b in zip(config.AllowCLIIPs, ['127.0.0.1', '10.0.0.10'])])
+
+
+ yaml_config = """
+System:
+ FTS_MAINLOOP_DELAY: 600
+ FTS_CONNECTION_MESSAGE: ''
+ FTS_DATABASE_TYPE: 'CouchDB'
+ FTS_OPTIMIZE_API: false
+ FTS_SECRET_KEY: 'SecretSquirel'
+ FTS_DATA_RECEPTION_BUFFER: 256
+ FTS_MAX_RECEPTION_TIME: 20
+Addresses:
+ FTS_COT_PORT: 2000
+ FTS_SSLCOT_PORT: 2001
+ FTS_DP_ADDRESS: '10.0.0.10'
+ FTS_USER_ADDRESS: 'parrot.example.com'
+ FTS_API_PORT: 2003
+ FTS_FED_PORT: 2004
+ FTS_API_ADDRESS: '192.168.10.20'
+Filesystem:
+ FTS_COT_TO_DB: false
+ FTS_DB_PATH: '/db/store'
+ FTS_MAINPATH: '/fts'
+ FTS_CERTS_PATH: '/fts/certs'
+ FTS_EXCHECK_PATH: '/fts/excheck'
+ FTS_EXCHECK_TEMPLATE_PATH: '/fts/template'
+ FTS_EXCHECK_CHECKLIST_PATH: '/fts/checklist'
+ FTS_DATAPACKAGE_PATH: '/fts/package/data'
+ FTS_LOGFILE_PATH: '/fts/logs'
+ FTS_CLIENT_PACKAGES: '/fts/package/client'
+Certs:
+ FTS_SERVER_KEYDIR: '/fts/certs/private'
+ FTS_SERVER_PEMDIR: '/fts/certs/public'
+ FTS_TESTCLIENT_PEMDIR: '/fts/test/public'
+ FTS_TESTCLIENT_KEYDIR: '/fts/test/private'
+ FTS_UNENCRYPTED_KEYDIR: '/fts/certs/txt'
+ FTS_SERVER_P12DIR: '/fts/certs/p12'
+ FTS_CADIR: '/fts/certs/ca'
+ FTS_CAKEYDIR: '/fts/certs/private'
+ FTS_FEDERATION_CERTDIR: '/fts/certs/public'
+ FTS_FEDERATION_KEYDIR: '/fts/certs/private'
+ FTS_FEDERATION_KEYPASS: 'ringo-ranger'
+ FTS_FED_PASSWORD: 'SuperMouse'
+ FTS_CLIENT_CERT_PASSWORD: 'BooBoo'
+ FTS_WEBSOCKET_KEY: 'flat_bread'
+ FTS_CRLDIR: '/fts/certs/crl'
+"""
+
+ @mock.patch('builtins.open',
+ create=True,
+ new=mock.mock_open(read_data=yaml_config))
+ def test_yaml_config(self):
+ config = MainConfig.instance(config_file='/dev/null')
+
+ expected = yaml.load(Test_MainConfig.yaml_config, Loader=yaml.SafeLoader)
+ # Process each attribute of each YAML section
+ for sect in MainConfig._yaml_keys:
+ for var in MainConfig._yaml_keys[sect]:
+ if var in expected[sect]:
+ assert(config.get(MainConfig._yaml_keys[sect][var]) == expected[sect][var])
+
+
+ @mock.patch('builtins.open',
+ create=True,
+ new=mock.mock_open(read_data=yaml_config))
+ @mock.patch.dict(os.environ, {'FTS_SSLCOT_PORT': '10000'}) # int test
+ @mock.patch.dict(os.environ, {'FTS_COT_TO_DB': '1'}) # bool test
+ @mock.patch.dict(os.environ, {'FTS_MAINPATH': '/tmp/fts'}) # str test
+ def test_yaml_config_with_env_override(self):
+ config = MainConfig.instance(config_file='/dev/null')
+
+ assert(config.SSLCoTServicePort== 10000)
+ assert(config.SaveCoTToDB == True)
+ assert(config.MainPath == '/tmp/fts')
+
+ def test_get_config_as_attribute(self):
+ config = MainConfig.instance()
+ assert(config.MainPath == fr'{USERPATH}{PYTHON_VERSION}/dist-packages/FreeTAKServer')
+ assert(config.SaveCoTToDB == True)
+ assert(config.FederationPort == 9000)
+
+ def test_set_config_as_attribute(self):
+ config = MainConfig.instance()
+ config.OptimizeAPI = False
+ assert(config.get('OptimizeAPI') == False)
+ config.APIPort = 8088
+ assert(config.get('APIPort') == 8088)
+ config.LogFilePath = '/tmp/logs'
+ assert(config.get('LogFilePath') == '/tmp/logs')
+
+ def test_get_config_as_dictionary(self):
+ config = MainConfig.instance()
+ assert(config['MainPath'] == fr'{USERPATH}{PYTHON_VERSION}/dist-packages/FreeTAKServer')
+ assert(config['SaveCoTToDB'] == True)
+ assert(config['FederationPort'] == 9000)
+
+ def test_set_config_as_dictionary(self):
+ config = MainConfig.instance()
+ config['OptimizeAPI'] = False
+ assert(config.get('OptimizeAPI') == False)
+ config['APIPort'] = 8088
+ assert(config.get('APIPort') == 8088)
+ config['LogFilePath'] = '/tmp/logs'
+ assert(config.get('LogFilePath') == '/tmp/logs')
\ No newline at end of file