Skip to content

Commit

Permalink
Merge pull request #56 from decentralized-identity/tdw-init
Browse files Browse the repository at this point in the history
Initial did log creation
  • Loading branch information
PatStLouis authored Oct 21, 2024
2 parents f8356db + e431fd7 commit 7814dad
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 176 deletions.
3 changes: 2 additions & 1 deletion server/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fastapi import FastAPI, APIRouter
from fastapi.responses import JSONResponse
from app.routers import identifiers
from app.routers import identifiers, resolvers
from config import settings

app = FastAPI(title=settings.PROJECT_TITLE, version=settings.PROJECT_VERSION)
Expand All @@ -14,5 +14,6 @@ async def server_status():


api_router.include_router(identifiers.router)
api_router.include_router(resolvers.router)

app.include_router(api_router)
12 changes: 9 additions & 3 deletions server/app/models/di_proof.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ def model_dump(self, **kwargs) -> Dict[str, Any]:
class DataIntegrityProof(BaseModel):
type: str = Field("DataIntegrityProof")
cryptosuite: str = Field("eddsa-jcs-2022")
verification_method: str = Field(alias="verificationMethod")
proof_value: str = Field(alias="proofValue")
proof_purpose: str = Field(alias="proofPurpose")
proofValue: str = Field()
proofPurpose: str = Field("assertionMethod")
verificationMethod: str = Field()
domain: str = Field(None)
challenge: str = Field(None)
created: str = Field(None)
Expand All @@ -30,6 +30,12 @@ def validate_cryptosuite(cls, value):
assert value in ["eddsa-jcs-2022"]
return value

@field_validator("proofPurpose")
@classmethod
def validate_proof_purpose(cls, value):
assert value in ["assertionMethod", "authentication"]
return value

@field_validator("expires")
@classmethod
def validate_expires(cls, value):
Expand Down
6 changes: 2 additions & 4 deletions server/app/models/did_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,8 @@ def service_endpoint_validator(cls, value):

class DidDocument(BaseModel):
context: Union[str, List[str]] = Field(
[
"https://www.w3.org/ns/did/v1",
"https://w3id.org/security/multikey/v1"
], alias="@context"
["https://www.w3.org/ns/did/v1", "https://w3id.org/security/multikey/v1"],
alias="@context",
)
id: str = Field()
name: str = Field(None)
Expand Down
48 changes: 48 additions & 0 deletions server/app/models/did_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Union, List, Dict, Any
from pydantic import BaseModel, Field
from .did_document import DidDocument
from .di_proof import DataIntegrityProof


class BaseModel(BaseModel):
def model_dump(self, **kwargs) -> Dict[str, Any]:
return super().model_dump(by_alias=True, exclude_none=True, **kwargs)


class Witness(BaseModel):
id: str = Field(None)
weight: int = Field(None)


class WitnessParam(BaseModel):
threshold: int = Field(None)
selfWeight: int = Field(None)
witnesses: List[Witness] = Field(None)


class LogParameters(BaseModel):
prerotation: bool = Field(None)
portable: bool = Field(None)
updateKeys: List[str] = Field(None)
nextKeyHashes: List[str] = Field(None)
witness: WitnessParam = Field(None)
deactivated: bool = Field(None)
ttl: bool = Field(None)
method: str = Field(None)
scid: str = Field(None)


class InitialLogEntry(BaseModel):
versionId: str = Field()
versionTime: str = Field()
parameters: LogParameters = Field()
state: dict = Field()
proof: Union[DataIntegrityProof, List[DataIntegrityProof]] = Field(None)


class LogEntry(BaseModel):
versionId: str = Field()
versionTime: str = Field()
parameters: LogParameters = Field()
state: DidDocument = Field()
proof: Union[DataIntegrityProof, List[DataIntegrityProof]] = Field(None)
5 changes: 5 additions & 0 deletions server/app/models/web_schemas.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Any
from pydantic import BaseModel, Field
from .did_document import SecuredDidDocument
from .did_log import InitialLogEntry


class BaseModel(BaseModel):
Expand All @@ -10,3 +11,7 @@ def model_dump(self, **kwargs) -> Dict[str, Any]:

class RegisterDID(BaseModel):
didDocument: SecuredDidDocument = Field()


