Skip to content

Commit

Permalink
Move namespaces JSON test to test_stashcache.py so I can use the test…
Browse files Browse the repository at this point in the history
… global data

keep a simple endpoint test in test_api.py
  • Loading branch information
matyasselmeci committed Nov 21, 2023
1 parent 3f101bb commit 61c9fd0
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 89 deletions.
97 changes: 8 additions & 89 deletions src/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

from app import app, global_data
from webapp.topology import Facility, Site, Resource, ResourceGroup
from webapp.data_federation import CredentialGeneration

HOST_PORT_RE = re.compile(r"[a-zA-Z0-9.-]{3,63}:[0-9]{2,5}")
PROTOCOL_HOST_PORT_RE = re.compile(r"[a-z]+://" + HOST_PORT_RE.pattern)

INVALID_USER = dict(
username="invalid",
Expand Down Expand Up @@ -62,7 +58,9 @@
"/cache/scitokens.conf",
"/api/institutions",
"/cache/grid-mapfile",
"/origin/grid-mapfile"
"/origin/grid-mapfile",
"/osdf/namespaces",
"/stashcache/namespaces",
]


Expand All @@ -72,90 +70,6 @@ def client():
yield client


class TestNamespaces:
@pytest.fixture
def namespaces_json(self, client) -> Dict:
response = client.get('/stashcache/namespaces')
assert response.status_code == 200
return response.json

@pytest.fixture
def namespaces(self, namespaces_json) -> List[Dict]:
assert "namespaces" in namespaces_json
return namespaces_json["namespaces"]

@staticmethod
def validate_cache_schema(cc):
assert HOST_PORT_RE.match(cc["auth_endpoint"])
assert HOST_PORT_RE.match(cc["endpoint"])
assert cc["resource"] and isinstance(cc["resource"], str)

@staticmethod
def validate_namespace_schema(ns):
assert isinstance(ns["caches"], list) # we do have a case where it's empty
assert ns["path"].startswith("/") # implies str
assert isinstance(ns["readhttps"], bool)
assert isinstance(ns["usetokenonread"], bool)
assert ns["dirlisthost"] is None or PROTOCOL_HOST_PORT_RE.match(ns["dirlisthost"])
assert ns["writebackhost"] is None or PROTOCOL_HOST_PORT_RE.match(ns["writebackhost"])
credgen = ns["credential_generation"]
if credgen is not None:
assert isinstance(credgen["max_scope_depth"], int) and credgen["max_scope_depth"] > -1
assert credgen["strategy"] in CredentialGeneration.STRATEGIES
assert credgen["issuer"]
parsed_issuer = urllib.parse.urlparse(credgen["issuer"])
assert parsed_issuer.netloc and parsed_issuer.scheme == "https"
if credgen["vault_server"]:
assert isinstance(credgen["vault_server"], str)
if credgen["vault_issuer"]:
assert isinstance(credgen["vault_issuer"], str)
if credgen["base_path"]:
assert isinstance(credgen["base_path"], str)

def test_caches(self, namespaces_json):
assert "caches" in namespaces_json
caches = namespaces_json["caches"]
# Have a reasonable number of caches
assert len(caches) > 20
for cache in caches:
self.validate_cache_schema(cache)

def test_namespaces(self, namespaces):
# Have a reasonable number of namespaces
assert len(namespaces) > 15

found_credgen = False
for namespace in namespaces:
if namespace["credential_generation"] is not None:
found_credgen = True
self.validate_namespace_schema(namespace)
if namespace["caches"]:
for cache in namespace["caches"]:
self.validate_cache_schema(cache)
assert found_credgen, "At least one namespace with credential_generation"

@staticmethod
def validate_scitokens_block(sci):
assert sci["issuer"]
assert isinstance(sci["issuer"], str)
assert "://" in sci["issuer"]
assert isinstance(sci["basepath"], list)
assert sci["basepath"] # must have at least 1
for bp in sci["basepath"]:
assert bp.startswith("/") # implies str
assert "," not in bp
assert isinstance(sci["restrictedpath"], list)
for rp in sci["restrictedpath"]: # may be empty
assert rp.startswith("/") # implies str
assert "," not in rp

def test_issuers_in_namespaces(self, namespaces):
for namespace in namespaces:
assert isinstance(namespace["scitokens"], list)
for scitokens_block in namespace["scitokens"]:
self.validate_scitokens_block(scitokens_block)


class TestAPI:

def test_sanity(self, client: flask.Flask):
Expand Down Expand Up @@ -368,6 +282,11 @@ def test_cache_grid_mapfile(self, client: flask.Flask):
hashes_not_in_authfile = mapfile_hashes - authfile_hashes
assert not hashes_not_in_authfile, f"Hashes in mapfile but not in authfile: {hashes_not_in_authfile}"

