-
-
Notifications
You must be signed in to change notification settings - Fork 51
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement AtprotoData and DID key formatting and parsing
- Loading branch information
Showing
13 changed files
with
424 additions
and
122 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
P256_DID_PREFIX = b'\x80\x24' | ||
SECP256K1_DID_PREFIX = b'\xe7\x01' | ||
|
||
BASE58_MULTIBASE_PREFIX = 'z' | ||
DID_KEY_PREFIX = 'did:key:' | ||
|
||
P256_JWT_ALG = 'ES256' | ||
SECP256K1_JWT_ALG = 'ES256K' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from dataclasses import dataclass | ||
|
||
from atproto_crypto.consts import ( | ||
BASE58_MULTIBASE_PREFIX, | ||
DID_KEY_PREFIX, | ||
P256_DID_PREFIX, | ||
P256_JWT_ALG, | ||
SECP256K1_DID_PREFIX, | ||
SECP256K1_JWT_ALG, | ||
) | ||
from atproto_crypto.multibase import bytes_to_multibase, multibase_to_bytes | ||
from atproto_crypto.p256.encoding import compress_public_key as p256_compress_public_key | ||
from atproto_crypto.p256.encoding import decompress_public_key as p256_decompress_public_key | ||
from atproto_crypto.secp256k1.encoding import compress_public_key as secp256k1_compress_public_key | ||
from atproto_crypto.secp256k1.encoding import decompress_public_key as secp256k1_decompress_public_key | ||
|
||
|
||
@dataclass | ||
class Multikey: | ||
jwt_alg: str | ||
key_bytes: bytes | ||
|
||
|
||
def parse_multikey(multikey: str) -> Multikey: | ||
if not multikey.startswith(BASE58_MULTIBASE_PREFIX): | ||
raise ValueError(f'Incorrect prefix for multikey {multikey}') | ||
|
||
prefixed_bytes = multibase_to_bytes(multikey) | ||
|
||
if prefixed_bytes.startswith(P256_DID_PREFIX): | ||
jwt_alg = P256_JWT_ALG | ||
compressed_key_bytes = prefixed_bytes[len(P256_DID_PREFIX) :] | ||
key_bytes = p256_decompress_public_key(compressed_key_bytes) | ||
elif prefixed_bytes.startswith(SECP256K1_DID_PREFIX): | ||
jwt_alg = SECP256K1_JWT_ALG | ||
compressed_key_bytes = prefixed_bytes[len(SECP256K1_DID_PREFIX) :] | ||
key_bytes = secp256k1_decompress_public_key(compressed_key_bytes) | ||
else: | ||
raise ValueError('Unsupported key type') | ||
|
||
return Multikey(jwt_alg, key_bytes) | ||
|
||
|
||
def format_multikey(jwt_alg: str, key_bytes: bytes) -> str: | ||
if jwt_alg == P256_JWT_ALG: | ||
prefix = P256_DID_PREFIX | ||
compressed_key_bytes = p256_compress_public_key(key_bytes) | ||
elif jwt_alg == SECP256K1_JWT_ALG: | ||
prefix = SECP256K1_DID_PREFIX | ||
compressed_key_bytes = secp256k1_compress_public_key(key_bytes) | ||
else: | ||
raise ValueError('Unsupported key type') | ||
|
||
prefixed_bytes = prefix + compressed_key_bytes | ||
return bytes_to_multibase(BASE58_MULTIBASE_PREFIX, prefixed_bytes) | ||
|
||
|
||
def format_did_key(jwt_alg: str, key_bytes: bytes) -> str: | ||
return f'{DID_KEY_PREFIX}{format_multikey(jwt_alg, key_bytes)}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import libipld | ||
|
||
|
||
def multibase_to_bytes(data: str) -> bytes: | ||
_, data = libipld.decode_multibase(data) | ||
return data | ||
|
||
|
||
def bytes_to_multibase(encoding: str, data: bytes) -> str: | ||
return libipld.encode_multibase(encoding, data) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from cryptography.hazmat.primitives import serialization | ||
from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1, EllipticCurvePublicKey | ||
|
||
|
||
def compress_public_key(pubkey: bytes) -> bytes: | ||
public_key = EllipticCurvePublicKey.from_encoded_point(SECP256R1(), pubkey) | ||
return public_key.public_bytes( | ||
encoding=serialization.Encoding.X962, format=serialization.PublicFormat.CompressedPoint | ||
) | ||
|
||
|
||
def decompress_public_key(pubkey: bytes) -> bytes: | ||
if len(pubkey) != 33: | ||
raise ValueError('Expected 33 byte compress pubkey') | ||
|
||
public_key = EllipticCurvePublicKey.from_encoded_point(SECP256R1(), pubkey) | ||
return public_key.public_bytes( | ||
encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint | ||
) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from cryptography.hazmat.primitives import serialization | ||
from cryptography.hazmat.primitives.asymmetric.ec import SECP256K1, EllipticCurvePublicKey | ||
|
||
|
||
def compress_public_key(pubkey: bytes) -> bytes: | ||
public_key = EllipticCurvePublicKey.from_encoded_point(SECP256K1(), pubkey) | ||
return public_key.public_bytes( | ||
encoding=serialization.Encoding.X962, format=serialization.PublicFormat.CompressedPoint | ||
) | ||
|
||
|
||
def decompress_public_key(pubkey: bytes) -> bytes: | ||
if len(pubkey) != 33: | ||
raise ValueError('Expected 33 byte compress pubkey') | ||
|
||
public_key = EllipticCurvePublicKey.from_encoded_point(SECP256K1(), pubkey) | ||
return public_key.public_bytes( | ||
encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,73 @@ | ||
import typing as t | ||
from dataclasses import dataclass | ||
|
||
from atproto_identity.did.models import AtprotoData | ||
from atproto_crypto.consts import P256_JWT_ALG, SECP256K1_JWT_ALG | ||
from atproto_crypto.did import format_did_key, parse_multikey | ||
from atproto_crypto.multibase import multibase_to_bytes | ||
|
||
if t.TYPE_CHECKING: | ||
from atproto_core.did_doc import DidDocument | ||
|
||
|
||
def ensure_atproto_document(_: 'DidDocument') -> AtprotoData: | ||
raise NotImplementedError | ||
@dataclass | ||
class AtprotoData: | ||
did: str | ||
signing_key: str | ||
handle: str | ||
pds: str | ||
|
||
@classmethod | ||
def from_did_doc(cls, did_doc: 'DidDocument') -> 'AtprotoData': | ||
return parse_to_atproto_data(did_doc) | ||
|
||
def ensure_atproto_key(_: 'DidDocument') -> str: | ||
raise NotImplementedError | ||
|
||
def get_did_key(did_doc: 'DidDocument') -> t.Optional[str]: | ||
key = did_doc.get_signing_key() | ||
if key is None: | ||
return None | ||
|
||
key_bytes = multibase_to_bytes(key.public_key_multibase) | ||
|
||
did_key = None | ||
if key.type == 'EcdsaSecp256r1VerificationKey2019': | ||
did_key = format_did_key(P256_JWT_ALG, key_bytes) | ||
elif key.type == 'EcdsaSecp256k1VerificationKey2019': | ||
did_key = format_did_key(SECP256K1_JWT_ALG, key_bytes) | ||
elif key.type == 'Multikey': | ||
parsed_key = parse_multikey(key.public_key_multibase) | ||
did_key = format_did_key(parsed_key.jwt_alg, parsed_key.key_bytes) | ||
|
||
return did_key | ||
|
||
|
||
def parse_to_atproto_data(did_doc: 'DidDocument') -> AtprotoData: | ||
return AtprotoData( | ||
did=did_doc.id, | ||
signing_key=get_did_key(did_doc), | ||
handle=did_doc.get_handle(), | ||
pds=did_doc.get_pds_endpoint(), | ||
) | ||
|
||
|
||
def ensure_atproto_document(did_doc: 'DidDocument') -> AtprotoData: | ||
atproto_data = AtprotoData.from_did_doc(did_doc) | ||
|
||
if atproto_data.did is None: | ||
raise ValueError(f'Could not parse did from doc: {did_doc}') | ||
if atproto_data.signing_key is None: | ||
raise ValueError(f'Could not parse signingKey from doc: {did_doc}') | ||
if atproto_data.handle is None: | ||
raise ValueError(f'Could not parse handle from doc: {did_doc}') | ||
if atproto_data.pds is None: | ||
raise ValueError(f'Could not parse pds from doc: {did_doc}') | ||
|
||
return atproto_data | ||
|
||
|
||
def ensure_atproto_key(did_doc: 'DidDocument') -> str: | ||
atproto_data = AtprotoData.from_did_doc(did_doc) | ||
|
||
if atproto_data.signing_key is None: | ||
raise ValueError(f'Could not parse signingKey from doc: {did_doc}') | ||
|
||
return atproto_data.signing_key |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.