class RegisterInitialLogEntry(BaseModel):
logEntry: InitialLogEntry = Field()
6 changes: 1 addition & 5 deletions server/app/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
from .askar import AskarStorage, AskarVerifier
from .trust_did_web import TrustDidWeb

__all__ = [
"AskarVerifier",
"AskarStorage",
"TrustDidWeb"
]
__all__ = ["AskarVerifier", "AskarStorage", "TrustDidWeb"]
41 changes: 19 additions & 22 deletions server/app/plugins/askar.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class AskarVerifier:
def __init__(self):
self.type = "DataIntegrityProof"
self.cryptosuite = "eddsa-jcs-2022"
self.purpose = "authentication"
self.purpose = "assertionMethod"

def create_proof_config(self, did):
expires = str(
Expand All @@ -66,34 +66,31 @@ def create_proof_config(self, did):
"type": self.type,
"cryptosuite": self.cryptosuite,
"proofPurpose": self.purpose,
"verificationMethod": f"did:key:{settings.ENDORSER_MULTIKEY}#{settings.ENDORSER_MULTIKEY}",
"expires": expires,
"domain": settings.DOMAIN,
"challenge": self.create_challenge(did + expires),
}
if not challenge:
expires = str(
(datetime.now(timezone.utc) + timedelta(minutes=10)).isoformat(
"T", "seconds"
)
)
proof_options['expires'] = expires
proof_options['challenge'] = self.create_challenge(created + expires)

return proof_options

def create_challenge(self, value):
return str(uuid.uuid5(uuid.NAMESPACE_DNS, settings.SECRET_KEY + value))

def assert_proof_options(self, proof, did):
def validate_challenge(self, proof, did):
try:
assert datetime.fromisoformat(proof["expires"]) > datetime.now(
timezone.utc
), "Proof expired."
assert proof["domain"] == settings.DOMAIN, "Domain mismatch."
assert proof["challenge"] == self.create_challenge(
did + proof["expires"]
), "Challenge mismatch."
if proof.get("domain"):
assert proof["domain"] == settings.DOMAIN, "Domain mismatch."
if proof.get("challenge"):
assert proof["challenge"] == self.create_challenge(
did + proof["expires"]
), "Challenge mismatch."
except AssertionError as msg:
raise HTTPException(status_code=400, detail=str(msg))