def test_namespaces_json(self, client):
response = client.get('/osdf/namespaces')
assert response.status_code == 200
assert "namespaces" in response.json


class TestEndpointContent:
# Pre-build some test cases based on AMNH resources
Expand Down
88 changes: 88 additions & 0 deletions src/tests/test_stashcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import re
from pytest_mock import MockerFixture
import time
from typing import List, Dict
import urllib, urllib.parse

# Rewrites the path so the app can be imported like it normally is
import os
Expand All @@ -18,8 +20,12 @@
from app import app, global_data
from webapp import models, topology, vos_data
from webapp.common import load_yaml_file
from webapp.data_federation import CredentialGeneration
import stashcache

HOST_PORT_RE = re.compile(r"[a-zA-Z0-9.-]{3,63}:[0-9]{2,5}")
PROTOCOL_HOST_PORT_RE = re.compile(r"[a-z]+://" + HOST_PORT_RE.pattern)

GRID_MAPPING_REGEX = re.compile(r'^"(/[^"]*CN=[^"]+")\s+([0-9a-f]{8}[.]0)$')
# ^^ the DN starts with a slash and will at least have a CN in it.
EMPTY_LINE_REGEX = re.compile(r'^\s*(#|$)') # Empty or comment-only lines
Expand Down Expand Up @@ -218,5 +224,87 @@ def test_cache_grid_mapfile_i2_cache(self, client: flask.Flask, mocker: MockerFi
assert num_mappings > 5, f"Too few mappings found.\nFull text:\n{text}\n"


class TestNamespaces:
@pytest.fixture
def namespaces_json(self, test_global_data) -> Dict:
return stashcache.get_namespaces_info(test_global_data)

@pytest.fixture
def namespaces(self, namespaces_json) -> List[Dict]:
assert "namespaces" in namespaces_json
return namespaces_json["namespaces"]

@staticmethod
def validate_cache_schema(cc):
assert HOST_PORT_RE.match(cc["auth_endpoint"])
assert HOST_PORT_RE.match(cc["endpoint"])
assert cc["resource"] and isinstance(cc["resource"], str)

@staticmethod
def validate_namespace_schema(ns):
assert isinstance(ns["caches"], list) # we do have a case where it's empty
assert ns["path"].startswith("/") # implies str
assert isinstance(ns["readhttps"], bool)
assert isinstance(ns["usetokenonread"], bool)
assert ns["dirlisthost"] is None or PROTOCOL_HOST_PORT_RE.match(ns["dirlisthost"])
assert ns["writebackhost"] is None or PROTOCOL_HOST_PORT_RE.match(ns["writebackhost"])
credgen = ns["credential_generation"]
if credgen is not None:
assert isinstance(credgen["max_scope_depth"], int) and credgen["max_scope_depth"] > -1
assert credgen["strategy"] in CredentialGeneration.STRATEGIES
assert credgen["issuer"]
parsed_issuer = urllib.parse.urlparse(credgen["issuer"])
assert parsed_issuer.netloc and parsed_issuer.scheme == "https"
if credgen["vault_server"]:
assert isinstance(credgen["vault_server"], str)
if credgen["vault_issuer"]:
assert isinstance(credgen["vault_issuer"], str)
if credgen["base_path"]:
assert isinstance(credgen["base_path"], str)

def test_caches(self, namespaces_json):
assert "caches" in namespaces_json
caches = namespaces_json["caches"]
# Have a reasonable number of caches
assert len(caches) > 20
for cache in caches:
self.validate_cache_schema(cache)

def test_namespaces(self, namespaces):
# Have a reasonable number of namespaces
assert len(namespaces) > 15

found_credgen = False
for namespace in namespaces:
if namespace["credential_generation"] is not None:
found_credgen = True
self.validate_namespace_schema(namespace)
if namespace["caches"]:
for cache in namespace["caches"]:
self.validate_cache_schema(cache)
assert found_credgen, "At least one namespace with credential_generation"

@staticmethod
def validate_scitokens_block(sci):
assert sci["issuer"]
assert isinstance(sci["issuer"], str)
assert "://" in sci["issuer"]
assert isinstance(sci["basepath"], list)
assert sci["basepath"] # must have at least 1
for bp in sci["basepath"]:
assert bp.startswith("/") # implies str
assert "," not in bp
assert isinstance(sci["restrictedpath"], list)
for rp in sci["restrictedpath"]: # may be empty
assert rp.startswith("/") # implies str
assert "," not in rp

def test_issuers_in_namespaces(self, namespaces):
for namespace in namespaces:
assert isinstance(namespace["scitokens"], list)
for scitokens_block in namespace["scitokens"]:
self.validate_scitokens_block(scitokens_block)


if __name__ == '__main__':
pytest.main()

0 comments on commit 61c9fd0

Please sign in to comment.