Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: basic boot password should be able to contain colon characters #349

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,22 @@ def __init__(self, agency: Agency, username: str | None = None, password: str |
self.password = password
self.agency = agency

def parseBasicAuth(self, req: falcon.Request):
schemePrefix = 'Basic '
if req.auth is None or not req.auth.startswith(schemePrefix):
return None, None

token = b64decode(req.auth[len(schemePrefix):]).decode('utf-8')
splitIndex = token.find(':')
if splitIndex == -1:
return None, None

username = token[:splitIndex]
password = token[splitIndex + 1:]

return username, password


def authenticate(self, req: falcon.Request):
# Username AND Password is not set, so no need to authenticate
if self.username is None and self.password is None:
Expand All @@ -892,15 +908,8 @@ def authenticate(self, req: falcon.Request):
if req.auth is None:
raise falcon.HTTPUnauthorized(title="Unauthorized")

scheme, token = req.auth.split(' ')
if scheme != 'Basic':
raise falcon.HTTPUnauthorized(title="Unauthorized")

try:
username, password = b64decode(token).decode('utf-8').split(':')

if username is None or password is None:
raise falcon.HTTPUnauthorized(title="Unauthorized")
username, password = self.parseBasicAuth(req)

if username == self.username and password == self.password:
return
Expand Down
4 changes: 2 additions & 2 deletions src/keria/testing/testing_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ def server(agency, httpPort=3902):
return httpServerDoer

@staticmethod
def controller():
serder, signers = Helpers.incept(bran=b'0123456789abcdefghijk', stem="signify:controller", pidx=0)
def controller(bran=b'0123456789abcdefghijk'):
serder, signers = Helpers.incept(bran=bran, stem="signify:controller", pidx=0)
sigers = [signers[0].sign(ser=serder.raw, index=0)]
return serder, sigers

Expand Down
73 changes: 40 additions & 33 deletions tests/app/test_agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,52 +283,59 @@ def test_unprotected_boot_ends(helpers):
}

def test_protected_boot_ends(helpers):
agency = agenting.Agency(name="agency", bran=None, temp=True)
doist = doing.Doist(limit=1.0, tock=0.03125, real=True)
doist.enter(doers=[agency])
credentials = [
dict(bran=b'0123456789abcdefghija', username="user", password="secret"),
dict(bran=b'0123456789abcdefghijb', username="admin", password="secret with spaces"),
dict(bran=b'0123456789abcdefghijc', username="admin", password="secret : with colon")
]

serder, sigers = helpers.controller()
assert serder.pre == helpers.controllerAID
for credential in credentials:
bran = credential["bran"]
username = credential["username"]
password = credential["password"]

app = falcon.App()
client = testing.TestClient(app)
agency = agenting.Agency(name="agency", bran=None, temp=True)
doist = doing.Doist(limit=1.0, tock=0.03125, real=True)
doist.enter(doers=[agency])

username = "user"
password = "secret"
serder, sigers = helpers.controller(bran=bran)

bootEnd = agenting.BootEnd(agency, username=username, password=password)
app.add_route("/boot", bootEnd)
app = falcon.App()
client = testing.TestClient(app)

body = dict(
icp=serder.ked,
sig=sigers[0].qb64,
salty=dict(
stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ',
icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed]
bootEnd = agenting.BootEnd(agency, username=username, password=password)
app.add_route("/boot", bootEnd)

body = dict(
icp=serder.ked,
sig=sigers[0].qb64,
salty=dict(
stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ',
icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed]
)
)
)

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"))
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"))
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"})
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"})
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Basic user:secret"})
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {username}:{password}"})
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'test:secret').decode('utf-8')}"} )
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'test:{password}', 'utf-8')).decode('utf-8')}"} )
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user').decode('utf-8')}"} )
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'{username}', 'utf-8')).decode('utf-8')}"} )
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user:test').decode('utf-8')}"} )
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'{username}:test', 'utf-8')).decode('utf-8')}"} )
assert rep.status_code == 401

authorization = f"Basic {b64encode(b'user:secret').decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 202
authorization = f"Basic {b64encode(bytes(f'{username}:{password}', 'utf-8')).decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 202

def test_misconfigured_protected_boot_ends(helpers):
agency = agenting.Agency(name="agency", bran=None, temp=True)
Expand Down
Loading