Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can we remove non-permissive dependency from non-test code with regards to CWTs? #3

Open
wants to merge 8 commits into
base: cwt
Choose a base branch
from
284 changes: 268 additions & 16 deletions docs/registration_policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ The SCITT API emulator can deny entry based on presence of
This is a simple way to enable evaluation of claims prior to submission by
arbitrary policy engines which watch the workspace (fanotify, inotify, etc.).

[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/572766.svg)](https://asciinema.org/a/572766)
[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/620587.svg)](https://asciinema.org/a/620587)

Start the server

```console
$ rm -rf workspace/
$ mkdir -p workspace/storage/operations
$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro
$ timeout 0.5s scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro
Service parameters: workspace/service_parameters.json
^C
```
Expand Down Expand Up @@ -84,36 +84,186 @@ import os
import sys
import json
import pathlib
import unittest
import traceback
import contextlib
import urllib.parse

import jwt
import cwt
import cwt.algs.ec2
import cbor2
import pycose

# TODO Remove this once we have a example flow for proper key verification
import jwcrypto.jwk
from jsonschema import validate, ValidationError
from pycose.messages import CoseMessage, Sign1Message
import pycose.keys.ec2
import cryptography.hazmat.primitives.serialization
from pycose.messages import Sign1Message

from scitt_emulator.scitt import ClaimInvalidError, COSE_Headers_Issuer
from scitt_emulator.scitt import ClaimInvalidError, CWTClaims

claim = sys.stdin.buffer.read()

msg = CoseMessage.decode(claim)
msg = Sign1Message.decode(claim, tag=True)

if pycose.headers.ContentType not in msg.phdr:
raise ClaimInvalidError("Claim does not have a content type header parameter")
if COSE_Headers_Issuer not in msg.phdr:
raise ClaimInvalidError("Claim does not have an issuer header parameter")

if not msg.phdr[pycose.headers.ContentType].startswith("application/json"):
raise TypeError(
f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}"
)

# TODO Whatever the opisite of COSESign1 is

# Figure out what the issuer is
cwt_cose_loads = cwt.cose.COSE()._loads
cwt_unverified_protected = cwt_cose_loads(cwt_cose_loads(msg.phdr[CWTClaims]).value[2])
unverified_issuer = cwt_unverified_protected[1]

def did_web_to_url(did_web_string, scheme=os.environ.get("DID_WEB_ASSUME_SCHEME", "https")):
return "/".join(
[
f"{scheme}:/",
*[urllib.parse.unquote(i) for i in did_web_string.split(":")[2:]],
]
)

if unverified_issuer.startswith("did:web:"):
unverified_issuer = did_web_to_url(unverified_issuer)

# TODO Should we use audiance? I think no, just want to make sure we've
# documented why thought if not. No usage makes sense to me becasue we don't
# know the intended audiance, it could be federated into multiple TS

# TODO Can you just pass a whole public key as an issuer?

# Load keys from issuer
jwk_keys = []
cryptography_ssh_keys = []
cwt_cose_keys = []
pycose_cose_keys = []

import urllib.request
import urllib.parse

# TODO did:web: -> URL
from cryptography.hazmat.primitives import serialization

if "://" in unverified_issuer and not unverified_issuer.startswith("file://"):
# TODO Logging for URLErrors
# Check if OIDC issuer
unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer)
openid_configuration_url = unverified_issuer_parsed_url._replace(
path="/.well-known/openid-configuration",
).geturl()
with contextlib.suppress(urllib.request.URLError):
with urllib.request.urlopen(openid_configuration_url) as response:
if response.status == 200:
openid_configuration = json.loads(response.read())
jwks_uri = openid_configuration["jwks_uri"]
with urllib.request.urlopen(jwks_uri) as response:
if response.status == 200:
jwks = json.loads(response.read())
for jwk_key_as_dict in jwks["keys"]:
"""
jwk_key_as_string = json.dumps(jwk_key_as_dict)
jwk_keys.append(
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
)
"""
cwt_cose_key = cwt.COSEKey.from_jwk(
jwk_key_as_dict
)
cwt_cose_keys.append(cwt_cose_key)

# Try loading ssh keys. Example: https://github.com/username.keys
with contextlib.suppress(urllib.request.URLError):
with urllib.request.urlopen(unverified_issuer) as response:
while line := response.readline():
with contextlib.suppress(
(ValueError, cryptography.exceptions.UnsupportedAlgorithm)
):
cryptography_ssh_keys.append(
cryptography.hazmat.primitives.serialization.load_ssh_public_key(
line
)
)

for cryptography_ssh_key in cryptography_ssh_keys:
jwk_keys.append(
jwcrypto.jwk.JWK.from_pem(
cryptography_ssh_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
)
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.kid,
)
# cwt_cose_keys.append(cwt_cose_key)

