Changeset ef964d8 in flexoentity
- Timestamp:
- 11/27/25 18:12:23 (7 weeks ago)
- Branches:
- master
- Children:
- 4e11d58
- Parents:
- 9a50e0b
- Files:
-
- 9 edited
-
flexoentity/domain.py (modified) (3 diffs)
-
flexoentity/flexo_entity.py (modified) (13 diffs)
-
flexoentity/flexo_signature.py (modified) (3 diffs)
-
flexoentity/id_factory.py (modified) (3 diffs)
-
tests/conftest.py (modified) (4 diffs)
-
tests/test_flexoid.py (modified) (1 diff)
-
tests/test_id_lifecycle.py (modified) (6 diffs)
-
tests/test_id_stress.py (modified) (3 diffs)
-
tests/test_persistance_integrity.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
flexoentity/domain.py
r9a50e0b ref964d8 1 from uuid import UUID2 1 from dataclasses import dataclass 3 from flexoentity import FlexOID, FlexoEntity, EntityType 2 from flexoentity import FlexoEntity, EntityType 3 4 4 5 5 @dataclass … … 26 26 return self.domain_id 27 27 28 def _deserialize_content(self, content: dict): 29 self.fullname = content.get("fullname", "") 30 self.description = content.get("description", "") 31 self.classification = content.get("classification", "UNCLASSIFIED") 32 28 33 def _serialize_content(self): 29 34 return { … … 32 37 "classification": self.classification, 33 38 } 34 39 35 40 @classmethod 36 41 def from_dict(cls, data): 37 # Must have flexo_id38 if "flexo_id" not in data:39 raise ValueError("Domain serialization missing 'flexo_id'.")40 42 41 flexo_id = FlexOID(data["flexo_id"]) 42 43 obj = cls( 44 fullname=data.get("fullname", ""), 45 description=data.get("description", ""), 46 classification=data.get("classification", "UNCLASSIFIED"), 47 flexo_id=flexo_id, 48 _in_factory=True 49 ) 50 51 # Restore metadata 52 obj.origin = data.get("origin") 53 obj.fingerprint = data.get("fingerprint", "") 54 obj.originator_id = ( 55 UUID(data["originator_id"]) if data.get("originator_id") else None 56 ) 57 obj.owner_id = ( 58 UUID(data["owner_id"]) if data.get("owner_id") else None 59 ) 60 43 obj = super().from_dict(data) 44 content = data.get("content", {}) 45 obj._deserialize_content(content) 61 46 return obj -
flexoentity/flexo_entity.py
r9a50e0b ref964d8 60 60 61 61 62 from flexoentity.id_factory import FlexOID 63 from flexoentity import canonical_seed62 from flexoentity.id_factory import FlexOID, canonical_seed 63 from flexoentity.logger import logger 64 64 65 65 … … 170 170 return copy 171 171 172 @staticmethod 173 def canonicalize_content_dict(data) -> str: 174 """ 175 Canonicalize structured content for fingerprinting. 176 - Stable JSON representation 177 - Sorted keys 178 - No whitespace noise 179 """ 180 return json.dumps(data, sort_keys=True, separators=(",", ":")) 181 172 182 @property 173 183 @abstractmethod … … 190 200 """I return the entity type derived from my FlexOID""" 191 201 return EntityType(self.flexo_id.entity_type) 192 193 def canonical_seed(self) -> str:194 """195 I use a helper method to flatten my text_seed.196 NOTE:This might become superfluous when text_seed provides this by itself. Redesign possible197 """198 return canonical_seed(self.text_seed)199 202 200 203 @classmethod … … 241 244 if not self.subtype: 242 245 self.subtype = self.__class__.__name__ 243 246 244 247 def __str__(self): 245 248 return ( … … 258 261 } 259 262 260 @abstractmethod261 def _serialize_content(self):262 return {}263 264 263 def to_dict(self): 265 264 return { … … 276 275 - ALWAYS restores the canonical Domain via DomainManager. 277 276 """ 278 if "flexo_id" not in data["meta"]: 279 raise ValueError("Serialized entity must include 'flexo_id'.") 280 281 flexo_id = FlexOID(data["meta"]["flexo_id"]) 282 subtype = data["subtype"] 283 # canonical domain object 277 meta = data.get("meta", "") 278 # if not meta or meta.get("flexo_id", ""): 279 # raise ValueError("Serialized entity must include 'flexo_id'.") 280 flexo_id = FlexOID(meta.get("flexo_id")) 281 subtype = meta.get("subtype") 282 if not subtype: 283 raise ValueError("Serialized entity must include 'subtype'.") 284 285 try: 286 owner_id = UUID(meta.get("owner_id")) 287 except ValueError as e: 288 logger.warn(f"Missing or wrong owner_id {e}") 289 owner_id = UUID(int=0) 290 try: 291 originator_id = UUID(meta.get("originator_id")) 292 except ValueError as e: 293 logger.warn(f"Missing or wrong originator_id {e}") 294 originator_id = UUID(int=0) 284 295 285 296 obj = cls( 286 297 flexo_id=flexo_id, 287 298 subtype=subtype, 299 origin=meta.get("origin", ""), 300 fingerprint=meta.get("fingerprint", ""), 301 owner_id=owner_id, 302 originator_id=originator_id, 288 303 _in_factory=True 289 304 ) 290 291 # restore provenance 292 obj.fingerprint = data.get("fingerprint", "") 293 obj.origin = data.get("origin") 294 295 if data.get("originator_id"): 296 obj.originator_id = UUID(data["originator_id"]) 297 298 if data.get("owner_id"): 299 obj.owner_id = UUID(data["owner_id"]) 300 305 obj._deserialize_content(data) 301 306 return obj 302 307 … … 311 316 return cls.from_dict(data) 312 317 318 def _deserialize_content(self, content_dict): 319 """Subclasses override this to restore real fields.""" 320 pass # default: do nothing 321 322 323 def _serialize_content(self): 324 """ 325 Subclasses override to return all integrity-relevant fields. 326 Should include: 327 - all actual structured data 328 - comment (if meaningful) 329 - signature_data / certificate info (if present) 330 """ 331 return {} 332 333 def canonical_content(self) -> str: 334 """Get canonical JSON string of content for fingerprinting.""" 335 return self.canonicalize_content_dict(self._serialize_content()) 336 313 337 def _compute_fingerprint(self) -> str: 314 338 """I recompute the entity's content fingerprint.""" 315 seed = self.canonical_seed() 316 return hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper() 339 return hashlib.blake2s(self.canonical_content().encode("utf-8"), digest_size=8).hexdigest().upper() 317 340 318 341 def _update_fingerprint(self) -> bool: 319 """320 I update FlexOID if the content fingerprint changed.321 """322 342 new_fp = self._compute_fingerprint() 323 if new_fp != self.fingerprint: 324 self.fingerprint = new_fp 325 self.flexo_id = FlexOID.safe_generate(self.domain_id, 326 self.entity_type.value, 327 self.state.value, 328 self.text_seed, 329 self.flexo_id.version) 330 return True 331 return False 332 333 def _transition(self, target_state: EntityState): 334 """ 335 I am an internal helper for state transitions with version/fingerprint checks 336 and forward-only enforcement. 337 """ 338 339 if target_state not in self.allowed_transitions(): 340 raise ValueError( 341 f"Illegal state transition: {self.state.name} → {target_state.name}. " 342 ) 343 344 # special case: marking obsolete 345 if target_state == EntityState.OBSOLETE: 346 self.flexo_id = self.flexo_id.with_state(EntityState.OBSOLETE.value) 347 return 348 349 self.state = target_state 343 changed = new_fp != self.fingerprint 344 self.fingerprint = new_fp 345 return changed 350 346 351 347 # ─────────────────────────────────────────────────────────────── … … 380 376 """ 381 377 if self.state == EntityState.DRAFT: 382 new_fid = FlexOID.safe_generate( self.domain_id,383 self.entity_type.value,384 EntityState.APPROVED.value,385 self.text_seed,378 new_fid = FlexOID.safe_generate(domain_id=self.domain_id, 379 entity_type=self.entity_type.value, 380 state=EntityState.APPROVED.value, 381 text=self.text_seed, 386 382 version=self.version 387 383 ) … … 401 397 402 398 # Change only the trailing state letter (A → S) 399 old_id = self.flexo_id 403 400 self.flexo_id = self.flexo_id.with_state(EntityState.APPROVED_AND_SIGNED.value) 401 self.flexo_id.origin = old_id 404 402 return self 405 403 … … 417 415 418 416 new_fid = FlexOID.safe_generate( 419 self.domain_id,420 self.entity_type.value,421 EntityState.PUBLISHED.value,422 self.text_seed,417 domain_id=self.domain_id, 418 entity_type=self.entity_type.value, 419 state=EntityState.PUBLISHED.value, 420 text=self.text_seed, 423 421 version=self.version 424 422 ) … … 435 433 ) 436 434 if self.state != EntityState.OBSOLETE: 437 self._transition(EntityState.OBSOLETE) 435 new_fid = FlexOID.safe_generate( 436 domain_id=self.domain_id, 437 entity_type=self.entity_type.value, 438 state=EntityState.OBSOLETE.value, 439 text=self.text_seed, 440 version=self.version 441 ) 442 443 self.origin = self.flexo_id 444 self.flexo_id = new_fid 445 438 446 return self 439 447 … … 442 450 self.origin = str(self.flexo_id) 443 451 self.flexo_id = FlexOID.clone_new_base( 444 self.domain_id,445 self.entity_type.value,446 EntityState.DRAFT.value,447 self.text_seed,452 domain_id=self.domain_id, 453 entity_type=self.entity_type.value, 454 state=EntityState.DRAFT.value, 455 text=self.text_seed, 448 456 ) 449 457 return self … … 459 467 Returns False if: 460 468 - flexo_id format is invalid 461 - derived state/type mismatch the ID462 - fingerprint doesn't match the canonical text seed463 469 """ 464 470 try: 465 471 if not isinstance(entity.flexo_id, FlexOID): 466 return False467 468 # Validate domain and ID coherence469 if entity.domain_id != entity.flexo_id.domain_id:470 return False471 if entity.entity_type.value != entity.flexo_id.entity_type:472 return False473 if entity.state.value != entity.flexo_id.state_code:474 472 return False 475 473 -
flexoentity/flexo_signature.py
r9a50e0b ref964d8 4 4 from uuid import UUID 5 5 from flexoentity import FlexoEntity, FlexOID, EntityType 6 6 7 7 8 @dataclass … … 46 47 public_cert_path: Optional[str] = None 47 48 49 def to_dict(self): 50 return { 51 "platform": self.platform, 52 "identifier": self.identifier, 53 "private_key_path": self.private_key_path, 54 "public_cert_path": self.public_cert_path 55 } 56 57 48 58 @dataclass 49 59 class FlexoSignature(FlexoEntity): … … 75 85 def _serialize_content(self): 76 86 return { 77 "signed_entity": s elf.signed_entity,87 "signed_entity": str(self.signed_entity), 78 88 "signer_id": str(self.signer_id), 79 89 "signature_data": self.signature_data, 80 "certificate_reference": self.certificate_reference.to_dict(), 81 "certificate_thumbprint": self.certificate_thumbprint 90 "signature_type": self.signature_type, 91 # "certificate_reference": self.certificate_reference.to_dict(), 92 "certificate_thumbprint": self.certificate_thumbprint, 93 "comment": self.comment 82 94 } 83 95 -
flexoentity/id_factory.py
r9a50e0b ref964d8 78 78 from flexoentity import logger 79 79 80 def canonical_seed(obj) -> str: 81 """ 82 I transform *obj* into a deterministic, comparable text form. 83 84 I remove irrelevant formatting differences so that two equal 85 pieces of data always yield the same hash seed. 86 87 Rules: 88 - If *obj* is a string, I normalize whitespace. 89 - If *obj* is a dict, I JSON-encode it with sorted keys. 90 - If *obj* is an object, I recurse on its __dict__. 91 - Otherwise, I coerce *obj* to str(). 92 """ 93 94 if isinstance(obj, str): 95 text = " ".join(obj.split()) 96 return text 97 if isinstance(obj, dict): 98 return json.dumps(obj, sort_keys=True, separators=(",", ":")) 99 if hasattr(obj, "__dict__"): 100 return canonical_seed(obj.__dict__) 101 return str(obj) 102 80 def canonical_seed(text: str) -> str: 81 """Canonicalize identity-only text_seed.""" 82 if not text: 83 return "" 84 # remove control characters 85 s = re.sub(r"[\t\r\n]+", " ", text) 86 # collapse multiple spaces 87 s = re.sub(r"\s+", " ", s) 88 return s.strip() 103 89 104 90 class FlexOID(str): … … 346 332 347 333 @staticmethod 348 def clone_new_base(domain : str, entity_type: str, state: str, text: str) -> "FlexOID":334 def clone_new_base(domain_id: str, entity_type: str, state: str, text: str) -> "FlexOID": 349 335 """ 350 336 I start a completely new lineage (version 1) for a derived entity. … … 352 338 not share version history with its origin. 353 339 """ 354 return FlexOID.safe_generate(domain , entity_type, state, text, version=1)340 return FlexOID.safe_generate(domain_id, entity_type, state, text, version=1) 355 341 356 342 def to_dict(self) -> dict: -
tests/conftest.py
r9a50e0b ref964d8 1 # tests/stubs/single_choice_question.py2 from dataclasses import dataclass, field3 1 import pytest 4 2 import platform 5 3 from pathlib import Path 6 4 from datetime import datetime 7 from typing import List8 from flexoentity import FlexOID, FlexoEntity, EntityType, EntityState, Domain,get_signing_backend, CertificateReference5 from flexoentity import Domain, FlexoSignature 6 from flexoentity import get_signing_backend, CertificateReference 9 7 10 8 … … 18 16 return FixedDate 19 17 20 @dataclass21 class AnswerOption:22 id: str23 text: str24 points: float = 0.025 26 def to_dict(self):27 return {"id": self.id, "text": self.text, "points": self.points}28 29 @classmethod30 def from_dict(cls, data):31 return cls(32 id=data.get("id", ""),33 text=data.get("text", ""),34 points=data.get("points", 0.0)35 )36 37 38 @dataclass39 class SingleChoiceQuestion(FlexoEntity):40 """A minimal stub to test FlexoEntity integration."""41 ENTITY_TYPE = EntityType.ITEM42 43 text: str = ""44 options: List[AnswerOption] = field(default_factory=list)45 46 def __post_init__(self):47 # If no FlexOID yet, generate a draft ID now.48 if not getattr(self, "flexo_id", None):49 self.flexo_id = FlexOID.safe_generate(50 domain_id=self.domain_id,51 entity_type=SingleChoiceQuestion.ENTITY_TYPE.value, # 'I'52 state=EntityState.DRAFT.value, # 'D'53 text=self.text_seed or self.text,54 version=1,55 )56 57 @classmethod58 def default(cls):59 return cls()60 61 def _serialize_content(self):62 return {63 "text": self.text,64 "options": [opt.to_dict() for opt in self.options],65 }66 67 @property68 def text_seed(self) -> str:69 """Include answer options (and points) for deterministic ID generation."""70 71 joined = "|".join(72 f"{opt.text.strip()}:{opt.points}"73 for opt in sorted(self.options, key=lambda o: o.text.strip().lower())74 )75 return f"{self.text}{joined}"76 77 @classmethod78 def from_dict(cls, data):79 obj = cls(text=data.get("text", ""),80 options=[AnswerOption.from_dict(o) for o in data.get("options", [])],81 )82 # restore FlexoEntity core fields83 if "flexo_id" in data:84 obj.flexo_id = FlexOID.to_dict(data["flexo_id"])85 return obj86 18 87 19 @pytest.fixture … … 92 24 description="ALL ABOUT ARITHMETIC IN PYTHON") 93 25 94 @pytest.fixture95 def sample_question(sample_domain):96 q = SingleChoiceQuestion.with_domain_id(domain_id=sample_domain.domain_id,97 text="What is 2 + 2?",98 options=[])99 q._update_fingerprint()100 return q101 26 102 27 SYSTEM = platform.system() … … 181 106 except Exception as e: 182 107 pytest.skip(f"Backend unavailable or misconfigured: {e}") 108 109 @pytest.fixture 110 def sample_signature(sample_domain, cert_ref_linux): 111 return FlexoSignature.with_domain_id(domain_id="SIG", signed_entity=sample_domain, 112 certificate_reference=cert_ref_linux, 113 comment="This is a mock signature") 114 -
tests/test_flexoid.py
r9a50e0b ref964d8 111 111 # ────────────────────────────────────────────── 112 112 113 def test_canonical_seed_for_strings _and_dicts():113 def test_canonical_seed_for_strings(): 114 114 s1 = " Hello world " 115 115 s2 = "Hello world" 116 116 assert canonical_seed(s1) == canonical_seed(s2) 117 118 d1 = {"b": 1, "a": 2}119 d2 = {"a": 2, "b": 1}120 assert canonical_seed(d1) == canonical_seed(d2)121 122 class Dummy:123 def __init__(self):124 self.x = 1125 self.y = 2126 obj = Dummy()127 assert isinstance(canonical_seed(obj), str)128 129 117 130 118 # ────────────────────────────────────────────── -
tests/test_id_lifecycle.py
r9a50e0b ref964d8 3 3 4 4 5 # ────────────────────────────────────────────────────────────────────────────── 6 # Tests adapted to use real SingleChoiceQuestion fixture instead of DummyEntity 7 # ────────────────────────────────────────────────────────────────────────────── 5 def test_initial_state(sample_domain): 6 assert sample_domain.state == EntityState.DRAFT 7 assert sample_domain.flexo_id.version == 1 8 assert FlexoEntity.verify_integrity(sample_domain) 8 9 9 def test_initial_state(sample_question): 10 q = sample_question 11 assert q.state == EntityState.DRAFT 12 assert q.flexo_id.version == 1 13 assert FlexoEntity.verify_integrity(q) 14 15 def test_approval_does_not_bump_version(sample_question): 16 q = sample_question 10 def test_approval_does_not_bump_version(sample_domain): 11 q = sample_domain 17 12 q.approve() 18 13 assert q.state == EntityState.APPROVED 19 14 assert q.flexo_id.version == 1 20 15 21 def test_signing_does_not_bump_version(sample_ question):22 q = sample_ question16 def test_signing_does_not_bump_version(sample_domain): 17 q = sample_domain 23 18 q.approve() 24 19 before = q.flexo_id … … 38 33 39 34 40 def test_publish_does_not_bump_version(sample_ question):41 q = sample_ question35 def test_publish_does_not_bump_version(sample_domain): 36 q = sample_domain 42 37 q.approve() 43 38 q.sign() … … 48 43 49 44 50 def test_modify_content_changes_fingerprint(sample_question): 51 q = sample_question 52 q.text += "Rephrased content" # simulate text change 53 changed = q._update_fingerprint() 45 def test_modify_content_changes_fingerprint(sample_signature): 46 sample_signature.comment += "Rephrased content" # simulate text change 47 changed = sample_signature._update_fingerprint() 54 48 assert changed 55 49 56 50 57 def test_no_version_bump_on_draft_edits(sample_question): 58 q = sample_question 59 q.text = "Minor draft edit" 60 q._update_fingerprint() 61 assert q.flexo_id.version == 1 51 def test_no_version_bump_on_draft_edits(sample_signature): 52 sample_signature.comment = "Minor draft edit" 53 sample_signature._update_fingerprint() 54 assert sample_signature.flexo_id.version == 1 62 55 63 56 64 def test_version_bump_after_edit_and_sign(sample_question): 65 q = sample_question 66 q.approve() 67 v1 = str(q.flexo_id) 68 q.text = "Changed content" 69 q.sign() 70 assert str(q.flexo_id) != v1 57 def test_version_bump_after_edit_and_sign(sample_signature): 58 sample_signature.approve() 59 v1 = str(sample_signature.flexo_id) 60 sample_signature.comment = "Changed comment" 61 sample_signature.sign() 62 assert str(sample_signature.flexo_id) != v1 71 63 72 64 73 def test_integrity_check_passes_and_fails(sample_question): 74 q = sample_question 75 q.approve() 76 assert FlexoEntity.verify_integrity(q) 65 def test_integrity_check_passes_and_fails(sample_signature): 66 sample_signature.approve() 67 assert FlexoEntity.verify_integrity(sample_signature) 77 68 78 69 # simulate tampering 79 q.text = "Tampered text"80 assert not FlexoEntity.verify_integrity( q)70 sample_signature.comment = "Tampered text" 71 assert not FlexoEntity.verify_integrity(sample_signature) 81 72 82 73 83 def test_obsolete_state(sample_ question):84 q = sample_ question74 def test_obsolete_state(sample_domain): 75 q = sample_domain 85 76 q.approve() 86 77 q.sign() … … 90 81 91 82 92 def test_clone_new_base_resets_lineage(sample_ question):93 q = sample_ question83 def test_clone_new_base_resets_lineage(sample_domain): 84 q = sample_domain 94 85 q.approve() 95 86 q.sign() … … 102 93 assert q.flexo_id.version == 1 103 94 104 def test_clone_new_base_sets_origin(sample_ question):105 q = sample_ question95 def test_clone_new_base_sets_origin(sample_domain): 96 q = sample_domain 106 97 q.approve() 107 98 q.sign() … … 115 106 assert q.flexo_id != old_id 116 107 117 def test_mass_version_increments_until_obsolete(sample_ question):118 q = sample_ question108 def test_mass_version_increments_until_obsolete(sample_domain): 109 q = sample_domain 119 110 q.approve() 120 111 for _ in range(FlexOID.MAX_VERSION - 1): -
tests/test_id_stress.py
r9a50e0b ref964d8 9 9 10 10 import pytest 11 12 from flexoentity import FlexOID, EntityType, EntityState 11 from uuid import uuid4 12 from flexoentity import FlexOID, EntityType, EntityState, FlexoSignature 13 13 14 14 logger = logging.getLogger(__name__) … … 66 66 assert id1 == id2 67 67 68 def test_massive_lifecycle_simulation(cert_ref_linux, sample_domain): 69 """ 70 Generate 100 random FlexoSignatures, mutate content, run through lifecycle, 71 and ensure all FlexOIDs are unique and valid. 72 """ 73 entities = [] 68 74 69 def test_massive_lifecycle_simulation(sample_question): 70 """ 71 Generate 100 random SingleChoiceQuestions, simulate multiple edits and state transitions, 72 ensure all final IDs and fingerprints are unique and valid. 73 """ 74 entities = [ 75 copy.deepcopy(sample_question) for _ in range(100) 76 ] 75 for i in range(100): 76 sig = FlexoSignature.with_domain_id( 77 domain_id="SIGTEST", 78 signed_entity=sample_domain.flexo_id, 79 signer_id=uuid4(), 80 certificate_reference=cert_ref_linux, 81 comment=f"Initial signature #{i}" 82 ) 83 entities.append(sig) 77 84 85 # Mutate + lifecycle transitions 78 86 for i, e in enumerate(entities): 79 # random edit80 e. text += f" updated #{i}"87 # CONTENT CHANGE → fingerprint changes → hash → FlexOID.prefix changes 88 e.comment += f" updated-{i}" 81 89 e._update_fingerprint() 82 90 … … 88 96 e.publish() 89 97 90 flexoids = [e.flexo_id for e in entities] 91 assert len(flexoids) == len(set(flexoids)), "Duplicate FlexOIDs after lifecycle simulation" 98 # Check ID uniqueness 99 flexoids = [str(e.flexo_id) for e in entities] 100 assert len(flexoids) == len(set(flexoids)), "Duplicate FlexOIDs detected" -
tests/test_persistance_integrity.py
r9a50e0b ref964d8 6 6 import pytest 7 7 8 from flexoentity import EntityState, EntityType, Domain 8 from flexoentity import Domain 9 9 10 10 11 @pytest.fixture 11 def approved_question(): 12 """Provide a fully approved and published SingleChoiceQuestion for persistence tests.""" 13 q = SingleChoiceQuestion( 14 domain=Domain(domain="GEN", entity_type=EntityType.DOMAIN, state=EntityState.DRAFT), 15 entity_type=None, # SingleChoiceQuestion sets this internally to EntityType.ITEM 16 state=EntityState.DRAFT, 17 text="What is Ohm’s law?", 18 options=[ 19 AnswerOption(id="OP1", text="U = R × I", points=1), 20 AnswerOption(id="OP2", text="U = I / R", points=0), 21 AnswerOption(id="OP3", text="R = U × I", points=0), 22 ], 23 ) 24 q.approve() 25 q.sign() 26 q.publish() 27 return q 12 def approved_domain(sample_domain): 13 """Provide a fully approved and published Domain for persistence tests.""" 14 sample_domain.approve() 15 sample_domain.sign() 16 sample_domain.publish() 17 return sample_domain 28 18 29 19 30 @pytest.mark.skip(reason="FlexOIDs regenerated on import; enable once JSON format is stable") 31 def test_json_roundtrip_preserves_integrity(approved_question): 20 def test_json_roundtrip_preserves_integrity(approved_domain): 32 21 """ 33 22 Export to JSON and reload — ensure fingerprints and signatures remain valid. 34 23 """ 35 json_str = approved_ question.to_json()36 loaded = SingleChoiceQuestion.from_json(json_str)24 json_str = approved_domain.to_json() 25 loaded = Domain.from_json(json_str) 37 26 38 27 # Fingerprint and state should match — integrity must pass 39 assert SingleChoiceQuestion.verify_integrity(loaded)28 assert Domain.verify_integrity(loaded) 40 29 41 30 # Metadata should be preserved exactly 42 assert approved_ question.fingerprint == loaded.fingerprint43 assert approved_ question.flexo_id == loaded.flexo_id44 assert loaded.state == approved_ question.state31 assert approved_domain.fingerprint == loaded.fingerprint 32 assert approved_domain.flexo_id == loaded.flexo_id 33 assert loaded.state == approved_domain.state 45 34 46 @pytest.mark.skip(reason="FlexOIDs regenerated on import; tampering detection not yet implemented") 47 def test_json_tampering_detection(approved_ question):35 36 def test_json_tampering_detection(approved_domain): 48 37 """Tampering with content should invalidate fingerprint verification.""" 49 json_str = approved_ question.to_json()38 json_str = approved_domain.to_json() 50 39 tampered = json.loads(json_str) 51 tampered[" text"] = "Tampered content injection"40 tampered["content"]["fullname"] = "Tampered content injection" 52 41 tampered_json = json.dumps(tampered) 53 42 54 loaded = SingleChoiceQuestion.from_json(tampered_json)55 assert not SingleChoiceQuestion.verify_integrity(loaded)43 loaded = Domain.from_json(tampered_json) 44 assert not Domain.verify_integrity(loaded) 56 45 57 @pytest.mark.skip(reason="FlexOIDs regenerated on import; corruption detection not yet applicable") 58 def test_json_file_corruption(approved_ question, tmp_path):46 47 def test_json_file_corruption(approved_domain, tmp_path): 59 48 """Simulate file corruption — integrity check must fail.""" 60 49 file = tmp_path / "question.json" 61 json_str = approved_question.to_json() 50 json_str = approved_domain.to_json() 51 print("JSON", json_str) 62 52 file.write_text(json_str) 63 53 64 54 # Corrupt the file (simulate accidental byte modification) 65 corrupted = json_str.replace(" Ohm’s", "Omm’s")55 corrupted = json_str.replace("ARITHMETIC", "ARITHM") 66 56 file.write_text(corrupted) 67 57 68 loaded = SingleChoiceQuestion.from_json(file.read_text())69 assert not SingleChoiceQuestion.verify_integrity(loaded)58 loaded = Domain.from_json(file.read_text()) 59 assert not Domain.verify_integrity(loaded)
Note:
See TracChangeset
for help on using the changeset viewer.