def validate_proof(self, proof):
try:
if proof.get("expires"):
assert datetime.fromisoformat(proof["expires"]) > datetime.now(
timezone.utc
), "Proof expired."
assert proof["type"] == self.type, f"Expected {self.type} proof type."
assert (
proof["cryptosuite"] == self.cryptosuite
Expand All @@ -105,8 +102,8 @@ def assert_proof_options(self, proof, did):
raise HTTPException(status_code=400, detail=str(msg))

def verify_proof(self, document, proof):
self.assert_proof_options(proof, document["id"])

self.validate_proof(proof)
multikey = proof["verificationMethod"].split("#")[-1]

key = Key(LocalKeyHandle()).from_public_bytes(
Expand Down
123 changes: 35 additions & 88 deletions server/app/plugins/trust_did_web.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,50 @@
from config import settings
from datetime import datetime, timezone
from datetime import datetime
from app.models.did_log import LogParameters, InitialLogEntry
import canonicaljson
import json
from multiformats import multihash, multibase


class TrustDidWeb:
def __init__(self):
self.did_string_base = r'did:tdw:{SCID}:'+settings.DOMAIN

def _define_parameters(self, update_key=None, next_key=None, ttl=100):
self.method_version = "did:tdw:0.4"
self.did_string_base = r"did:tdw:{SCID}:" + settings.DOMAIN

def _init_parameters(self, update_key, next_key=None, ttl=100):
# https://identity.foundation/trustdidweb/#generate-scid
parameters = {
"method": 'did:tdw:0.3',
"scid": r"{SCID}",
"updateKeys": [update_key],
"portable": False,
"prerotation": False,
"nextKeyHashes": [],
# "witness": {},
"deactivated": False,
"ttl": ttl,
}
parameters = LogParameters(
method=self.method_version, scid=r"{SCID}", updateKeys=[update_key]
)
return parameters

def _generate_entry_hash(self, log_entry):
# https://identity.foundation/trustdidweb/#generate-entry-hash
jcs = canonicaljson.encode_canonical_json(log_entry)
multihashed = multihash.digest(jcs, 'sha2-256')
encoded = multibase.encode(multihashed, 'base58btc')[1:]
return encoded


def _init_state(self, did_doc):
return json.loads(json.dumps(did_doc).replace("did:web:", r"did:tdw:{SCID}:"))

def _generate_scid(self, log_entry):
# https://identity.foundation/trustdidweb/#generate-scid
jcs = canonicaljson.encode_canonical_json(log_entry)
multihashed = multihash.digest(jcs, 'sha2-256')
encoded = multibase.encode(multihashed, 'base58btc')[1:]
multihashed = multihash.digest(jcs, "sha2-256")
encoded = multibase.encode(multihashed, "base58btc")[1:]
return encoded

def _add_placeholder_scid(self, item):
if isinstance(item, str):
return item.replace('did:web:', r'did:tdw:{SCID}:')
elif isinstance(item, list):
item['id'].replace('did:web:', r'did:tdw:{SCID}:')
return item
else:
pass

def _web_to_tdw(self, did_doc):
did_doc['id'] = self._add_placeholder_scid(did_doc['id'])
for idx, item in enumerate(did_doc['verificationMethod']):
did_doc['verificationMethod'][idx] = self._add_placeholder_scid(did_doc['verificationMethod'][idx])

def _init_parameters(self, update_key):
return {
"method": 'did:tdw:0.3',
"scid": r"{SCID}",
"updateKeys": [update_key],
"portable": False,
"prerotation": False,
"nextKeyHashes": [],
"deactivated": False,
}

def _init_did_doc(self):
return {
"@context": [],
"id": r"{SCID}",
}

def provision_log_entry(self, did_doc, update_key):
did_doc = json.loads(json.dumps(did_doc).replace('did:web:', r'did:tdw:{SCID}:'))
preliminary_did_log_entry = [
r'{SCID}',
str(datetime.now(timezone.utc).isoformat("T", "seconds")),
self._init_parameters(update_key=update_key),
{
"value": did_doc
}
]
scid = self._generate_scid(preliminary_did_log_entry)
log_entry = json.loads(json.dumps(preliminary_did_log_entry).replace('{SCID}', scid))
entry_hash = self._generate_entry_hash(log_entry)
log_entry[0] = f'1-{entry_hash}'
return log_entry

def create(self, did_doc):

def _generate_entry_hash(self, log_entry):
# https://identity.foundation/trustdidweb/#generate-entry-hash
jcs = canonicaljson.encode_canonical_json(log_entry)
multihashed = multihash.digest(jcs, "sha2-256")
encoded = multibase.encode(multihashed, "base58btc")[1:]
return encoded

def create(self, did_doc, update_key):
# https://identity.foundation/trustdidweb/#create-register
did_string = did_doc['id'].replace('did:web:', r'did:tdw:{SCID}:')
authorized_keys = []
initial_did_doc = self._web_to_tdw(did_doc)
parameters = self._define_parameters(update_key=did_doc['verificaitonMethod'][0]['publicKeyMultibase'])
did_log_entry = [
r'{SCID}',
str(datetime.now().isoformat('T', 'seconds')),
parameters,
{"value": initial_did_doc}
]
scid = self._generate_scid(did_log_entry)
did_log_entry = json.loads(json.dumps(did_log_entry).replace('{SCID}', scid))
log_entry_hash = self._generate_entry_hash(did_log_entry)
did_log_entry[0] = f'1-{log_entry_hash}'
log_entry = InitialLogEntry(
versionId=r"{SCID}",
versionTime=str(datetime.now().isoformat("T", "seconds")),
parameters=self._init_parameters(update_key=update_key),
state=self._init_state(did_doc),
).model_dump()
scid = self._generate_scid(log_entry)
log_entry = json.loads(json.dumps(log_entry).replace("{SCID}", scid))
log_entry_hash = self._generate_entry_hash(log_entry)
log_entry["versionId"] = f"1-{log_entry_hash}"
return log_entry
Loading

0 comments on commit 7814dad

Please sign in to comment.