for cwt_cose_key in cwt_cose_keys:
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
import pprint
import inspect
cose_tags = {
member.identifier: member.fullname
for _member_name, member in inspect.getmembers(pycose.headers)
if (
hasattr(member, "identifier")
and hasattr(member, "fullname")
)
}
pprint.pprint(cose_tags)
cwt_ec2_key_as_dict_labeled = {
cose_tags.get(key, key): value
for key, value in cwt_ec2_key_as_dict.items()
}
print("cwt_ec2_key_as_dict_labeled['STATIC_KEY_ID']", cwt_ec2_key_as_dict_labeled['CRITICAL'])
pprint.pprint(cwt_ec2_key_as_dict)
pprint.pprint(cwt_ec2_key_as_dict_labeled)
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_key.kid = cwt_ec2_key_as_dict_labeled['CRITICAL']
cwt_cose_key._kid = pycose_cose_key.kid
pycose_cose_keys.append(pycose_cose_key)

verify_signature = False
for pycose_cose_key in pycose_cose_keys:
with contextlib.suppress(Exception):
msg.key = pycose_cose_key
verify_signature = msg.verify_signature()
if verify_signature:
# msg.kid = pycose_cose_key.kid
break

unittest.TestCase().assertTrue(
verify_signature,
"Failed to verify signature on statement",
)

pprint.pprint(pycose_cose_keys)
pprint.pprint(cwt_cose_keys)

cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_keys)
issuer = cwt_protected[1]
subject = cwt_protected[2]

# TODO Validate content type is JSON?
SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text())

try:
validate(
instance={
"$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json",
"issuer": msg.phdr[COSE_Headers_Issuer],
"issuer": issuer,
"subject": subject,
"claim": json.loads(msg.payload.decode()),
},
schema=SCHEMA,
Expand All @@ -140,21 +290,106 @@ echo ${CLAIM_PATH}
Example running allowlist check and enforcement.

```console
npm install -g nodemon
nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;'
$ npm install nodemon && \
node_modules/.bin/nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;'
```

Also ensure you restart the server with the new config we edited.

```console
scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro
$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro
```

The current emulator notary (create-statement) implementation will sign
statements using a generated ephemeral key or a key we provide via the
`--private-key-pem` argument.

Since we need to export the key for verification by the policy engine, we will
first generate it using `ssh-keygen`.

```console
$ export ISSUER_PORT="9000" \
&& export ISSUER_URL="http://localhost:${ISSUER_PORT}" \
&& ssh-keygen -q -f /dev/stdout -t ecdsa -b 384 -N '' -I $RANDOM <<<y 2>/dev/null | python -c 'import sys; from cryptography.hazmat.primitives import serialization; print(serialization.load_ssh_private_key(sys.stdin.buffer.read(), password=None).private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode().rstrip())' > private-key.pem \
&& scitt-emulator client create-claim \
--private-key-pem private-key.pem \
--issuer "${ISSUER_URL}" \
--subject "solar" \
--content-type application/json \
--payload '{"sun": "yellow"}' \
--out claim.cose
```

Create claim from allowed issuer (`.org`) and from non-allowed (`.com`).
The core of policy engine we implemented in `jsonschema_validator.py` will
verify the COSE message generated using the public portion of the notary's key.
We've implemented two possible styles of key resolution. Both of them require
resolution of public keys via an HTTP server.

Let's start the HTTP server now, we'll populate the needed files in the
sections corresponding to each resolution style.

```console
$ python -m http.server "${ISSUER_PORT}" &
$ python_http_server_pid=$!
```

### SSH `authorized_keys` style notary public key resolution

Keys are discovered via making an HTTP GET request to the URL given by the
`issuer` parameter via the `web` DID method and de-serializing the SSH
public keys found within the response body.

Start an HTTP server with an SSH public key served at the root.

```console
$ cat private-key.pem | ssh-keygen -f /dev/stdin -y | tee index.html
```

### OpenID Connect token style notary public key resolution

Keys are discovered two part resolution of HTTP paths relative to the issuer

`/.well-known/openid-configuration` path is requested via HTTP GET. The
response body is parsed as JSON and the value of the `jwks_uri` key is
requested via HTTP GET.

`/.well-known/jwks` (is typically the value of `jwks_uri`) path is requested
via HTTP GET. The response body is parsed as JSON. Public keys are loaded
from the value of the `keys` key which stores an array of JSON Web Key (JWK)
style serializations.

```console
$ mkdir -p .well-known/
$ cat > .well-known/openid-configuration <<EOF
{
"issuer": "${ISSUER_URL}",
"jwks_uri": "${ISSUER_URL}/.well-known/jwks",
"response_types_supported": ["id_token"],
"claims_supported": ["sub", "aud", "exp", "iat", "iss"],
"id_token_signing_alg_values_supported": ["ES384"],
"scopes_supported": ["openid"]
}
EOF
$ cat private-key.pem | python -c 'import sys, json, jwcrypto.jwt; key = jwcrypto.jwt.JWK(); key.import_from_pem(sys.stdin.buffer.read()); print(json.dumps({"keys":[{**key.export_public(as_dict=True),"use": "sig","kid": key.thumbprint()}]}, indent=4, sort_keys=True))' | tee .well-known/jwks
{
"keys": [
{
"crv": "P-384",
"kid": "y96luxaBaw6FeWVEMti_iqLWPSYk8cKLzZG8X45PA2k",
"kty": "EC",
"use": "sig",
"x": "ZQazDzYmcMHF5Dstkbw7SwWvR_oXQHFS-TLppri-0xDby8TmCpzHyr6TH03CLBxj",
"y": "lsIbRskEv06Rf0vttkB3vpXdZ-a50ck74MVyRwOvN55P4s8usQAm3PY1KnAgWtHF"
}
]
}
```

Attempt to submit the statement we created. You should see that due to our
current `allowlist.schema.json` the Transparency Service denied the insertion
of the statement into the log.

```console
$ scitt-emulator client create-claim --issuer did:web:example.com --content-type application/json --payload '{"sun": "yellow"}' --out claim.cose
A COSE-signed Claim was written to: claim.cose
$ scitt-emulator client submit-claim --claim claim.cose --out claim.receipt.cbor
Traceback (most recent call last):
File "/home/alice/.local/bin/scitt-emulator", line 33, in <module>
Expand All @@ -174,10 +409,27 @@ Failed validating 'enum' in schema['properties']['issuer']:

On instance['issuer']:
'did:web:example.com'
```

Modify the allowlist to ensure that our issuer, aka our local HTTP server with
our keys, is set to be the allowed issuer.

```console
$ export allowlist="$(cat allowlist.schema.json)" && \
jq '.properties.issuer.enum[0] = env.ISSUER_URL' <(echo "${allowlist}") \
| tee allowlist.schema.json
```

$ scitt-emulator client create-claim --issuer did:web:example.org --content-type application/json --payload '{"sun": "yellow"}' --out claim.cose
A COSE signed Claim was written to: claim.cose
Submit the statement from the issuer we just added to the allowlist.

```console
$ scitt-emulator client submit-claim --claim claim.cose --out claim.receipt.cbor
Claim registered with entry ID 1
Receipt written to claim.receipt.cbor
```

Stop the server that serves the public keys

```console
$ kill $python_http_server_pid
```
2 changes: 2 additions & 0 deletions run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if [ ! -f "venv/bin/activate" ]; then
pip install -q -U pip setuptools wheel
pip install -q -r dev-requirements.txt
pip install -q -e .[oidc]
# TODO Resolve this, temporary fix for https://github.com/scitt-community/scitt-api-emulator/issues/38
pip install urllib3==1.26.15 requests-toolbelt==0.10.1
else
. ./venv/bin/activate
fi
Expand Down
16 changes: 2 additions & 14 deletions scitt_emulator/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import httpx

import scitt_emulator.scitt as scitt
from scitt_emulator import create_statement
from scitt_emulator.tree_algs import TREE_ALGS

DEFAULT_URL = "http://127.0.0.1:8000"
Expand Down Expand Up @@ -72,10 +73,6 @@ def post(self, *args, **kwargs):
return self._request("POST", *args, **kwargs)


def create_claim(issuer: str, content_type: str, payload: str, claim_path: Path):
scitt.create_claim(claim_path, issuer, content_type, payload)


def submit_claim(
url: str,
claim_path: Path,
Expand Down Expand Up @@ -170,16 +167,7 @@ def cli(fn):
parser = fn(description="Execute client commands")
sub = parser.add_subparsers(dest="cmd", help="Command to execute", required=True)

p = sub.add_parser("create-claim", description="Create a fake SCITT claim")
p.add_argument("--out", required=True, type=Path)
p.add_argument("--issuer", required=True, type=str)
p.add_argument("--content-type", required=True, type=str)
p.add_argument("--payload", required=True, type=str)
p.set_defaults(
func=lambda args: scitt.create_claim(
args.out, args.issuer, args.content_type, args.payload
)
)
create_statement.cli(sub.add_parser)

p = sub.add_parser(
"submit-claim", description="Submit a SCITT claim and retrieve the receipt"
Expand Down
Loading
Loading